From 0a3bc102ebebf168c7acfb867aeb84b786c06511 Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Fri, 14 Apr 2023 23:28:57 +0100
Subject: [PATCH] Pre functional snap to full sync (#1546)

* Update sync scheduler pool mode

  why: The pool mode allows looping over active peers one after another. This
    is ideal for soft re-starting peers. As this is a two-tier operation
    (start/stop, setup/release), the loop must be run twice. This is
    controlled by a more rigid re-definition of how to use the `poolMode`
    flag.

* Mitigate RLP serialiser deficiency

  why: Serialising the `BlockBody` is currently not convertible and needs to
    be checked in the `eth` module. For now, a local fix for the wire
    protocol applies. The unit tests will stay (even after this local
    solution has been removed.)

* Code cosmetics and massage

  details: The main part is `types.toStr()`, a unified function for logging
    block numbers.

* Allow using a logical genesis replacement (start of history)

  why: Snap sync will set up an arbitrary pivot at a block number different
    from zero. In fact, the higher the block number, the better.

  details: A non-genesis start of history currently only affects the score
    values, which are derived from the difficulty.

* Provide a function to store the snap pivot block header in the chain db

  why: Together with the start-of-history facility, this allows full syncing
    to proceed once snap sync has finished.

  details: Snap db storage was switched from sub-tables to the flat chain db.

* Provide database completeness and sanity checker

  details: For debugging on smaller databases only

* Implement snap -> full sync switch
---
 nimbus/common/common.nim | 27 ++-
 nimbus/core/chain/persist_blocks.nim | 5 +-
 nimbus/db/db_chain.nim | 24 +-
 nimbus/sync/full.nim | 6 +-
 nimbus/sync/full/worker.nim | 17 +-
 nimbus/sync/misc/best_pivot.nim | 15 +-
 nimbus/sync/misc/block_queue.nim | 22 +-
 nimbus/sync/protocol/eth/eth_types.nim | 10 +
 nimbus/sync/protocol/eth66.nim | 4 +
 nimbus/sync/protocol/eth67.nim | 4 +
 nimbus/sync/snap.nim | 6 +-
 nimbus/sync/snap/worker.nim | 109 +++------
 .../sync/snap/worker/com/get_block_header.nim | 3 +-
 .../snap/worker/com/get_storage_ranges.nim | 2 +-
 .../sync/snap/worker/com/get_trie_nodes.nim | 2 +-
 nimbus/sync/snap/worker/db/hexary_range.nim | 2 +-
 nimbus/sync/snap/worker/db/snapdb_desc.nim | 23 +-
 .../sync/snap/worker/db/snapdb_persistent.nim | 22 +-
 nimbus/sync/snap/worker/pivot.nim | 196 ++++++++++++++-
 .../sync/snap/worker/pivot/heal_accounts.nim | 4 +-
 .../snap/worker/pivot/heal_storage_slots.nim | 6 +-
 .../worker/pivot/range_fetch_accounts.nim | 8 +-
 .../pivot/range_fetch_storage_slots.nim | 9 +-
 nimbus/sync/snap/worker/pivot/swap_in.nim | 2 +-
 nimbus/sync/snap/worker/play.nim | 15 +-
 nimbus/sync/snap/worker/play/play_desc.nim | 35 ++-
 .../sync/snap/worker/play/play_full_sync.nim | 226 ++++++++++++++++--
 .../sync/snap/worker/play/play_prep_full.nim | 92 -------
 .../sync/snap/worker/play/play_snap_sync.nim | 191 ++++++++++++---
 nimbus/sync/snap/worker/ticker.nim | 126 +++++-----
 nimbus/sync/snap/worker_desc.nim | 20 +-
 nimbus/sync/sync_desc.nim | 3 +-
 nimbus/sync/sync_sched.nim | 42 +++-
 nimbus/sync/types.nim | 11 +-
 tests/test_sync_snap.nim | 5 +-
 tests/test_sync_snap/test_calc.nim | 38 ++-
 36 files changed, 927 insertions(+), 405 deletions(-)
 delete mode 100644 nimbus/sync/snap/worker/play/play_prep_full.nim

diff --git a/nimbus/common/common.nim b/nimbus/common/common.nim
index 85194b59e..f55d27c3b 100644
--- a/nimbus/common/common.nim
+++ b/nimbus/common/common.nim
@@ -72,10 +72,15 @@ type
syncReqNewHead: SyncReqNewHeadCB - pow: PowRef ##\ + startOfHistory: Hash256 + ## This setting is needed for resuming blockwise syncying after + ## installing a snapshot pivot. The default value for this field is + ## `GENESIS_PARENT_HASH` to start at the very beginning. + + pow: PowRef ## Wrapper around `hashimotoLight()` and lookup cache - poa: Clique ##\ + poa: Clique ## For non-PoA networks this descriptor is ignored. pos: CasperRef @@ -154,6 +159,9 @@ proc init(com : CommonRef, com.pow = PowRef.new com.pos = CasperRef.new + # By default, history begins at genesis. + com.startOfHistory = GENESIS_PARENT_HASH + proc getTd(com: CommonRef, blockHash: Hash256): Option[DifficultyInt] = var td: DifficultyInt if not com.db.getTd(blockHash, td): @@ -348,15 +356,20 @@ proc syncReqNewHead*(com: CommonRef; header: BlockHeader) # ------------------------------------------------------------------------------ # Getters # ------------------------------------------------------------------------------ -proc poa*(com: CommonRef): Clique = + +func startOfHistory*(com: CommonRef): Hash256 = + ## Getter + com.startOfHistory + +func poa*(com: CommonRef): Clique = ## Getter com.poa -proc pow*(com: CommonRef): PowRef = +func pow*(com: CommonRef): PowRef = ## Getter com.pow -proc pos*(com: CommonRef): CasperRef = +func pos*(com: CommonRef): CasperRef = ## Getter com.pos @@ -441,6 +454,10 @@ proc `syncCurrent=`*(com: CommonRef, number: BlockNumber) = proc `syncHighest=`*(com: CommonRef, number: BlockNumber) = com.syncProgress.highest = number +proc `startOfHistory=`*(com: CommonRef, val: Hash256) = + ## Setter + com.startOfHistory = val + proc setTTD*(com: CommonRef, ttd: Option[DifficultyInt]) = ## useful for testing com.config.terminalTotalDifficulty = ttd diff --git a/nimbus/core/chain/persist_blocks.nim b/nimbus/core/chain/persist_blocks.nim index 514dfa0a6..a25a6d6cc 100644 --- a/nimbus/core/chain/persist_blocks.nim +++ b/nimbus/core/chain/persist_blocks.nim @@ -116,7 +116,8 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader]; return ValidationResult.Error if NoPersistHeader notin flags: - discard c.db.persistHeaderToDb(header, c.com.consensus == ConsensusType.POS) + discard c.db.persistHeaderToDb( + header, c.com.consensus == ConsensusType.POS, c.com.startOfHistory) if NoSaveTxs notin flags: discard c.db.persistTransactions(header.blockNumber, body.transactions) @@ -141,7 +142,7 @@ proc insertBlockWithoutSetHead*(c: ChainRef, header: BlockHeader, result = c.persistBlocksImpl( [header], [body], {NoPersistHeader, NoSaveReceipts}) if result == ValidationResult.OK: - c.db.persistHeaderToDbWithoutSetHead(header) + c.db.persistHeaderToDbWithoutSetHead(header, c.com.startOfHistory) proc setCanonical*(c: ChainRef, header: BlockHeader): ValidationResult {.gcsafe, raises: [CatchableError].} = diff --git a/nimbus/db/db_chain.nim b/nimbus/db/db_chain.nim index bbb620d96..f954e3350 100644 --- a/nimbus/db/db_chain.nim +++ b/nimbus/db/db_chain.nim @@ -407,16 +407,20 @@ proc getReceipts*(db: ChainDBRef; receiptRoot: Hash256): seq[Receipt] = receipts.add(r) return receipts -proc persistHeaderToDb*(db: ChainDBRef; header: BlockHeader, - forceCanonical: bool): seq[BlockHeader] = - let isGenesis = header.parentHash == GENESIS_PARENT_HASH +proc persistHeaderToDb*( + db: ChainDBRef; + header: BlockHeader; + forceCanonical: bool; + startOfHistory = GENESIS_PARENT_HASH; + ): seq[BlockHeader] = + let isStartOfHistory = header.parentHash == startOfHistory let headerHash = header.blockHash - if not isGenesis and 
not db.headerExists(header.parentHash): + if not isStartOfHistory and not db.headerExists(header.parentHash): raise newException(ParentNotFound, "Cannot persist block header " & $headerHash & " with unknown parent " & $header.parentHash) db.db.put(genericHashKey(headerHash).toOpenArray, rlp.encode(header)) - let score = if isGenesis: header.difficulty + let score = if isStartOfHistory: header.difficulty else: db.getScore(header.parentHash) + header.difficulty db.db.put(blockHashToScoreKey(headerHash).toOpenArray, rlp.encode(score)) @@ -431,10 +435,14 @@ proc persistHeaderToDb*(db: ChainDBRef; header: BlockHeader, if score > headScore or forceCanonical: return db.setAsCanonicalChainHead(headerHash) -proc persistHeaderToDbWithoutSetHead*(db: ChainDBRef; header: BlockHeader) = - let isGenesis = header.parentHash == GENESIS_PARENT_HASH +proc persistHeaderToDbWithoutSetHead*( + db: ChainDBRef; + header: BlockHeader; + startOfHistory = GENESIS_PARENT_HASH; + ) = + let isStartOfHistory = header.parentHash == startOfHistory let headerHash = header.blockHash - let score = if isGenesis: header.difficulty + let score = if isStartOfHistory: header.difficulty else: db.getScore(header.parentHash) + header.difficulty db.db.put(blockHashToScoreKey(headerHash).toOpenArray, rlp.encode(score)) diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim index b09006a9c..0fdfbe01d 100644 --- a/nimbus/sync/full.nim +++ b/nimbus/sync/full.nim @@ -11,7 +11,7 @@ {.push raises: [].} import - eth/[common, p2p], + eth/p2p, chronicles, chronos, stew/[interval_set, sorted_set], @@ -89,9 +89,9 @@ proc runStop(buddy: FullBuddyRef) = tracerFrameBuddy("runStop", buddy): worker.stop(buddy) -proc runPool(buddy: FullBuddyRef; last: bool): bool = +proc runPool(buddy: FullBuddyRef; last: bool; laps: int): bool = tracerFrameBuddy("runPool", buddy): - result = worker.runPool(buddy, last) + result = worker.runPool(buddy, last, laps) proc runSingle(buddy: FullBuddyRef) {.async.} = tracerFrameBuddy("runSingle", buddy): diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index 3515cfb57..ed22cda1a 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -11,10 +11,9 @@ {.push raises:[].} import - std/[options], chronicles, chronos, - eth/[common, p2p], + eth/p2p, ".."/[protocol, sync_desc], ../misc/[best_pivot, block_queue, sync_ctrl], "."/[ticker, worker_desc] @@ -367,16 +366,17 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = await sleepAsync napping -proc runPool*(buddy: FullBuddyRef; last: bool): bool = +proc runPool*(buddy: FullBuddyRef; last: bool; laps: int): bool = ## Once started, the function `runPool()` is called for all worker peers in ## sequence as the body of an iteration as long as the function returns ## `false`. There will be no other worker peer functions activated ## simultaneously. ## ## This procedure is started if the global flag `buddy.ctx.poolMode` is set - ## `true` (default is `false`.) It is the responsibility of the `runPool()` - ## instance to reset the flag `buddy.ctx.poolMode`, typically at the first - ## peer instance. + ## `true` (default is `false`.) It will be automatically reset before the + ## loop starts. Re-setting it again results in repeating the loop. The + ## argument `laps` (starting with `0`) indicates the current lap of the + ## repeated loops. ## ## The argument `last` is set `true` if the last entry is reached.
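An aside on the new `runPool()` contract described in this doc comment: the scheduler clears `poolMode`, runs the handler over all active peers, and repeats the whole lap if some handler raises the flag again. The standalone sketch below illustrates that loop shape only; the `Ctx`/`Peer` types and the handler body are illustrative stand-ins, not the actual `sync_sched.nim` code.

```nim
# Illustrative stand-ins only -- not the nimbus scheduler implementation.
type
  Ctx = ref object
    poolMode: bool                     # global flag, as in `CtxRef`
  Peer = object
    name: string

proc runPool(ctx: Ctx; peer: Peer; last: bool; laps: int): bool =
  ## Returning `true` ends the current lap early (as the full sync worker does).
  echo "pool run: ", peer.name, " last=", last, " laps=", laps
  if laps == 0 and last:
    ctx.poolMode = true                # request one more lap, e.g. a soft restart
  false                                # otherwise keep visiting the remaining peers

proc schedulePoolMode(ctx: Ctx; peers: seq[Peer]) =
  ## The scheduler resets `poolMode` before each lap; a handler that sets it
  ## again causes the whole lap over all peers to be repeated.
  var laps = 0
  while ctx.poolMode:
    ctx.poolMode = false
    for n, peer in peers:
      if runPool(ctx, peer, last = (n == peers.high), laps = laps):
        break
    laps.inc

when isMainModule:
  schedulePoolMode(Ctx(poolMode: true), @[Peer(name: "a"), Peer(name: "b")])
```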
## @@ -384,10 +384,7 @@ proc runPool*(buddy: FullBuddyRef; last: bool): bool = ## # Mind the gap, fill in if necessary (function is peer independent) buddy.only.bQueue.blockQueueGrout() - - # Stop after running once regardless of peer - buddy.ctx.poolMode = false - true + true # Stop after running once regardless of peer proc runMulti*(buddy: FullBuddyRef) {.async.} = ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim index 57542e779..e38990af1 100644 --- a/nimbus/sync/misc/best_pivot.nim +++ b/nimbus/sync/misc/best_pivot.nim @@ -380,9 +380,8 @@ proc pivotNegotiate*( if rx.isOk: bp.global.trusted.incl peer when extraTraceMessages: - let bestHeader {.used.} = - if bp.header.isSome: "#" & $bp.header.get.blockNumber - else: "nil" + let bestHeader {.used.} = if bp.header.isNone: "n/a" + else: bp.header.unsafeGet.blockNumber.toStr trace "Accepting peer", peer, trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len, runState=bp.ctrl.state, bestHeader @@ -397,9 +396,8 @@ proc pivotNegotiate*( if bp.global.trusted.len == 0: bp.global.trusted.incl peer when extraTraceMessages: - let bestHeader {.used.} = - if bp.header.isSome: "#" & $bp.header.get.blockNumber - else: "nil" + let bestHeader {.used.} = if bp.header.isNone: "n/a" + else: bp.header.unsafeGet.blockNumber.toStr trace "Assume initial trusted peer", peer, trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader return false @@ -465,9 +463,8 @@ proc pivotNegotiate*( # Evaluate status, finally if bp.global.minPeers <= bp.global.trusted.len: when extraTraceMessages: - let bestHeader {.used.} = - if bp.header.isSome: "#" & $bp.header.get.blockNumber - else: "nil" + let bestHeader {.used.} = if bp.header.isNone: "n/a" + else: bp.header.unsafeGet.blockNumber.toStr trace "Peer trusted now", peer, trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader return true diff --git a/nimbus/sync/misc/block_queue.nim b/nimbus/sync/misc/block_queue.nim index dd8fd80af..9c652a084 100644 --- a/nimbus/sync/misc/block_queue.nim +++ b/nimbus/sync/misc/block_queue.nim @@ -55,19 +55,18 @@ ## would have no effect. In that case, the record with the largest block ## numbers are deleted from the `` list. ## +{.push raises:[].} import std/[algorithm, options, sequtils, strutils], chronicles, chronos, - eth/[common, p2p], + eth/p2p, stew/[byteutils, interval_set, sorted_set], ../../db/db_chain, ../../utils/utils, ".."/[protocol, sync_desc, types] -{.push raises:[].} - logScope: topics = "block-queue" @@ -134,7 +133,10 @@ type nStagedQueue*: int reOrg*: bool -let +const + extraTraceMessages = false or true + ## Enabled additional logging noise + highBlockNumber = high(BlockNumber) highBlockRange = BlockRange.new(highBlockNumber,highBlockNumber) @@ -163,23 +165,19 @@ proc reduce(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 = # --------------- -proc pp(n: BlockNumber): string = - ## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`) - if n == highBlockNumber: "high" else:"#" & $n - proc `$`(iv: BlockRange): string = ## Needed for macro generated DSL files like `snap.nim` because the ## `distinct` flavour of `NodeTag` is discarded there. 
- result = "[" & iv.minPt.pp + result = "[" & iv.minPt.toStr if iv.minPt != iv.maxPt: - result &= "," & iv.maxPt.pp + result &= "," & iv.maxPt.toStr result &= "]" proc `$`(n: Option[BlockRange]): string = if n.isNone: "n/a" else: $n.get proc `$`(n: Option[BlockNumber]): string = - if n.isNone: "n/a" else: n.get.pp + n.toStr proc `$`(brs: BlockRangeSetRef): string = "{" & toSeq(brs.increasing).mapIt($it).join(",") & "}" @@ -227,6 +225,8 @@ proc newWorkItem(qd: BlockQueueWorkerRef): Result[BlockItemRef,BlockQueueRC] = # Check whether there is somthing to do at all if qd.bestNumber.isNone or qd.bestNumber.unsafeGet < rc.value.minPt: + when extraTraceMessages: + trace "no new work item", bestNumer=qd.bestNumber.toStr, range=rc.value return err(NoMorePeerBlocks) # no more data for this peer # Compute interval diff --git a/nimbus/sync/protocol/eth/eth_types.nim b/nimbus/sync/protocol/eth/eth_types.nim index 89e7aba02..d1c039220 100644 --- a/nimbus/sync/protocol/eth/eth_types.nim +++ b/nimbus/sync/protocol/eth/eth_types.nim @@ -46,6 +46,16 @@ const maxReceiptsFetch* = 256 maxHeadersFetch* = 192 + +# Kludge, should be fixed in `eth/common/eth_types_rlp.append()` +proc ethAppend*(w: var RlpWriter, b: BlockBody) = + w.startList 2 + b.withdrawals.isSome.ord # <--- this line was missing + w.append(b.transactions) + w.append(b.uncles) + if b.withdrawals.isSome: + w.append(b.withdrawals.unsafeGet) + + proc notImplemented(name: string) = debug "Method not implemented", meth = name diff --git a/nimbus/sync/protocol/eth66.nim b/nimbus/sync/protocol/eth66.nim index e658f3a5b..8e5b3f583 100644 --- a/nimbus/sync/protocol/eth66.nim +++ b/nimbus/sync/protocol/eth66.nim @@ -76,6 +76,10 @@ const trEthSendNewBlockHashes* = ">> " & prettyEthProtoName & " Sending NewBlockHashes" +# Kludge, should be fixed in `eth/common/eth_types_rlp` +proc append(w: var RlpWriter, b: BlockBody) = + w.ethAppend b + p2pProtocol eth66(version = ethVersion, rlpxName = "eth", peerState = EthPeerState, diff --git a/nimbus/sync/protocol/eth67.nim b/nimbus/sync/protocol/eth67.nim index 101f42dd3..3d78d1968 100644 --- a/nimbus/sync/protocol/eth67.nim +++ b/nimbus/sync/protocol/eth67.nim @@ -76,6 +76,10 @@ const trEthSendNewBlockHashes* = ">> " & prettyEthProtoName & " Sending NewBlockHashes" +# Kludge, should be fixed in `eth/common/eth_types_rlp` +proc append(w: var RlpWriter, b: BlockBody) = + w.ethAppend b + p2pProtocol eth67(version = ethVersion, rlpxName = "eth", peerState = EthPeerState, diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index bc24f7054..27ef6e7ce 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -11,9 +11,9 @@ {.push raises: [].} import - eth/[common, p2p], chronicles, chronos, + eth/p2p, ../db/select_backend, ../core/chain, ./snap/[worker, worker_desc], @@ -90,9 +90,9 @@ proc runStop(buddy: SnapBuddyRef) = tracerFrameBuddy("runStop", buddy): worker.stop(buddy) -proc runPool(buddy: SnapBuddyRef; last: bool): bool = +proc runPool(buddy: SnapBuddyRef; last: bool; laps: int): bool = tracerFrameBuddy("runPool", buddy): - result = worker.runPool(buddy, last) + result = worker.runPool(buddy, last=last, laps=laps) proc runSingle(buddy: SnapBuddyRef) {.async.} = tracerFrameBuddy("runSingle", buddy): diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 7bb36b3ac..a357dc5e6 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -16,10 +16,10 @@ import eth/p2p, stew/[interval_set, keyed_queue], "../.."/[common, db/select_backend], - ".."/[handlers/eth, protocol, 
sync_desc], - ./worker/[pivot, play, ticker], + ../sync_desc, + ./worker/[play, ticker], ./worker/com/com_error, - ./worker/db/[snapdb_desc, snapdb_pivot], + ./worker/db/snapdb_desc, "."/[range_desc, worker_desc] logScope: @@ -39,56 +39,20 @@ template ignoreException(info: static[string]; code: untyped) = # Private functions # ------------------------------------------------------------------------------ -proc disableWireServices(ctx: SnapCtxRef) = - ## Helper for `setup()`: Temporarily stop useless wire protocol services. - ctx.ethWireCtx.txPoolEnabled = false - -proc enableWireServices(ctx: SnapCtxRef) = - ## Helper for `release()` - ctx.ethWireCtx.txPoolEnabled = true - -# -------------- - -proc enableTicker(ctx: SnapCtxRef; tickerOK: bool) = - ## Helper for `setup()`: Log/status ticker +proc setupTicker(ctx: SnapCtxRef; tickerOK: bool) = + let blindTicker = proc: TickerSnapStats = + discard if tickerOK: - ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx)) - else: - trace "Ticker is disabled" + ctx.pool.ticker = TickerRef.init(blindTicker) -proc disableTicker(ctx: SnapCtxRef) = +proc releaseTicker(ctx: SnapCtxRef) = ## Helper for `release()` - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.stop() - ctx.pool.ticker = nil + ctx.pool.ticker.stop() + ctx.pool.ticker = nil # -------------- -proc enableRpcMagic(ctx: SnapCtxRef) = - ## Helper for `setup()`: Enable external pivot update via RPC - ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB - -proc disableRpcMagic(ctx: SnapCtxRef) = - ## Helper for `release()` - ctx.chain.com.syncReqNewHead = nil - -# -------------- - -proc detectSnapSyncRecovery(ctx: SnapCtxRef) = - ## Helper for `setup()`: Initiate snap sync recovery (if any) - let rc = ctx.pool.snapDb.pivotRecoverDB() - if rc.isOk: - ctx.pool.recovery = SnapRecoveryRef(state: rc.value) - ctx.daemon = true - - # Set up early initial pivot - ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) - trace "Snap sync recovery started", - checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)") - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.startRecovery() - -proc initSnapDb(ctx: SnapCtxRef) = +proc setupSnapDb(ctx: SnapCtxRef) = ## Helper for `setup()`: Initialise snap sync database layer ctx.pool.snapDb = if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) @@ -100,16 +64,12 @@ proc initSnapDb(ctx: SnapCtxRef) = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up + ctx.playSetup() # Set up sync sub-mode specs. + ctx.setupSnapDb() # Set database backend, subject to change + ctx.setupTicker(tickerOK) # Start log/status ticker (if any) - # For snap sync book keeping - ctx.pool.coveredAccounts = NodeTagRangeSet.init() - - ctx.enableRpcMagic() # Allow external pivot update via RPC - ctx.disableWireServices() # Stop unwanted public services - ctx.pool.syncMode.playInit() # Set up sync sub-mode specs. 
- ctx.initSnapDb() # Set database backend, subject to change - ctx.detectSnapSyncRecovery() # Check for recovery mode - ctx.enableTicker(tickerOK) # Start log/status ticker (if any) + ignoreException("setup"): + ctx.playMethod.setup(ctx) # Experimental, also used for debugging if ctx.exCtrlFile.isSome: @@ -119,29 +79,26 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = proc release*(ctx: SnapCtxRef) = ## Global clean up - ctx.disableTicker() # Stop log/status ticker (if any) - ctx.enableWireServices() # re-enable public services - ctx.disableRpcMagic() # Disable external pivot update via RPC + ignoreException("release"): + ctx.playMethod.release(ctx) + + ctx.releaseTicker() # Stop log/status ticker (if any) + ctx.playRelease() # Shut down sync methods proc start*(buddy: SnapBuddyRef): bool = ## Initialise worker peer - let - ctx = buddy.ctx - peer = buddy.peer - if peer.supports(protocol.snap) and - peer.supports(protocol.eth) and - peer.state(protocol.eth).initialized: - buddy.only.errors = ComErrorStatsRef() - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.startBuddy() - return true + let ctx = buddy.ctx + ignoreException("start"): + if ctx.playMethod.start(buddy): + buddy.only.errors = ComErrorStatsRef() + return true proc stop*(buddy: SnapBuddyRef) = ## Clean up this peer let ctx = buddy.ctx - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.stopBuddy() + ignoreException("stop"): + ctx.playMethod.stop(buddy) # ------------------------------------------------------------------------------ # Public functions, sync handler multiplexers @@ -150,22 +107,22 @@ proc stop*(buddy: SnapBuddyRef) = proc runDaemon*(ctx: SnapCtxRef) {.async.} = ## Sync processsing multiplexer ignoreException("runDaemon"): - await ctx.playSyncSpecs.daemon(ctx) + await ctx.playMethod.daemon(ctx) proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## Sync processsing multiplexer ignoreException("runSingle"): - await buddy.ctx.playSyncSpecs.single(buddy) + await buddy.ctx.playMethod.single(buddy) -proc runPool*(buddy: SnapBuddyRef, last: bool): bool = +proc runPool*(buddy: SnapBuddyRef, last: bool; laps: int): bool = ## Sync processsing multiplexer ignoreException("runPool"): - result = buddy.ctx.playSyncSpecs.pool(buddy,last) + result = buddy.ctx.playMethod.pool(buddy,last,laps) proc runMulti*(buddy: SnapBuddyRef) {.async.} = ## Sync processsing multiplexer ignoreException("runMulti"): - await buddy.ctx.playSyncSpecs.multi(buddy) + await buddy.ctx.playMethod.multi(buddy) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/com/get_block_header.nim b/nimbus/sync/snap/worker/com/get_block_header.nim index ea83ee77f..54c0e9edd 100644 --- a/nimbus/sync/snap/worker/com/get_block_header.nim +++ b/nimbus/sync/snap/worker/com/get_block_header.nim @@ -11,7 +11,6 @@ {.push raises: [].} import - std/options, chronos, eth/[common, p2p], stew/byteutils, @@ -43,7 +42,7 @@ proc getBlockHeader*( skip: 0, reverse: false) - trace trEthSendSendingGetBlockHeaders, peer, header=("#" & $num), reqLen + trace trEthSendSendingGetBlockHeaders, peer, header=num.toStr, reqLen var hdrResp: Option[blockHeadersObj] try: diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index d234a7f60..a33fa44a5 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -12,7 +12,7 @@ {.push raises: [].} import - std/[options, sequtils], + std/sequtils, chronos, 
eth/[common, p2p], stew/interval_set, diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim index 46e5b2147..c7538e15a 100644 --- a/nimbus/sync/snap/worker/com/get_trie_nodes.nim +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -11,7 +11,7 @@ {.push raises: [].} import - std/[options, sequtils], + std/sequtils, chronos, eth/[common, p2p], "../../.."/[protocol, protocol/trace_config], diff --git a/nimbus/sync/snap/worker/db/hexary_range.nim b/nimbus/sync/snap/worker/db/hexary_range.nim index 42d094d28..f6a880966 100644 --- a/nimbus/sync/snap/worker/db/hexary_range.nim +++ b/nimbus/sync/snap/worker/db/hexary_range.nim @@ -135,7 +135,7 @@ template collectLeafs( # The following logic might be sub-optimal. A strict version of the # `next()` function that stops with an error at dangling links could # be faster if the leaf nodes are not too far apart on the hexary trie. - var + let xPath = block: let rx = nodeTag.hexaryPath(rootKey,db).hexaryNearbyRight(db) if rx.isErr: diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index b61cd633b..0f6e10710 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -155,11 +155,26 @@ proc kvDb*(pv: SnapDbRef): TrieDatabaseRef = # Public functions, select sub-tables for persistent storage # ------------------------------------------------------------------------------ -proc toAccountsKey*(a: NodeKey): ByteArray33 = - a.ByteArray32.snapSyncAccountKey.data +proc toBlockHeaderKey*(a: Hash256): ByteArray33 = + a.genericHashKey.data -proc toStorageSlotsKey*(a: NodeKey): ByteArray33 = - a.ByteArray32.snapSyncStorageSlotKey.data +proc toBlockNumberKey*(a: BlockNumber): ByteArray33 = + static: + doAssert 32 == sizeof BlockNumber # needed in `blockNumberToHashKey()` + a.blockNumberToHashKey.data + +when false: + proc toAccountsKey*(a: NodeKey): ByteArray33 = + a.ByteArray32.snapSyncAccountKey.data + + proc toStorageSlotsKey*(a: NodeKey): ByteArray33 = + a.ByteArray32.snapSyncStorageSlotKey.data +else: + proc toAccountsKey*(a: NodeKey): ByteArray32 = + a.ByteArray32 + + proc toStorageSlotsKey*(a: NodeKey): ByteArray32 = + a.ByteArray32 proc toStateRootKey*(a: NodeKey): ByteArray33 = a.ByteArray32.snapSyncStateRootKey.data diff --git a/nimbus/sync/snap/worker/db/snapdb_persistent.nim b/nimbus/sync/snap/worker/db/snapdb_persistent.nim index e3296949a..308ac53cf 100644 --- a/nimbus/sync/snap/worker/db/snapdb_persistent.nim +++ b/nimbus/sync/snap/worker/db/snapdb_persistent.nim @@ -13,7 +13,7 @@ import std/[algorithm, tables], chronicles, - eth/[common, rlp, trie/db], + eth/[common, trie/db], ../../../../db/kvstore_rocksdb, ../../range_desc, "."/[hexary_desc, hexary_error, rocky_bulk_load, snapdb_desc] @@ -63,10 +63,10 @@ proc convertTo(key: RepairKey; T: type NodeTag): T = ## Might be lossy, check before use UInt256.fromBytesBE(key.ByteArray33[1 .. 
32]).T -proc toAccountsKey(a: RepairKey): ByteArray33 = +proc toAccountsKey(a: RepairKey): auto = a.convertTo(NodeKey).toAccountsKey -proc toStorageSlotsKey(a: RepairKey): ByteArray33 = +proc toStorageSlotsKey(a: RepairKey): auto = a.convertTo(NodeKey).toStorageSlotsKey proc stateRootGet*(db: TrieDatabaseRef; nodeKey: Nodekey): Blob = @@ -107,9 +107,21 @@ proc persistentStateRootGet*( # Public functions: store/put # ------------------------------------------------------------------------------ +proc persistentBlockHeaderPut*( + db: TrieDatabaseRef; + hdr: BlockHeader; + ) = + ## Store a single header. This function is intended to finalise snap sync + ## with storing a universal pivot header not unlike genesis. + let hashKey = hdr.blockHash + db.TrieDatabaseRef.put( # see `nimbus/db/db_chain.db()` + hashKey.toBlockHeaderKey.toOpenArray, rlp.encode(hdr)) + db.TrieDatabaseRef.put( + hdr.blockNumber.toBlockNumberKey.toOpenArray, rlp.encode(hashKey)) + proc persistentAccountsPut*( db: HexaryTreeDbRef; - base: TrieDatabaseRef + base: TrieDatabaseRef; ): Result[void,HexaryError] = ## Bulk store using transactional `put()` let dbTx = base.beginTransaction @@ -125,7 +137,7 @@ proc persistentAccountsPut*( proc persistentStorageSlotsPut*( db: HexaryTreeDbRef; - base: TrieDatabaseRef + base: TrieDatabaseRef; ): Result[void,HexaryError] = ## Bulk store using transactional `put()` let dbTx = base.beginTransaction diff --git a/nimbus/sync/snap/worker/pivot.nim b/nimbus/sync/snap/worker/pivot.nim index 81d97a3b5..ae6732fa7 100644 --- a/nimbus/sync/snap/worker/pivot.nim +++ b/nimbus/sync/snap/worker/pivot.nim @@ -14,9 +14,9 @@ import std/[math, sets, sequtils], chronicles, chronos, - eth/[common, p2p, trie/trie_defs], + eth/[p2p, trie/trie_defs], stew/[interval_set, keyed_queue, sorted_set], - ../../sync_desc, + "../.."/[sync_desc, types], ".."/[constants, range_desc, worker_desc], ./db/[hexary_error, snapdb_accounts, snapdb_pivot], ./pivot/[heal_accounts, heal_storage_slots, @@ -33,6 +33,20 @@ const proc pivotMothball*(env: SnapPivotRef) {.gcsafe.} + +# ------------------------------------------------------------------------------ +# Private helpers, logging +# ------------------------------------------------------------------------------ + +template logTxt(info: static[string]): static[string] = + "Pivot " & info + +template ignExceptionOops(info: static[string]; code: untyped) = + try: + code + except CatchableError as e: + trace logTxt "Ooops", `info`=info, name=($e.name), msg=(e.msg) + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -186,6 +200,12 @@ proc tickerStats*( # Public functions: particular pivot # ------------------------------------------------------------------------------ +proc pivotCompleteOk*(env: SnapPivotRef): bool = + ## Returns `true` iff the pivot covers a complete set of accounts ans + ## storage slots. + env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0 + + proc pivotMothball*(env: SnapPivotRef) = ## Clean up most of this argument `env` pivot record and mark it `archived`. 
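The `persistentBlockHeaderPut()` helper added in the `snapdb_persistent` hunk above stores the pivot header under two keys, header hash and block number, so a later full sync can look it up by either, much like a genesis record. A minimal standalone sketch of that double-key idea, using an in-memory table and simplified stand-in types rather than the real `TrieDatabaseRef` and RLP encoding:

```nim
import std/tables

type
  MiniHeader = object
    number: uint64
    payload: string                     # stand-in for the RLP-encoded header

  KvStore = TableRef[string, string]    # stand-in for `TrieDatabaseRef`

proc blockHash(hdr: MiniHeader): string =
  "hash-of-" & $hdr.number              # placeholder for the real block hash

proc putPivotHeader(db: KvStore; hdr: MiniHeader) =
  ## Mirror of the two `put()` calls in `persistentBlockHeaderPut()`:
  ## header hash -> header, and block number -> header hash.
  let hashKey = hdr.blockHash
  db["hdr:" & hashKey] = hdr.payload
  db["num:" & $hdr.number] = hashKey

when isMainModule:
  let db = newTable[string, string]()
  db.putPivotHeader MiniHeader(number: 1234, payload: "rlp-bytes")
  # Look up by number first, then by hash -- as a start-of-history lookup would.
  echo db["hdr:" & db["num:1234"]]
```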
## Note that archived pivots will be checked for swapping in already known @@ -218,6 +238,9 @@ proc execSnapSyncAction*( let ctx = buddy.ctx + if env.savedFullPivotOk: + return # no need to do anything + block: # Clean up storage slots queue first it it becomes too large let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len @@ -274,6 +297,9 @@ proc saveCheckpoint*( ## Save current sync admin data. On success, the size of the data record ## saved is returned (e.g. for logging.) ## + if env.savedFullPivotOk: + return ok(0) # no need to do anything + let fa = env.fetchAccounts nStoQu = env.storageQueueTotal() @@ -287,7 +313,7 @@ proc saveCheckpoint*( if accountsSaveStorageSlotsMax < nStoQu: return err(TooManySlotAccounts) - ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry( + result = ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry( header: env.stateHeader, nAccounts: env.nAccounts, nSlotLists: env.nSlotLists, @@ -297,6 +323,9 @@ proc saveCheckpoint*( toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)) & toSeq(env.parkedStorage.items)) + if result.isOk and env.pivotCompleteOk(): + env.savedFullPivotOk = true + proc pivotRecoverFromCheckpoint*( env: SnapPivotRef; # Current pivot environment @@ -339,7 +368,15 @@ proc pivotRecoverFromCheckpoint*( env.storageQueueAppendFull(rc.value.storageRoot, w) # Handle mothballed pivots for swapping in (see `pivotMothball()`) - if not topLevel: + if topLevel: + env.savedFullPivotOk = env.pivotCompleteOk() + when extraTraceMessages: + trace logTxt "recovered top level record", + pivot=env.stateHeader.blockNumber.toStr, + savedFullPivotOk=env.savedFullPivotOk, + processed=env.fetchAccounts.processed.fullPC3, + nStoQuTotal=env.storageQueueTotal() + else: for kvp in env.fetchStorageFull.nextPairs: let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag)) if rc.isOk: @@ -355,7 +392,6 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} = ## it will not proceed to the next scheduler task. 
let ctx = buddy.ctx - peer {.used.} = buddy.peer beaconHeader = ctx.pool.beaconHeader var pivotHeader: BlockHeader @@ -375,9 +411,9 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} = ctx.poolMode = true when extraTraceMessages: - trace "New pivot from beacon chain", peer, - pivot=("#" & $pivotHeader.blockNumber), - beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode + trace logTxt "new pivot from beacon chain", peer=buddy.peer, + pivot=pivotHeader.blockNumber.toStr, + beacon=beaconHeader.blockNumber.toStr, poolMode=ctx.poolMode discard ctx.pool.pivotTable.lruAppend( beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader), @@ -396,9 +432,151 @@ proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB = result = proc(h: BlockHeader) {.gcsafe.} = if ctx.pool.beaconHeader.blockNumber < h.blockNumber: # when extraTraceMessages: - # trace "External beacon info update", header=("#" & $h.blockNumber) + # trace logTxt "external beacon info update", header=h.blockNumber.toStr ctx.pool.beaconHeader = h +# ------------------------------------------------------------------------------ +# Public function, debugging +# ------------------------------------------------------------------------------ + +import + db/[hexary_desc, hexary_inspect, hexary_nearby, hexary_paths, + snapdb_storage_slots] + +const + pivotVerifyExtraBlurb = false # or true + inspectSuspendAfter = 10_000 + inspectExtraNap = 100.milliseconds + +proc pivotVerifyComplete*( + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context + inspectAccountsTrie = false; # Check for dangling links + walkAccountsDB = true; # Walk accounts db + inspectSlotsTries = true; # Check dangling links (if `walkAccountsDB`) + ): Future[bool] + {.async,discardable.} = + ## Check in the database whether the pivot is complete -- not advised on a + ## production system, as the process takes a lot of resources. + let + rootKey = env.stateHeader.stateRoot.to(NodeKey) + accFn = ctx.pool.snapDb.getAccountFn + + # Verify consistency of accounts trie database. This should not be needed + # if `walkAccountsDB` is set. In case there is a dangling link that would + # have been detected by `hexaryInspectTrie()`, the `hexaryNearbyRight()` + # function should fail at that point as well.
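The inspection loops that follow in `pivotVerifyComplete()` run `hexaryInspectTrie()` in bounded slices (`suspendAfter`), carry a resume context between slices, and nap briefly so the async scheduler stays responsive while the check proceeds. A simplified, self-contained sketch of that resumable-scan pattern; the data, types and counter below are stand-ins, only the loop shape matches the patch:

```nim
import chronos

type
  ScanCtx = ref object
    nextIndex: int                      # stand-in for `resumeCtx`

proc scanSlice(data: seq[int]; ctx: ScanCtx; suspendAfter: int): int =
  ## Visit at most `suspendAfter` items and remember where to resume.
  while ctx.nextIndex < data.len and result < suspendAfter:
    # ... a real scan would check one trie node here ...
    ctx.nextIndex.inc
    result.inc

proc scanAll(data: seq[int]) {.async.} =
  ## Same shape as the verification loops: bounded slices with a short nap
  ## in between, so other sync tasks can run while the check is going on.
  let ctx = ScanCtx()
  while ctx.nextIndex < data.len:
    discard data.scanSlice(ctx, suspendAfter = 10_000)
    await sleepAsync 10.milliseconds

when isMainModule:
  waitFor scanAll(newSeq[int](25_000))
```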
+ if inspectAccountsTrie: + var + stats = accFn.hexaryInspectTrie(rootKey, + suspendAfter=inspectSuspendAfter, + maxDangling=1) + nVisited = stats.count + nRetryCount = 0 + while stats.dangling.len == 0 and not stats.resumeCtx.isNil: + when pivotVerifyExtraBlurb: + trace logTxt "accounts db inspect ..", nVisited, nRetryCount + await sleepAsync inspectExtraNap + nRetryCount.inc + stats = accFn.hexaryInspectTrie(rootKey, + resumeCtx=stats.resumeCtx, + suspendAfter=inspectSuspendAfter, + maxDangling=1) + nVisited += stats.count + # End while + + if stats.dangling.len != 0: + error logTxt "accounts trie has danglig links", nVisited, nRetryCount + return false + trace logTxt "accounts trie ok", nVisited, nRetryCount + # End `if inspectAccountsTrie` + + # Visit accounts and make sense of storage slots + if walkAccountsDB: + var + nAccounts = 0 + nStorages = 0 + nRetryTotal = 0 + nodeTag = low(NodeTag) + while true: + if (nAccounts mod inspectSuspendAfter) == 0 and 0 < nAccounts: + when pivotVerifyExtraBlurb: + trace logTxt "accounts db walk ..", + nAccounts, nStorages, nRetryTotal, inspectSlotsTries + await sleepAsync inspectExtraNap + + # Find next account key => `nodeTag` + let rc = nodeTag.hexaryPath(rootKey,accFn).hexaryNearbyRight(accFn) + if rc.isErr: + if rc.error == NearbyBeyondRange: + break # No more accounts + error logTxt "accounts db problem", nodeTag, + nAccounts, nStorages, nRetryTotal, inspectSlotsTries, + error=rc.error + return false + nodeTag = rc.value.getPartialPath.convertTo(NodeKey).to(NodeTag) + nAccounts.inc + + # Decode accounts data + var accData: Account + try: + accData = rc.value.leafData.decode(Account) + except RlpError as e: + error logTxt "account data problem", nodeTag, + nAccounts, nStorages, nRetryTotal, inspectSlotsTries, + name=($e.name), msg=(e.msg) + return false + + # Check for storage slots for this account + if accData.storageRoot != emptyRlpHash: + nStorages.inc + if inspectSlotsTries: + let + slotFn = ctx.pool.snapDb.getStorageSlotsFn(nodeTag.to(NodeKey)) + stoKey = accData.storageRoot.to(NodeKey) + var + stats = slotFn.hexaryInspectTrie(stoKey, + suspendAfter=inspectSuspendAfter, + maxDangling=1) + nVisited = stats.count + nRetryCount = 0 + while stats.dangling.len == 0 and not stats.resumeCtx.isNil: + when pivotVerifyExtraBlurb: + trace logTxt "storage slots inspect ..", nodeTag, + nAccounts, nStorages, nRetryTotal, inspectSlotsTries, + nVisited, nRetryCount + await sleepAsync inspectExtraNap + nRetryCount.inc + nRetryTotal.inc + stats = accFn.hexaryInspectTrie(stoKey, + resumeCtx=stats.resumeCtx, + suspendAfter=inspectSuspendAfter, + maxDangling=1) + nVisited += stats.count + + if stats.dangling.len != 0: + error logTxt "storage slots trie has dangling link", nodeTag, + nAccounts, nStorages, nRetryTotal, inspectSlotsTries, + nVisited, nRetryCount + return false + if nVisited == 0: + error logTxt "storage slots trie is empty", nodeTag, + nAccounts, nStorages, nRetryTotal, inspectSlotsTries, + nVisited, nRetryCount + return false + + # Set up next node key for looping + if nodeTag == high(NodeTag): + break + nodeTag = nodeTag + 1.u256 + # End while + + trace logTxt "accounts db walk ok", + nAccounts, nStorages, nRetryTotal, inspectSlotsTries + # End `if walkAccountsDB` + + return true + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/pivot/heal_accounts.nim 
b/nimbus/sync/snap/worker/pivot/heal_accounts.nim index d51238c87..350ce94e6 100644 --- a/nimbus/sync/snap/worker/pivot/heal_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/heal_accounts.nim @@ -85,7 +85,7 @@ proc healingCtx( ): string = let ctx = buddy.ctx "{" & - "piv=" & "#" & $env.stateHeader.blockNumber & "," & + "piv=" & env.stateHeader.blockNumber.toStr & "," & "ctl=" & $buddy.ctrl.state & "," & "nAccounts=" & $env.nAccounts & "," & ("covered=" & $env.fetchAccounts.processed & "/" & @@ -156,7 +156,7 @@ proc getNodesFromNetwork( let peer {.used.} = buddy.peer rootHash = env.stateHeader.stateRoot - pivot = "#" & $env.stateHeader.blockNumber # for logging + pivot = env.stateHeader.blockNumber.toStr # for logging in `getTrieNodes()` # Initalise for fetching nodes from the network via `getTrieNodes()` var diff --git a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim index 3f1248783..f9227f52c 100644 --- a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim @@ -86,7 +86,7 @@ proc healingCtx( env: SnapPivotRef; ): string {.used.} = "{" & - "piv=" & "#" & $env.stateHeader.blockNumber & "," & + "piv=" & env.stateHeader.blockNumber.toStr & "," & "ctl=" & $buddy.ctrl.state & "," & "nStoQu=" & $env.storageQueueTotal() & "," & "nQuPart=" & $env.fetchStoragePart.len & "," & @@ -99,7 +99,7 @@ proc healingCtx( env: SnapPivotRef; ): string = "{" & - "piv=" & "#" & $env.stateHeader.blockNumber & "," & + "piv=" & env.stateHeader.blockNumber.toStr & "," & "ctl=" & $buddy.ctrl.state & "," & "processed=" & $kvp.data.slots.processed & "," & "nStoQu=" & $env.storageQueueTotal() & "," & @@ -171,7 +171,7 @@ proc getNodesFromNetwork( peer {.used.} = buddy.peer accPath = kvp.data.accKey.to(Blob) rootHash = env.stateHeader.stateRoot - pivot = "#" & $env.stateHeader.blockNumber # for logging + pivot = env.stateHeader.blockNumber.toStr # for logging in `getTrieNodes()` # Initalise for fetching nodes from the network via `getTrieNodes()` var diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim index f1d30ca0c..3d90fc069 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim @@ -48,9 +48,7 @@ import chronos, eth/[common, p2p], stew/[interval_set, keyed_queue], - stint, - ../../../../utils/prettify, - ../../../sync_desc, + "../../.."/[sync_desc, types], "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_account_range], ../db/[hexary_envelope, snapdb_accounts], @@ -81,7 +79,7 @@ proc fetchCtx( env: SnapPivotRef; ): string {.used.} = "{" & - "piv=" & "#" & $env.stateHeader.blockNumber & "," & + "piv=" & env.stateHeader.blockNumber.toStr & "," & "ctl=" & $buddy.ctrl.state & "," & "nStoQu=" & $env.storageQueueTotal() & "," & "nSlotLists=" & $env.nSlotLists & "}" @@ -129,7 +127,7 @@ proc accountsRangefetchImpl( # Process received accounts and stash storage slots to fetch later let dd = block: let - pivot = "#" & $env.stateHeader.blockNumber + pivot = env.stateHeader.blockNumber.toStr rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: fa.unprocessed.mergeSplit iv # fail => interval back to pool diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim index a4bd28d84..1ad4bdfc9 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim 
+++ b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim @@ -68,10 +68,9 @@ import std/sets, chronicles, chronos, - eth/[common, p2p], + eth/p2p, stew/[interval_set, keyed_queue], - stint, - ../../../sync_desc, + "../../.."/[sync_desc, types], "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_storage_ranges], ../db/[hexary_error, snapdb_storage_slots], @@ -95,7 +94,7 @@ proc fetchCtx( env: SnapPivotRef; ): string = "{" & - "piv=" & "#" & $env.stateHeader.blockNumber & "," & + "piv=" & env.stateHeader.blockNumber.toStr & "," & "ctl=" & $buddy.ctrl.state & "," & "nQuFull=" & $env.fetchStorageFull.len & "," & "nQuPart=" & $env.fetchStoragePart.len & "," & @@ -118,7 +117,7 @@ proc fetchStorageSlotsImpl( ctx = buddy.ctx peer = buddy.peer stateRoot = env.stateHeader.stateRoot - pivot = "#" & $env.stateHeader.blockNumber # for logging + pivot = env.stateHeader.blockNumber.toStr # logging in `getStorageRanges()` # Get storages slots data from the network var stoRange = block: diff --git a/nimbus/sync/snap/worker/pivot/swap_in.nim b/nimbus/sync/snap/worker/pivot/swap_in.nim index 4ee0b67cf..6af1f3f60 100644 --- a/nimbus/sync/snap/worker/pivot/swap_in.nim +++ b/nimbus/sync/snap/worker/pivot/swap_in.nim @@ -257,7 +257,7 @@ proc swapInAccounts*( return # nothing to do let - pivot {.used.} = "#" & $env.stateHeader.blockNumber # Logging & debugging + pivot {.used.} = env.stateHeader.blockNumber.toStr # Logging & debugging rootKey = env.stateHeader.stateRoot.to(NodeKey) getFn = ctx.pool.snapDb.getAccountFn diff --git a/nimbus/sync/snap/worker/play.nim b/nimbus/sync/snap/worker/play.nim index c37155c32..9c3f343af 100644 --- a/nimbus/sync/snap/worker/play.nim +++ b/nimbus/sync/snap/worker/play.nim @@ -9,17 +9,18 @@ import ../worker_desc, - ./play/[play_desc, play_full_sync, play_prep_full, play_snap_sync] + ./play/[play_desc, play_full_sync, play_snap_sync] export PlaySyncSpecs, - playSyncSpecs, - `playMode=` + playMethod -proc playInit*(desc: var SnapSyncSpecs) = +proc playSetup*(ctx: SnapCtxRef) = ## Set up sync mode specs table. This cannot be done at compile time. 
- desc.tab[SnapSyncMode] = playSnapSyncSpecs() - desc.tab[PreFullSyncMode] = playPrepFullSpecs() - desc.tab[FullSyncMode] = playFullSyncSpecs() + ctx.pool.syncMode.tab[SnapSyncMode] = playSnapSyncSpecs() + ctx.pool.syncMode.tab[FullSyncMode] = playFullSyncSpecs() + +proc playRelease*(ctx: SnapCtxRef) = + discard # End diff --git a/nimbus/sync/snap/worker/play/play_desc.nim b/nimbus/sync/snap/worker/play/play_desc.nim index 2f2e3abaa..9b0f91ee2 100644 --- a/nimbus/sync/snap/worker/play/play_desc.nim +++ b/nimbus/sync/snap/worker/play/play_desc.nim @@ -14,17 +14,38 @@ import type PlayVoidFutureCtxFn* = proc( - ctx: SnapCtxRef): Future[void] {.gcsafe, raises: [CatchableError].} + ctx: SnapCtxRef): Future[void] + {.gcsafe, raises: [CatchableError].} + + PlayVoidCtxFn* = proc( + ctx: SnapCtxRef) + {.gcsafe, raises: [CatchableError].} + PlayVoidFutureBuddyFn* = proc( - buddy: SnapBuddyRef): Future[void] {.gcsafe, raises: [CatchableError].} + buddy: SnapBuddyRef): Future[void] + {.gcsafe, raises: [CatchableError].} + + PlayBoolBuddyBoolIntFn* = proc( + buddy: SnapBuddyRef; last: bool; laps: int): bool + {.gcsafe, raises: [CatchableError].} PlayBoolBuddyFn* = proc( - buddy: SnapBuddyRef, last: bool): bool {.gcsafe, raises: [CatchableError].} + buddy: SnapBuddyRef): bool + {.gcsafe, raises: [CatchableError].} + + PlayVoidBuddyFn* = proc( + buddy: SnapBuddyRef) + {.gcsafe, raises: [CatchableError].} + PlaySyncSpecs* = ref object of RootRef ## Holds sync mode specs & methods for a particular sync state - pool*: PlayBoolBuddyFn + setup*: PlayVoidCtxFn + release*: PlayVoidCtxFn + start*: PlayBoolBuddyFn + stop*: PlayVoidBuddyFn + pool*: PlayBoolBuddyBoolIntFn daemon*: PlayVoidFutureCtxFn single*: PlayVoidFutureBuddyFn multi*: PlayVoidFutureBuddyFn @@ -33,14 +54,10 @@ type # Public functions # ------------------------------------------------------------------------------ -proc playSyncSpecs*(ctx: SnapCtxRef): PlaySyncSpecs = +proc playMethod*(ctx: SnapCtxRef): PlaySyncSpecs = ## Getter ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs -proc `playMode=`*(ctx: SnapCtxRef; val: SnapSyncModeType) = - ## Setter - ctx.pool.syncMode.active = val - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play/play_full_sync.nim b/nimbus/sync/snap/worker/play/play_full_sync.nim index bdb3ab36b..41cf0b56d 100644 --- a/nimbus/sync/snap/worker/play/play_full_sync.nim +++ b/nimbus/sync/snap/worker/play/play_full_sync.nim @@ -14,8 +14,11 @@ import chronicles, chronos, eth/p2p, - ../../../sync_desc, - ../../worker_desc, + ../../../misc/[best_pivot, block_queue], + "../../.."/[protocol, sync_desc, types], + "../.."/[range_desc, worker_desc], + ../db/[snapdb_desc, snapdb_persistent], + ".."/[pivot, ticker], play_desc const @@ -23,30 +26,217 @@ const ## Enabled additional logging noise # ------------------------------------------------------------------------------ -# Private functions, full sync handlers +# Private helpers # ------------------------------------------------------------------------------ -proc fullSyncPool(buddy: SnapBuddyRef, last: bool): bool = - buddy.ctx.poolMode = false - true +template ignoreException(info: static[string]; code: untyped) = + try: + code + except CatchableError as e: + error "Exception at " & info & ":", name=($e.name), msg=(e.msg) + +proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater = + result = proc: 
TickerFullStats = + var stats: BlockQueueStats + ctx.pool.bCtx.blockQueueStats(stats) + + TickerFullStats( + topPersistent: stats.topAccepted, + nextStaged: stats.nextStaged, + nextUnprocessed: stats.nextUnprocessed, + nStagedQueue: stats.nStagedQueue, + reOrg: stats.reOrg) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc processStaged(buddy: SnapBuddyRef): bool = + ## Fetch a work item from the `staged` queue an process it to be + ## stored on the persistent block chain. + let + ctx {.used.} = buddy.ctx + peer = buddy.peer + chainDb = buddy.ctx.chain.db + chain = buddy.ctx.chain + bq = buddy.only.bQueue + + # Get a work item, a list of headers + bodies + wi = block: + let rc = bq.blockQueueFetchStaged() + if rc.isErr: + return false + rc.value + + #startNumber = wi.headers[0].blockNumber -- unused + + # Store in persistent database + try: + if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK: + bq.blockQueueAccept(wi) + return true + except CatchableError as e: + error "Storing persistent blocks failed", peer, range=($wi.blocks), + name=($e.name), msg=(e.msg) + + # Something went wrong. Recycle work item (needs to be re-fetched, anyway) + let + parentHash = wi.headers[0].parentHash + try: + # Check whether hash of the first block is consistent + var parent: BlockHeader + if chainDb.getBlockHeader(parentHash, parent): + # First block parent is ok, so there might be other problems. Re-fetch + # the blocks from another peer. + trace "Storing persistent blocks failed", peer, range=($wi.blocks) + bq.blockQueueRecycle(wi) + buddy.ctrl.zombie = true + return false + except CatchableError as e: + error "Failed to access parent blocks", peer, + blockNumber=wi.headers[0].blockNumber.toStr, name=($e.name), msg=e.msg + + # Parent block header problem, so we might be in the middle of a re-org. + # Set single mode backtrack following the offending parent hash. 
+ bq.blockQueueBacktrackFrom(wi) + buddy.ctrl.multiOk = false + + if wi.topHash.isNone: + # Assuming that currently staged entries are on the wrong branch + bq.blockQueueRecycleStaged() + notice "Starting chain re-org backtrack work item", peer, range=($wi.blocks) + else: + # Leave that block range in the staged list + trace "Resuming chain re-org backtrack work item", peer, range=($wi.blocks) + discard + + return false + +# ------------------------------------------------------------------------------ +# Private functions, full sync admin handlers +# ------------------------------------------------------------------------------ + +proc fullSyncSetup(ctx: SnapCtxRef) = + let blockNum = if ctx.pool.fullPivot.isNil: ctx.pool.pivotTable.topNumber + else: ctx.pool.fullPivot.stateHeader.blockNumber + + ctx.pool.bCtx = BlockQueueCtxRef.init(blockNum + 1) + ctx.pool.bPivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0) + ctx.pool.ticker.init(cb = ctx.tickerUpdater()) + +proc fullSyncRelease(ctx: SnapCtxRef) = + discard + + +proc fullSyncStart(buddy: SnapBuddyRef): bool = + let + ctx = buddy.ctx + peer = buddy.peer + + if peer.supports(protocol.eth) and + peer.state(protocol.eth).initialized: + + buddy.only.bQueue = BlockQueueWorkerRef.init( + ctx.pool.bCtx, buddy.ctrl, peer) + buddy.only.bPivot = BestPivotWorkerRef.init( + ctx.pool.bPivot, buddy.ctrl, buddy.peer) + + ctx.pool.ticker.startBuddy() + buddy.ctrl.multiOk = false # confirm default mode for soft restart + return true + +proc fullSyncStop(buddy: SnapBuddyRef) = + buddy.only.bPivot.clear() + buddy.ctx.pool.ticker.stopBuddy() + +# ------------------------------------------------------------------------------ +# Private functions, full sync action handlers +# ------------------------------------------------------------------------------ proc fullSyncDaemon(ctx: SnapCtxRef) {.async.} = ctx.daemon = false + +proc fullSyncPool(buddy: SnapBuddyRef, last: bool; laps: int): bool = + let + ctx = buddy.ctx + env = ctx.pool.fullPivot + + # Take over soft restart after switch to full sync mode. + # This process needs to be applied to all buddy peers. + if not env.isNil: + # Soft start all peers on the second lap. + ignoreException("fullSyncPool"): + if not ctx.playMethod.start(buddy): + # Start() method failed => wait for another peer + buddy.ctrl.stopped = true + if last: + trace "Soft full sync restart done", peer=buddy.peer, last, laps, + pivot=env.stateHeader.blockNumber.toStr, + mode=ctx.pool.syncMode.active, state= buddy.ctrl.state + + # Store pivot as parent hash in database + ctx.pool.snapDb.kvDb.persistentBlockHeaderPut env.stateHeader + + # Instead of genesis. + ctx.chain.com.startOfHistory = env.stateHeader.blockHash + + # Reset so that this actuin would not be triggered, again + ctx.pool.fullPivot = nil + return false # do stop magically when looping over peers is exhausted + + # Mind the gap, fill in if necessary (function is peer independent) + buddy.only.bQueue.blockQueueGrout() + true # Stop after running once regardless of peer + + proc fullSyncSingle(buddy: SnapBuddyRef) {.async.} = - buddy.ctrl.multiOk = true + let + pv = buddy.only.bPivot + bq = buddy.only.bQueue + bNum = bq.bestNumber.get(otherwise = bq.topAccepted + 1) + + # Negotiate in order to derive the pivot header from this `peer`. 
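The `fullSyncPool()` handover above stores the pivot header and makes its hash the new `startOfHistory`, so the `persistHeaderToDb()` change earlier in this patch can seed the difficulty score at the pivot instead of at genesis. Below is a minimal sketch of that seeding rule only, with stand-in types and an in-memory score table rather than the real chain database:

```nim
import std/tables

type
  MiniHeader = object
    parentHash: string
    difficulty: uint64

proc persistScore(scores: TableRef[string, uint64]; hash: string;
                  hdr: MiniHeader; startOfHistory: string): uint64 =
  ## Same rule as `persistHeaderToDb()` after this patch: a header whose
  ## parent is the configured start of history starts the score at its own
  ## difficulty; every other header accumulates on top of its parent score.
  result =
    if hdr.parentHash == startOfHistory: hdr.difficulty
    else: scores[hdr.parentHash] + hdr.difficulty
  scores[hash] = result

when isMainModule:
  let scores = newTable[string, uint64]()
  # After snap sync, the pivot hash plays the role of the genesis parent hash.
  let pivotHash = "pivot-block-hash"
  discard scores.persistScore("child-of-pivot",
    MiniHeader(parentHash: pivotHash, difficulty: 1), startOfHistory = pivotHash)
  echo scores["child-of-pivot"]         # -> 1, scoring restarted at the pivot
```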
+ if await pv.pivotNegotiate(some(bNum)): + # Update/activate `bestNumber` from the pivot header + bq.bestNumber = some(pv.pivotHeader.value.blockNumber) + buddy.ctrl.multiOk = true + when extraTraceMessages: + trace "Full sync pivot accepted", peer=buddy.peer, + minNumber=bNum.toStr, bestNumber=bq.bestNumber.unsafeGet.toStr + return + + if buddy.ctrl.stopped: + when extraTraceMessages: + trace "Full sync single mode stopped", peer=buddy.peer + return # done with this buddy + + # Without waiting, this function repeats every 50ms (as set with the constant + # `sync_sched.execLoopTimeElapsedMin`.) + await sleepAsync 300.milliseconds + proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = ## Full sync processing let ctx = buddy.ctx - peer = buddy.peer + bq = buddy.only.bQueue - trace "Snap full sync -- not implemented yet", peer - await sleepAsync(5.seconds) + # Fetch work item + let rc = await bq.blockQueueWorker() + if rc.isErr: + if rc.error == StagedQueueOverflow: + # Mind the gap: Turn on pool mode if there are too may staged items. + ctx.poolMode = true + else: + trace "Full sync error", peer=buddy.peer, error=rc.error + return - # flip over to single mode for getting new instructins - buddy.ctrl.multiOk = false + # Update persistent database + while buddy.processStaged() and not buddy.ctrl.stopped: + trace "Full sync multi processed", peer=buddy.peer + # Allow thread switch as `persistBlocks()` might be slow + await sleepAsync(10.milliseconds) # ------------------------------------------------------------------------------ # Public functions @@ -55,10 +245,14 @@ proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = proc playFullSyncSpecs*: PlaySyncSpecs = ## Return full sync handler environment PlaySyncSpecs( - pool: fullSyncPool, - daemon: fullSyncDaemon, - single: fullSyncSingle, - multi: fullSyncMulti) + setup: fullSyncSetup, + release: fullSyncRelease, + start: fullSyncStart, + stop: fullSyncStop, + pool: fullSyncPool, + daemon: fullSyncDaemon, + single: fullSyncSingle, + multi: fullSyncMulti) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/play/play_prep_full.nim b/nimbus/sync/snap/worker/play/play_prep_full.nim deleted file mode 100644 index ecff962b1..000000000 --- a/nimbus/sync/snap/worker/play/play_prep_full.nim +++ /dev/null @@ -1,92 +0,0 @@ -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
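The `PlaySyncSpecs` record filled in by `playFullSyncSpecs()` above (and by its snap sync counterpart `playSnapSyncSpecs()`) is a per-mode handler table: `playMethod` always dispatches into the record of the currently active mode. A cut-down sketch of that dispatch-table pattern with hypothetical names, not the nimbus types themselves:

```nim
type
  SyncMode = enum
    SnapMode, FullMode

  ModeHandlers = object
    ## Cut-down analogue of `PlaySyncSpecs`: one bundle of handlers per mode.
    setup: proc()
    release: proc()

  Dispatcher = object
    active: SyncMode
    tab: array[SyncMode, ModeHandlers]

proc switchMode(d: var Dispatcher; next: SyncMode) =
  ## The switch pattern used for the snap -> full handover: tear down the old
  ## mode, flip the active entry, set up the new one.
  d.tab[d.active].release()
  d.active = next
  d.tab[d.active].setup()

proc snapSetup()   = echo "snap setup"
proc snapRelease() = echo "snap release"
proc fullSetup()   = echo "full setup"
proc fullRelease() = echo "full release"

when isMainModule:
  var d = Dispatcher(active: SnapMode)
  d.tab[SnapMode] = ModeHandlers(setup: snapSetup, release: snapRelease)
  d.tab[FullMode] = ModeHandlers(setup: fullSetup, release: fullRelease)
  d.switchMode FullMode                 # prints "snap release", then "full setup"
```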
- -## Transitional handlers preparing for full sync - -{.push raises: [].} - -import - chronicles, - chronos, - eth/p2p, - stew/keyed_queue, - ../../../sync_desc, - ../../worker_desc, - ../ticker, - play_desc - -const - extraTraceMessages = false or true - ## Enabled additional logging noise - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc blindTicker(ctx: SnapCtxRef): TickerFullStatsUpdater = - result = proc: TickerFullStats = - discard - -# ------------------------------------------------------------------------------ -# Private functions, transitional handlers preparing for full sync -# ------------------------------------------------------------------------------ - -proc prepFullSyncPool(buddy: SnapBuddyRef, last: bool): bool = - buddy.ctx.poolMode = false - true - -proc prepFullSyncDaemon(ctx: SnapCtxRef) {.async.} = - ctx.daemon = false - -proc prepFullSyncSingle(buddy: SnapBuddyRef) {.async.} = - ## One off, setting up full sync processing in single mode - let - ctx = buddy.ctx - - # Fetch latest state root environment - env = block: - let rc = ctx.pool.pivotTable.lastValue - if rc.isErr: - buddy.ctrl.multiOk = false - return - rc.value - - peer = buddy.peer - pivot = "#" & $env.stateHeader.blockNumber # for logging - - when extraTraceMessages: - trace "Full sync prepare in single mode", peer, pivot - - # update ticker (currently blind) - ctx.pool.ticker.init(cb = ctx.blindTicker()) - - # Cosmetics: allow other processes (e.g. ticker) to log the current - # state. There is no other intended purpose of this wait state. - await sleepAsync 1100.milliseconds - - ctx.playMode = FullSyncMode - buddy.ctrl.multiOk = true - - -proc prepFullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = - ## One off, setting up full sync processing in single mode - buddy.ctrl.multiOk = false - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc playPrepFullSpecs*: PlaySyncSpecs = - ## Return full sync preparation handler environment - PlaySyncSpecs( - pool: prepFullSyncPool, - daemon: prepFullSyncDaemon, - single: prepFullSyncSingle, - multi: prepFullSyncMulti) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play/play_snap_sync.nim b/nimbus/sync/snap/worker/play/play_snap_sync.nim index f9fabc189..b8cc31c97 100644 --- a/nimbus/sync/snap/worker/play/play_snap_sync.nim +++ b/nimbus/sync/snap/worker/play/play_snap_sync.nim @@ -15,7 +15,7 @@ import chronos, eth/p2p, stew/[interval_set, keyed_queue], - ../../../sync_desc, + "../../.."/[handlers/eth, protocol, sync_desc, types], ".."/[pivot, ticker], ../pivot/storage_queue_helper, ../db/[hexary_desc, snapdb_pivot], @@ -33,19 +33,63 @@ const # Private helpers # ------------------------------------------------------------------------------ +template logTxt(info: static[string]): static[string] = + "Snap worker " & info + +template ignoreException(info: static[string]; code: untyped) = + try: + code + except CatchableError as e: + error "Exception at " & info & ":", name=($e.name), msg=(e.msg) + +# -------------- + +proc disableWireServices(ctx: SnapCtxRef) = + ## Helper for `setup()`: Temporarily stop useless wire 
protocol services.
+  ctx.ethWireCtx.txPoolEnabled = false
+
+proc enableWireServices(ctx: SnapCtxRef) =
+  ## Helper for `release()`
+  ctx.ethWireCtx.txPoolEnabled = true
+
+# --------------
+
+proc enableRpcMagic(ctx: SnapCtxRef) =
+  ## Helper for `setup()`: Enable external pivot update via RPC
+  ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
+
+proc disableRpcMagic(ctx: SnapCtxRef) =
+  ## Helper for `release()`
+  ctx.chain.com.syncReqNewHead = nil
+
+# --------------
+
+proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
+  ## Helper for `setup()`: Initiate snap sync recovery (if any)
+  let rc = ctx.pool.snapDb.pivotRecoverDB()
+  if rc.isOk:
+    ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
+    ctx.daemon = true
+
+    # Set up early initial pivot
+    ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
+    trace logTxt "recovery started",
+      checkpoint=(ctx.pool.pivotTable.topNumber.toStr & "(0)")
+    if not ctx.pool.ticker.isNil:
+      ctx.pool.ticker.startRecovery()
+
 proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
   let recov = ctx.pool.recovery
   if recov.isNil:
     return false
 
   let
-    checkpoint =
-      "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
+    checkpoint = recov.state.header.blockNumber.toStr & "(" & $recov.level & ")"
     topLevel = recov.level == 0
     env = block:
       let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot
       if rc.isErr:
-        error "Recovery pivot context gone", checkpoint, topLevel
+        error logTxt "recovery pivot context gone", checkpoint, topLevel
         return false
       rc.value
 
@@ -68,7 +112,7 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
   let rc = ctx.pool.snapDb.pivotRecoverDB(recov.state.predecessor)
   if rc.isErr:
     when extraTraceMessages:
-      trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
+      trace logTxt "stale pivot, recovery stopped", checkpoint, topLevel
     return false
 
   # Set up next level pivot checkpoint
@@ -81,24 +125,98 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
   return true # continue recovery
 
+# --------------
+
+proc snapSyncCompleteOk(
+    env: SnapPivotRef;               # Current pivot environment
+    ctx: SnapCtxRef;                 # Some global context
+      ): Future[bool]
+      {.async.} =
+  ## Check whether this pivot is fully downloaded. The `async` part is for
+  ## debugging only and should not be used on a large database as it uses
+  ## quite a bit of computation resources.
+ if env.pivotCompleteOk(): + if env.nAccounts <= 1_000_000: # Larger sizes might be infeasible + if not await env.pivotVerifyComplete(ctx): + error logTxt "inconsistent state, pivot incomplete", + pivot = env.stateHeader.blockNumber.toStr + return false + ctx.pool.fullPivot = env + ctx.poolMode = true # Fast sync mode must be synchronized among all peers + return true + # ------------------------------------------------------------------------------ -# Private functions, snap sync handlers +# Private functions, snap sync admin handlers # ------------------------------------------------------------------------------ -proc snapSyncPool(buddy: SnapBuddyRef, last: bool): bool = +proc snapSyncSetup(ctx: SnapCtxRef) = + # For snap sync book keeping + ctx.pool.coveredAccounts = NodeTagRangeSet.init() + ctx.pool.ticker.init(cb = ctx.pool.pivotTable.tickerStats(ctx)) + + ctx.enableRpcMagic() # Allow external pivot update via RPC + ctx.disableWireServices() # Stop unwanted public services + ctx.detectSnapSyncRecovery() # Check for recovery mode + +proc snapSyncRelease(ctx: SnapCtxRef) = + ctx.disableRpcMagic() # Disable external pivot update via RPC + ctx.enableWireServices() # re-enable public services + ctx.pool.ticker.stop() + +proc snapSyncStart(buddy: SnapBuddyRef): bool = + let + ctx = buddy.ctx + peer = buddy.peer + if peer.supports(protocol.snap) and + peer.supports(protocol.eth) and + peer.state(protocol.eth).initialized: + ctx.pool.ticker.startBuddy() + buddy.ctrl.multiOk = false # confirm default mode for soft restart + return true + +proc snapSyncStop(buddy: SnapBuddyRef) = + buddy.ctx.pool.ticker.stopBuddy() + +# ------------------------------------------------------------------------------ +# Private functions, snap sync action handlers +# ------------------------------------------------------------------------------ + +proc snapSyncPool(buddy: SnapBuddyRef, last: bool, laps: int): bool = ## Enabled when `buddy.ctrl.poolMode` is `true` ## - let ctx = buddy.ctx - ctx.poolMode = false - result = true + let + ctx = buddy.ctx + env = ctx.pool.fullPivot - # Clean up empty pivot slots (never the top one) + # Check whether the snapshot is complete. If so, switch to full sync mode. + # This process needs to be applied to all buddy peers. + if not env.isNil: + ignoreException("snapSyncPool"): + # Stop all peers + ctx.playMethod.stop(buddy) + # After the last buddy peer was stopped switch to full sync mode + # and repeat that loop over buddy peers for re-starting them. + if last: + when extraTraceMessages: + trace logTxt "switch to full sync", peer=buddy.peer, last, laps, + pivot=env.stateHeader.blockNumber.toStr, + mode=ctx.pool.syncMode.active, state= buddy.ctrl.state + ctx.playMethod.release(ctx) + ctx.pool.syncMode.active = FullSyncMode + ctx.playMethod.setup(ctx) + ctx.poolMode = true # repeat looping over peers + return false # do stop magically when looping over peers is exhausted + + # Clean up empty pivot slots (never the top one.) This needs to be run on + # a single peer only. So the loop can stop immediately (returning `true`) + # after this job is done. 
var rc = ctx.pool.pivotTable.beforeLast while rc.isOK: let (key, env) = (rc.value.key, rc.value.data) if env.fetchAccounts.processed.isEmpty: ctx.pool.pivotTable.del key rc = ctx.pool.pivotTable.prev(key) + true # Stop ok proc snapSyncDaemon(ctx: SnapCtxRef) {.async.} = @@ -120,11 +238,15 @@ proc snapSyncSingle(buddy: SnapBuddyRef) {.async.} = ## * `buddy.ctrl.multiOk` is `false` ## * `buddy.ctrl.poolMode` is `false` ## - let ctx = buddy.ctx - # External beacon header updater await buddy.updateBeaconHeaderFromFile() + # Dedicate some process cycles to the recovery process (if any) + if not buddy.ctx.pool.recovery.isNil: + when extraTraceMessages: + trace "Throttling single mode in favour of recovery", peer=buddy.peer + await sleepAsync 900.milliseconds + await buddy.pivotApprovePeer() buddy.ctrl.multiOk = true @@ -145,17 +267,8 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = return # nothing to do rc.value - peer = buddy.peer - pivot = "#" & $env.stateHeader.blockNumber # for logging - fa = env.fetchAccounts - # Check whether this pivot is fully downloaded - if env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0: - # Switch to full sync => final state - ctx.playMode = PreFullSyncMode - trace "Switch to full sync", peer, pivot, nAccounts=env.nAccounts, - processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(), - nSlotLists=env.nSlotLists + if await env.snapSyncCompleteOk(ctx): return # If this is a new snap sync pivot, the previous one can be cleaned up and @@ -163,6 +276,11 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = # data any longer. ctx.pool.pivotTable.beforeTopMostlyClean() + let + peer = buddy.peer + pivot = env.stateHeader.blockNumber.toStr # for logging + fa = env.fetchAccounts + when extraTraceMessages: trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts, processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(), @@ -180,7 +298,7 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = # Archive this pivot eveironment if it has become stale if env.archived: when extraTraceMessages: - trace "Mothballing", peer, pivot, nAccounts, nSlotLists + trace logTxt "mothballing", peer, pivot, nAccounts, nSlotLists env.pivotMothball() return @@ -188,12 +306,17 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = let rc = env.saveCheckpoint(ctx) if rc.isOk: when extraTraceMessages: - trace "Saved recovery checkpoint", peer, pivot, nAccounts, processed, - nStoQu=env.storageQueueTotal(), nSlotLists, blobSize=rc.value + trace logTxt "saved checkpoint", peer, pivot, nAccounts, + processed, nStoQu=env.storageQueueTotal(), nSlotLists, + blobSize=rc.value return - error "Failed to save recovery checkpoint", peer, pivot, nAccounts, - processed, nStoQu=env.storageQueueTotal(), nSlotLists, error=rc.error + error logTxt "failed to save checkpoint", peer, pivot, nAccounts, + processed, nStoQu=env.storageQueueTotal(), nSlotLists, + error=rc.error + + # Check whether this pivot is fully downloaded + discard await env.snapSyncCompleteOk(ctx) # ------------------------------------------------------------------------------ # Public functions @@ -202,10 +325,14 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = proc playSnapSyncSpecs*: PlaySyncSpecs = ## Return snap sync handler environment PlaySyncSpecs( - pool: snapSyncPool, - daemon: snapSyncDaemon, - single: snapSyncSingle, - multi: snapSyncMulti) + setup: snapSyncSetup, + release: snapSyncRelease, + start: snapSyncStart, + stop: 
snapSyncStop, + pool: snapSyncPool, + daemon: snapSyncDaemon, + single: snapSyncSingle, + multi: snapSyncMulti) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index c9d07930f..33d4a443e 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -9,6 +9,8 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. +{.push raises: [].} + import std/[options, strformat, strutils], chronos, @@ -16,9 +18,8 @@ import eth/[common, p2p], stint, ../../../utils/prettify, - ../../misc/timer_helper - -{.push raises: [].} + ../../misc/timer_helper, + ../../types logScope: topics = "snap-tick" @@ -55,6 +56,7 @@ type TickerFullStats* = object ## Full sync state (see `TickerFullStatsUpdater`) + pivotBlock*: Option[BlockNumber] topPersistent*: BlockNumber nextUnprocessed*: Option[BlockNumber] nextStaged*: Option[BlockNumber] @@ -128,12 +130,6 @@ proc pc99(val: float): string = elif 0.0 < val and val <= 0.01: "1%" else: val.toPC(0) -proc pp(n: BlockNumber): string = - "#" & $n - -proc pp(n: Option[BlockNumber]): string = - if n.isNone: "n/a" else: n.get.pp - # ------------------------------------------------------------------------------ # Private functions: printing ticker messages # ------------------------------------------------------------------------------ @@ -155,9 +151,9 @@ proc snapTicker(t: TickerRef) {.gcsafe.} = var nAcc, nSto: string pv = "n/a" - bc = "n/a" nStoQue = "n/a" let + bc = data.beaconBlock.toStr recoveryDone = t.snap.lastRecov accCov = data.accountsFill[0].pc99 & "(" & data.accountsFill[1].pc99 & ")" & @@ -173,11 +169,10 @@ proc snapTicker(t: TickerRef) {.gcsafe.} = t.visited = now t.snap.lastRecov = t.snap.recovery + if data.pivotBlock.isSome: + pv = data.pivotBlock.toStr & "/" & $data.nQueues + noFmtError("runLogTicker"): - if data.pivotBlock.isSome: - pv = &"#{data.pivotBlock.get}/{data.nQueues}" - if data.beaconBlock.isSome: - bc = &"#{data.beaconBlock.get}" nAcc = (&"{(data.nAccounts[0]+0.5).int64}" & &"({(data.nAccounts[1]+0.5).int64})") nSto = (&"{(data.nSlotLists[0]+0.5).int64}" & @@ -205,13 +200,14 @@ proc fullTicker(t: TickerRef) {.gcsafe.} = if data != t.full.lastStats or tickerLogSuppressMax < (now - t.visited): let - persistent = data.topPersistent.pp - staged = data.nextStaged.pp - unprocessed = data.nextUnprocessed.pp + persistent = data.topPersistent.toStr + staged = data.nextStaged.toStr + unprocessed = data.nextUnprocessed.toStr queued = data.nStagedQueue reOrg = if data.reOrg: "t" else: "f" + pv = data.pivotBlock.toStr - buddies = t.nBuddies + nInst = t.nBuddies # With `int64`, there are more than 29*10^10 years range for seconds up = (now - t.started).seconds.uint64.toSI @@ -221,10 +217,10 @@ proc fullTicker(t: TickerRef) {.gcsafe.} = t.visited = now if data.suspended: - info "Sync statistics (suspended)", up, buddies, + info "Full sync statistics (suspended)", up, nInst, pv, persistent, unprocessed, staged, queued, reOrg, mem else: - info "Sync statistics", up, buddies, + info "Full sync statistics", up, nInst, pv, persistent, unprocessed, staged, queued, reOrg, mem # ------------------------------------------------------------------------------ @@ -255,36 +251,40 @@ proc initImpl(t: TickerRef; cb: TickerFullStatsUpdater) = # Public constructor and start/stop functions # ------------------------------------------------------------------------------ -proc 
init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
-  ## Re-initialise ticket
-  t.visited.reset
-  if t.fullMode:
-    t.prettyPrint(t) # print final state for full sync
-  t.initImpl(cb)
-
-proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
-  ## Re-initialise ticket
-  t.visited.reset
-  if not t.fullMode:
-    t.prettyPrint(t) # print final state for snap sync
-  t.initImpl(cb)
-
 proc init*(T: type TickerRef; cb: TickerSnapStatsUpdater): T =
   ## Constructor
   new result
   result.initImpl(cb)
 
+proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
+  ## Re-initialise ticker
+  if not t.isNil:
+    t.visited.reset
+    if t.fullMode:
+      t.prettyPrint(t) # print final state for full sync
+    t.initImpl(cb)
+
+proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
+  ## Re-initialise ticker
+  if not t.isNil:
+    t.visited.reset
+    if not t.fullMode:
+      t.prettyPrint(t) # print final state for snap sync
+    t.initImpl(cb)
+
 proc start*(t: TickerRef) =
   ## Re/start ticker unconditionally
-  #debug "Started ticker"
-  t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
-  if t.started == Moment.default:
-    t.started = Moment.now()
+  if not t.isNil:
+    #debug "Started ticker"
+    t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay),runLogTicker,t)
+    if t.started == Moment.default:
+      t.started = Moment.now()
 
 proc stop*(t: TickerRef) =
   ## Stop ticker unconditionally
-  t.logTicker = nil
-  #debug "Stopped ticker"
+  if not t.isNil:
+    t.logTicker = nil
+    #debug "Stopped ticker"
 
 # ------------------------------------------------------------------------------
 # Public functions
@@ -292,37 +292,41 @@ proc stop*(t: TickerRef) =
 
 proc startBuddy*(t: TickerRef) =
   ## Increment buddies counter and start ticker unless running.
-  if t.nBuddies <= 0:
-    t.nBuddies = 1
-    if not t.fullMode:
-      if not t.snap.recovery:
-        t.start()
-  else:
-    t.nBuddies.inc
+  if not t.isNil:
+    if t.nBuddies <= 0:
+      t.nBuddies = 1
+      if not t.fullMode:
+        if not t.snap.recovery:
+          t.start()
+    else:
+      t.nBuddies.inc
 
 proc startRecovery*(t: TickerRef) =
   ## Ditto for recovery mode
-  if not t.fullMode:
-    if not t.snap.recovery:
-      t.snap.recovery = true
-      if t.nBuddies <= 0:
-        t.start()
+  if not t.isNil:
+    if not t.fullMode:
+      if not t.snap.recovery:
+        t.snap.recovery = true
+        if t.nBuddies <= 0:
+          t.start()
 
 proc stopBuddy*(t: TickerRef) =
   ## Decrement buddies counter and stop ticker if there are no more registered
   ## buddies.
-  t.nBuddies.dec
-  if t.nBuddies <= 0:
-    if not t.fullMode:
-      if not t.snap.recovery:
-        t.stop()
+  if not t.isNil:
+    t.nBuddies.dec
+    if t.nBuddies <= 0:
+      if not t.fullMode:
+        if not t.snap.recovery:
+          t.stop()
 
 proc stopRecovery*(t: TickerRef) =
   ## Ditto for recovery mode
-  if not t.fullMode:
-    if t.snap.recovery:
-      t.snap.recovery = false
-      t.stop()
+  if not t.isNil:
+    if not t.fullMode:
+      if t.snap.recovery:
+        t.snap.recovery = false
+        t.stop()
 
 # ------------------------------------------------------------------------------
 # End
diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim
index 4f1040c99..15c82e85a 100644
--- a/nimbus/sync/snap/worker_desc.nim
+++ b/nimbus/sync/snap/worker_desc.nim
@@ -16,6 +16,7 @@ import
   eth/[common, p2p],
   stew/[interval_set, keyed_queue, sorted_set],
   ../../db/select_backend,
+  ../misc/[best_pivot, block_queue],
   ../sync_desc,
   ./worker/com/com_error,
   ./worker/db/[snapdb_desc, snapdb_pivot],
@@ -68,6 +69,9 @@ type
     nAccounts*: uint64                 ## Imported # of accounts
     nSlotLists*: uint64                ## Imported # of account storage tries
 
+    # Checkpointing
+    savedFullPivotOk*: bool            ## This fully completed pivot was saved
+
     # Mothballing, ready to be swapped into newer pivot record
     storageAccounts*: SnapAccountsList ## Accounts with missing storage slots
     archived*: bool                    ## Not latest pivot, anymore
@@ -84,11 +88,14 @@ type
     ## Per-worker local descriptor data extension
     errors*: ComErrorStatsRef          ## For error handling
 
+    # Full sync continuation parameters
+    bPivot*: BestPivotWorkerRef        ## Local pivot worker descriptor
+    bQueue*: BlockQueueWorkerRef       ## Block queue worker
+
   SnapSyncModeType* = enum
     ## Current sync mode, after a snapshot has been downloaded, the system
     ## proceeds with full sync.
     SnapSyncMode = 0                   ## Start mode
-    PreFullSyncMode
     FullSyncMode
 
   SnapSyncSpecs* = object
@@ -111,12 +118,17 @@ type
     covAccTimesFull*: uint             ## # of 100% coverages
     recovery*: SnapRecoveryRef         ## Current recovery checkpoint/context
 
-    # Snap/full mode muliplexing
-    syncMode*: SnapSyncSpecs           ## Sync mode data contaier
-
     # Info
     ticker*: TickerRef                 ## Ticker, logger
 
+    # Snap/full mode multiplexing
+    syncMode*: SnapSyncSpecs           ## Sync mode methods & data
+    fullPivot*: SnapPivotRef           ## Start full sync from here
+
+    # Full sync continuation parameters
+    bPivot*: BestPivotCtxRef           ## Global pivot descriptor
+    bCtx*: BlockQueueCtxRef            ## Global block queue descriptor
+
   SnapBuddyRef* = BuddyRef[SnapCtxData,SnapBuddyData]
     ## Extended worker peer descriptor
 
diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim
index 62a4df243..045e608cb 100644
--- a/nimbus/sync/sync_desc.nim
+++ b/nimbus/sync/sync_desc.nim
@@ -16,8 +16,7 @@
 {.push raises: [].}
 
 import
-  #std/options,
-  eth/[common, p2p],
+  eth/p2p,
   ../core/chain,
   ../db/db_chain,
   ./handlers/eth
diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim
index 2fe88e9eb..9b3d24042 100644
--- a/nimbus/sync/sync_sched.nim
+++ b/nimbus/sync/sync_sched.nim
@@ -38,18 +38,21 @@
 ##     Clean up this worker peer.
 ##
 ##
-## *runPool(buddy: BuddyRef[S,W], last: bool): bool*
+## *runPool(buddy: BuddyRef[S,W], last: bool; laps: int): bool*
 ##     Once started, the function `runPool()` is called for all worker peers in
 ##     sequence as the body of an iteration as long as the function returns
 ##     `false`. There will be no other worker peer functions activated
 ##     simultaneously.
 ##
 ##     This procedure is started if the global flag `buddy.ctx.poolMode` is set
-##     `true` (default is `false`.) It is the responsibility of the `runPool()`
-##     instance to reset the flag `buddy.ctx.poolMode`, typically at the first
-##     peer instance.
+##     `true` (default is `false`.) It will be automatically reset before the
+##     loop starts. Setting it to `true` again results in repeating the loop.
+##     The argument `laps` (starting with `0`) indicates the current lap of
+##     the repeated loops. To avoid continuous looping, the number of `laps`
+##     is limited (see `execPoolModeLoopMax`, below.)
 ##
-##     The argument `last` is set `true` if the last entry is reached.
+##     The argument `last` is set `true` if the last entry of the current loop
+##     has been reached.
 ##
 ##     Note:
 ##       + This function does *not* runs in `async` mode.
@@ -91,7 +94,7 @@ import
   std/hashes,
   chronos,
-  eth/[common, p2p, p2p/peer_pool, p2p/private/p2p_types],
+  eth/[p2p, p2p/peer_pool, p2p/private/p2p_types],
   stew/keyed_queue,
   "."/[handlers, sync_desc]
 
@@ -134,6 +137,9 @@ const
   execLoopPollingTime = 50.milliseconds
     ## Single asynchroneous time interval wait state for event polling
 
+  execPoolModeLoopMax = 100
+    ## Avoids continuous looping
+
 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------
@@ -201,11 +207,25 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
           if worker.ctrl.stopped:
             dsc.monitorLock = false
             break taskExecLoop
-          var count = dsc.buddies.len
-          for w in dsc.buddies.nextValues:
-            count.dec
-            if worker.runPool(count == 0):
-              break # `true` => stop
+
+          var count = 0
+          while count < execPoolModeLoopMax:
+            ctx.poolMode = false
+            # Pool mode: a handler returning `true` stops this round. The
+            # final invocation of a round is called with `last=true`.
+            var delayed = BuddyRef[S,W](nil)
+            for w in dsc.buddies.nextValues:
+              # Execute previous (aka delayed) item (unless first)
+              if delayed.isNil or not delayed.runPool(last=false, laps=count):
+                delayed = w.worker
+              else:
+                delayed = nil # not executing any final item
+                break # `true` => stop
+            if not delayed.isNil:
+              discard delayed.runPool(last=true, laps=count) # final item
+            if not ctx.poolMode:
+              break
+            count.inc
           dsc.monitorLock = false
 
         else:
diff --git a/nimbus/sync/types.nim b/nimbus/sync/types.nim
index a47472ed9..fa6cbb463 100644
--- a/nimbus/sync/types.nim
+++ b/nimbus/sync/types.nim
@@ -8,13 +8,13 @@
 # at your option. This file may not be copied, modified, or
 # distributed except according to those terms.
 
+{.push raises: [].}
+
 import
   std/[math, hashes],
   eth/common/eth_types_rlp,
   stew/byteutils
 
-{.push raises: [].}
-
 type
   BlockHash* = distinct Hash256
     ## Hash of a block, goes with `BlockNumber`.
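For orientation, the following self-contained Nim sketch (not part of the patch) illustrates the `runPool()` lap contract described in the sync_sched.nim comments above. The `DemoCtx` and `DemoBuddy` types and the driver loop are made up for the demo; the driver is a simplified stand-in for the scheduler's delayed-item loop in `workerLoop()`.

  type
    DemoCtx = ref object
      poolMode: bool                # corresponds to `ctx.poolMode`
    DemoBuddy = ref object
      ctx: DemoCtx
      name: string

  proc runPool(buddy: DemoBuddy; last: bool; laps: int): bool =
    ## Hypothetical pool-mode handler: request exactly one extra lap.
    echo "pool handler: ", buddy.name, " last=", last, " laps=", laps
    if last and laps == 0:
      buddy.ctx.poolMode = true     # all peers visited once, ask for a second lap
    false                           # `false` => continue with the next peer

  when isMainModule:
    # Simplified stand-in for the scheduler loop: `poolMode` is cleared before
    # each lap, setting it again requests one more lap, bounded by a maximum.
    let ctx = DemoCtx()
    let buddies = @[DemoBuddy(ctx: ctx, name: "peer-1"),
                    DemoBuddy(ctx: ctx, name: "peer-2")]
    var laps = 0
    while laps < 100:               # stands in for `execPoolModeLoopMax`
      ctx.poolMode = false
      for n, w in buddies:
        if w.runPool(last=(n == buddies.len-1), laps=laps):
          break                     # `true` => stop this lap early
      if not ctx.poolMode:
        break
      laps.inc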
@@ -106,6 +106,13 @@ func `$`*(hashOrNum: HashOrNum): string =
   if hashOrNum.isHash: $hashOrNum.hash
   else: $hashOrNum.number
 
+func toStr*(n: BlockNumber): string =
+  ## Pretty print block number, explicitly formatted with a leading hash `#`
+  if n == high(BlockNumber): "high" else: "#" & $n
+
+func toStr*(n: Option[BlockNumber]): string =
+  if n.isNone: "n/a" else: n.get.toStr
+
 # ------------------------------------------------------------------------------
 # Public debug printing helpers
 # ------------------------------------------------------------------------------
diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim
index 0bf0404e7..e69b1acb4 100644
--- a/tests/test_sync_snap.nim
+++ b/tests/test_sync_snap.nim
@@ -202,9 +202,12 @@ proc miscRunner(noisy = true) =
     test "RLP proofs list sizes":
       test_calcProofsListSizes()
 
-    test "RLP en/decode GetTrieNodes argument list":
+    test "RLP en/decode GetTrieNodes arguments list":
       test_calcTrieNodeTranscode()
 
+    test "RLP en/decode BlockBody arguments list":
+      test_calcBlockBodyTranscode()
+
 
 proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
   let
diff --git a/tests/test_sync_snap/test_calc.nim b/tests/test_sync_snap/test_calc.nim
index 65daa8810..c882dcdbc 100644
--- a/tests/test_sync_snap/test_calc.nim
+++ b/tests/test_sync_snap/test_calc.nim
@@ -33,6 +33,16 @@ var
 # Private helpers for `test_calcAccountsListSizes()`
 # ------------------------------------------------------------------------------
 
+# Kludge, should be fixed in `eth/common/eth_types_rlp`
+proc append(w: var RlpWriter, b: BlockBody) =
+  ## helper for `test_calcBlockBodyTranscode()`
+  w.ethAppend b
+
+proc `==`(a,b: ChainId): bool {.borrow.}
+  ## helper for `test_calcBlockBodyTranscode()`
+
+# ------------------
+
 proc randAccSize(r: var Rand): int =
   ## Print random account size
   accObjRlpMin + r.rand(accBlobs.len - 1)
@@ -123,8 +133,8 @@ proc test_calcProofsListSizes*() =
       check nodeBlobsHex == brNodesHex
 
 
-proc test_calcTrieNodeTranscode*() =
-  ## RLP encode/decode of `SnapTriePaths` objects
+proc test_calcTrieNodeTranscode*() =
+  ## RLP encode/decode a list of `SnapTriePaths` objects
   let
     raw = @[
       # Accounts
@@ -174,6 +184,30 @@ proc test_calcTrieNodeTranscode*() =
     check raw == rlp.decode(cooked, seq[SnapTriePaths])
     check cured == rlp.decode(cooked, seq[seq[Blob]])
 
+
+proc test_calcBlockBodyTranscode*() =
+  ## RLP encode/decode a list of `BlockBody` objects. Note that there is/was a
+  ## problem in `eth/common/eth_types_rlp.append()` for `BlockBody` encoding.
+  ## This has been fixed temporarily via `protocol/eth/eth_types.ethAppend()`.
+  let blkSeq = @[
+    BlockBody(
+      transactions: @[
+        Transaction(nonce: 1)]),
+    BlockBody(
+      uncles: @[
+        BlockHeader(nonce: [0x20u8,0,0,0,0,0,0,0])]),
+    BlockBody(),
+    BlockBody(
+      transactions: @[
+        Transaction(nonce: 3),
+        Transaction(nonce: 4)])]
+
+  let trBlkSeq = blkSeq.encode.decode(typeof blkSeq)
+
+  check trBlkSeq.len == blkSeq.len
+  for n in 0 ..< min(trBlkSeq.len, blkSeq.len):
+    check (n, trBlkSeq[n]) == (n, blkSeq[n])
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
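As a quick usage sketch of the new block number pretty printers added to nimbus/sync/types.nim above (expected output is inferred from the implementation; the module import path is an assumption and depends on how the compiler search paths are set up):

  import std/options
  import stint
  import eth/common
  import nimbus/sync/types           # module extended above (path assumed)

  echo 17.toBlockNumber.toStr        # prints "#17"
  echo high(BlockNumber).toStr       # prints "high"
  echo some(17.toBlockNumber).toStr  # prints "#17"
  echo none(BlockNumber).toStr       # prints "n/a"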