From d53eacb85450de515f065b1bef861f1e90638f16 Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Sat, 8 Oct 2022 18:20:50 +0100
Subject: [PATCH] Prep for full sync after snap (#1253)

* Split fetch accounts into sub-modules

  details:
    There will be separate modules for the accounts snapshot, the storage
    snapshot, and healing for either.

* Allow rebasing the pivot before the negotiated header

  why:
    Peers seem to have only a few snapshots available. By setting the pivot
    block header back slightly, the chances of finding more peers able to
    serve this pivot should be higher. An experiment on mainnet showed that
    setting it back too far (tested with 1024) seems to decrease the chances
    of finding matching snapshot peers.

* Add accounts healing

* Update variable/field naming in `worker_desc` for readability

* Handle leaf nodes in accounts healing

  why:
    There is no need to fetch accounts that were already added by the
    healing process. On the flip side, these accounts must be checked for
    storage data and the batch queue updated accordingly.

* Reorganise accounts hash range batch queue

  why:
    The aim is to formally cover as many accounts as possible across
    different pivot state root environments. Formerly, this was attempted
    by starting the accounts batch queue at a random value for each pivot
    (and wrapping around.) Now, each pivot environment starts with an
    interval set mutually disjunct from any interval set retrieved with
    other pivot state roots.

  also:
    Stop fishing for more pivots in `worker` once 100% of the download is
    reached

* Reorganise/update accounts healing

  why:
    Error handling was wrong, and the (mathematical complexity of the)
    whole process could be managed better.

  details:
    Much of the algorithm is now documented at the top of the file
    `heal_accounts.nim`
---
 newBlockchainTests.md                         | 54 +-
 newGeneralStateTests.md                       | 27 +-
 nimbus/sync/full/worker.nim                   | 4 +-
 nimbus/sync/misc/best_pivot.nim               | 9 +-
 nimbus/sync/misc/block_queue.nim              | 28 +-
 nimbus/sync/misc/snap_pivot.nim               | 38 +-
 nimbus/sync/snap/range_desc.nim               | 36 +-
 nimbus/sync/snap/worker.nim                   | 389 ++++++------
 nimbus/sync/snap/worker/com/com_error.nim     | 102 ++++
 .../snap/worker/com/get_account_range.nim     | 2 +-
 .../sync/snap/worker/com/get_block_header.nim | 74 +++
 .../sync/snap/worker/com/get_byte_codes.nim   | 2 +-
 nimbus/sync/snap/worker/com/get_error.nim     | 34 --
 .../snap/worker/com/get_storage_ranges.nim    | 2 +-
 .../sync/snap/worker/com/get_trie_nodes.nim   | 5 +-
 nimbus/sync/snap/worker/db/hexary_desc.nim    | 15 +-
 nimbus/sync/snap/worker/db/hexary_error.nim   | 1 +
 nimbus/sync/snap/worker/db/hexary_inspect.nim | 110 +++-
 nimbus/sync/snap/worker/db/hexary_paths.nim   | 12 +
 .../{accounts_db.nim => db/snap_db.nim}       | 232 ++++---
 nimbus/sync/snap/worker/fetch_accounts.nim    | 566 ------------------
 nimbus/sync/snap/worker/heal_accounts.nim     | 380 ++++++++++++
 nimbus/sync/snap/worker/store_accounts.nim    | 186 ++++++
 nimbus/sync/snap/worker/store_storages.nim    | 177 ++++++
 nimbus/sync/snap/worker/ticker.nim            | 8 +-
 nimbus/sync/snap/worker_desc.nim              | 151 ++---
 tests/test_sync_snap.nim                      | 60 +-
 27 files changed, 1588 insertions(+), 1116 deletions(-)
 create mode 100644 nimbus/sync/snap/worker/com/com_error.nim
 create mode 100644 nimbus/sync/snap/worker/com/get_block_header.nim
 delete mode 100644 nimbus/sync/snap/worker/com/get_error.nim
 rename nimbus/sync/snap/worker/{accounts_db.nim => db/snap_db.nim} (76%)
 delete mode 100644 nimbus/sync/snap/worker/fetch_accounts.nim
 create mode 100644 nimbus/sync/snap/worker/heal_accounts.nim
 create mode 100644 nimbus/sync/snap/worker/store_accounts.nim
 create mode 100644
nimbus/sync/snap/worker/store_storages.nim diff --git a/newBlockchainTests.md b/newBlockchainTests.md index 4214f5e56..da49b9e91 100644 --- a/newBlockchainTests.md +++ b/newBlockchainTests.md @@ -1,12 +1,5 @@ newBlockchainTests === -## bcArrowGlacierToMerge -```diff -+ difficultyFormula.json OK -+ powToPosBlockRejection.json OK -+ powToPosTest.json OK -``` -OK: 3/3 Fail: 0/3 Skip: 0/3 ## bcBerlinToLondon ```diff + BerlinToLondonTransition.json OK @@ -36,7 +29,6 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 + baseFee.json OK + besuBaseFeeBug.json OK + burnVerify.json OK -+ burnVerifyLondon.json OK + checkGasLimit.json OK + feeCap.json OK + gasLimit20m.json OK @@ -48,29 +40,21 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 + lowDemand.json OK + medDemand.json OK + tips.json OK -+ tipsLondon.json OK + transFail.json OK + transType.json OK + valCausesOOF.json OK ``` -OK: 21/21 Fail: 0/21 Skip: 0/21 +OK: 19/19 Fail: 0/19 Skip: 0/19 ## bcEIP158ToByzantium ```diff + ByzantiumTransition.json OK ``` OK: 1/1 Fail: 0/1 Skip: 0/1 -## bcEIP3675 -```diff -+ timestampPerBlock.json OK -+ tipInsideBlock.json OK -``` -OK: 2/2 Fail: 0/2 Skip: 0/2 ## bcExample ```diff + basefeeExample.json OK -+ mergeExample.json OK ``` -OK: 2/2 Fail: 0/2 Skip: 0/2 +OK: 1/1 Fail: 0/1 Skip: 0/1 ## bcExploitTest ```diff DelegateCallSpam.json Skip @@ -282,7 +266,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + RefundOverflow.json OK + RefundOverflow2.json OK + SuicidesMixingCoinbase.json OK -+ SuicidesMixingCoinbase2.json OK + TransactionFromCoinbaseHittingBlockGasLimit1.json OK + TransactionFromCoinbaseNotEnoughFounds.json OK + TransactionNonceCheck.json OK @@ -316,7 +299,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + extcodehashEmptySuicide.json OK + logRevert.json OK + multimpleBalanceInstruction.json OK -+ random.json OK + randomStatetest123.json OK + randomStatetest136.json OK + randomStatetest160.json OK @@ -362,7 +344,7 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + transactionFromSelfDestructedContract.json OK + txCost-sec73.json OK ``` -OK: 89/90 Fail: 0/90 Skip: 1/90 +OK: 87/88 Fail: 0/88 Skip: 1/88 ## bcTotalDifficultyTest ```diff + lotsOfBranchesOverrideAtTheEnd.json OK @@ -423,7 +405,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 ## bcUncleTest ```diff + EqualUncleInTwoDifferentBlocks.json OK -+ EqualUncleInTwoDifferentBlocks2.json OK + InChainUncle.json OK + InChainUncleFather.json OK + InChainUncleGrandPa.json OK @@ -446,7 +427,7 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + uncleHeaderWithGeneration0.json OK + uncleWithSameBlockNumber.json OK ``` -OK: 23/23 Fail: 0/23 Skip: 0/23 +OK: 22/22 Fail: 0/22 Skip: 0/22 ## bcValidBlockTest ```diff + ExtraData32.json OK @@ -749,9 +730,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + callcodecallcodecallcode_111_SuicideEnd.json OK + callcodecallcodecallcode_111_SuicideMiddle.json OK callcodecallcodecallcode_ABCB_RECURSIVE.json Skip -+ touchAndGo.json OK ``` -OK: 73/80 Fail: 0/80 Skip: 7/80 +OK: 72/79 Fail: 0/79 Skip: 7/79 ## stCallCreateCallCodeTest ```diff Call1024BalanceTooLow.json Skip @@ -1026,7 +1006,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CREATE_HighNonceMinus1.json OK + CREATE_empty000CreateinInitCode_Transaction.json OK + CodeInConstructor.json OK -+ CreateAddressWarmAfterFail.json OK + CreateCollisionResults.json OK + CreateCollisionToEmpty.json OK + CreateOOGFromCallRefunds.json OK @@ -1040,14 +1019,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CreateOOGafterInitCodeRevert2.json OK + CreateOOGafterMaxCodesize.json OK + CreateResults.json OK -+ CreateTransactionHighNonce.json OK + TransactionCollisionToEmpty.json OK + TransactionCollisionToEmptyButCode.json OK + 
TransactionCollisionToEmptyButNonce.json OK + createFailResult.json OK -+ createLargeResult.json OK ``` -OK: 44/44 Fail: 0/44 Skip: 0/44 +OK: 41/41 Fail: 0/41 Skip: 0/41 ## stDelegatecallTestHomestead ```diff Call1024BalanceTooLow.json Skip @@ -1151,13 +1128,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40 + lowGasPriceOldTypes.json OK + outOfFunds.json OK + outOfFundsOldTypes.json OK -+ senderBalance.json OK + tipTooHigh.json OK + transactionIntinsicBug.json OK + typeTwoBerlin.json OK + valCausesOOF.json OK ``` -OK: 13/13 Fail: 0/13 Skip: 0/13 +OK: 12/12 Fail: 0/12 Skip: 0/12 ## stEIP158Specific ```diff + CALL_OneVCallSuicide.json OK @@ -1199,12 +1175,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + indexesOmitExample.json OK + invalidTr.json OK + labelsExample.json OK -+ mergeTest.json OK + rangesExample.json OK + solidityExample.json OK + yulExample.json OK ``` -OK: 12/12 Fail: 0/12 Skip: 0/12 +OK: 11/11 Fail: 0/11 Skip: 0/11 ## stExtCodeHash ```diff + callToNonExistent.json OK @@ -1482,7 +1457,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 ## stPreCompiledContracts ```diff + blake2B.json OK -+ delegatecall09Undefined.json OK + idPrecomps.json OK + identity_to_bigger.json OK + identity_to_smaller.json OK @@ -1491,7 +1465,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 + precompsEIP2929.json OK + sec80.json OK ``` -OK: 9/9 Fail: 0/9 Skip: 0/9 +OK: 8/8 Fail: 0/8 Skip: 0/8 ## stPreCompiledContracts2 ```diff + CALLBlake2f.json OK @@ -1559,7 +1533,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallEcrecoverS_prefixed0.json OK + CallEcrecoverUnrecoverableKey.json OK + CallEcrecoverV_prefixed0.json OK -+ CallEcrecover_Overflow.json OK + CallIdentitiy_0.json OK + CallIdentitiy_1.json OK + CallIdentity_1_nonzeroValue.json OK @@ -1589,15 +1562,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallSha256_4.json OK + CallSha256_4_gas99.json OK + CallSha256_5.json OK -+ ecrecoverShortBuff.json OK -+ ecrecoverWeirdV.json OK + modexpRandomInput.json OK + modexp_0_0_0_20500.json OK + modexp_0_0_0_22000.json OK + modexp_0_0_0_25000.json OK + modexp_0_0_0_35000.json OK ``` -OK: 102/102 Fail: 0/102 Skip: 0/102 +OK: 99/99 Fail: 0/99 Skip: 0/99 ## stQuadraticComplexityTest ```diff Call1MB1024Calldepth.json Skip @@ -2801,7 +2772,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + currentAccountBalance.json OK + doubleSelfdestructTest.json OK + doubleSelfdestructTest2.json OK -+ doubleSelfdestructTouch.json OK + extcodecopy.json OK + return0.json OK + return1.json OK @@ -2816,7 +2786,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + suicideSendEtherToMe.json OK + testRandomTest.json OK ``` -OK: 57/67 Fail: 0/67 Skip: 10/67 +OK: 56/66 Fail: 0/66 Skip: 10/66 ## stTimeConsuming ```diff CALLBlake2f_MaxRounds.json Skip @@ -3336,4 +3306,4 @@ OK: 0/3 Fail: 0/3 Skip: 3/3 OK: 11/11 Fail: 0/11 Skip: 0/11 ---TOTAL--- -OK: 2881/2986 Fail: 0/2986 Skip: 105/2986 +OK: 2859/2964 Fail: 0/2964 Skip: 105/2964 diff --git a/newGeneralStateTests.md b/newGeneralStateTests.md index ed4e7bec6..8be5311b7 100644 --- a/newGeneralStateTests.md +++ b/newGeneralStateTests.md @@ -271,9 +271,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + callcodecallcodecallcode_111_SuicideEnd.json OK + callcodecallcodecallcode_111_SuicideMiddle.json OK callcodecallcodecallcode_ABCB_RECURSIVE.json Skip -+ touchAndGo.json OK ``` -OK: 73/80 Fail: 0/80 Skip: 7/80 +OK: 72/79 Fail: 0/79 Skip: 7/79 ## stCallCreateCallCodeTest ```diff Call1024BalanceTooLow.json Skip @@ -548,7 +547,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CREATE_HighNonceMinus1.json OK + CREATE_empty000CreateinInitCode_Transaction.json OK + CodeInConstructor.json OK -+ CreateAddressWarmAfterFail.json OK + 
CreateCollisionResults.json OK + CreateCollisionToEmpty.json OK + CreateOOGFromCallRefunds.json OK @@ -562,14 +560,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CreateOOGafterInitCodeRevert2.json OK + CreateOOGafterMaxCodesize.json OK + CreateResults.json OK -+ CreateTransactionHighNonce.json OK + TransactionCollisionToEmpty.json OK + TransactionCollisionToEmptyButCode.json OK + TransactionCollisionToEmptyButNonce.json OK + createFailResult.json OK -+ createLargeResult.json OK ``` -OK: 44/44 Fail: 0/44 Skip: 0/44 +OK: 41/41 Fail: 0/41 Skip: 0/41 ## stDelegatecallTestHomestead ```diff Call1024BalanceTooLow.json Skip @@ -673,13 +669,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40 + lowGasPriceOldTypes.json OK + outOfFunds.json OK + outOfFundsOldTypes.json OK -+ senderBalance.json OK + tipTooHigh.json OK + transactionIntinsicBug.json OK + typeTwoBerlin.json OK + valCausesOOF.json OK ``` -OK: 13/13 Fail: 0/13 Skip: 0/13 +OK: 12/12 Fail: 0/12 Skip: 0/12 ## stEIP158Specific ```diff + CALL_OneVCallSuicide.json OK @@ -721,12 +716,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + indexesOmitExample.json OK + invalidTr.json OK + labelsExample.json OK -+ mergeTest.json OK + rangesExample.json OK + solidityExample.json OK + yulExample.json OK ``` -OK: 12/12 Fail: 0/12 Skip: 0/12 +OK: 11/11 Fail: 0/11 Skip: 0/11 ## stExtCodeHash ```diff + callToNonExistent.json OK @@ -1004,7 +998,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 ## stPreCompiledContracts ```diff + blake2B.json OK -+ delegatecall09Undefined.json OK + idPrecomps.json OK + identity_to_bigger.json OK + identity_to_smaller.json OK @@ -1013,7 +1006,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 + precompsEIP2929.json OK + sec80.json OK ``` -OK: 9/9 Fail: 0/9 Skip: 0/9 +OK: 8/8 Fail: 0/8 Skip: 0/8 ## stPreCompiledContracts2 ```diff + CALLBlake2f.json OK @@ -1081,7 +1074,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallEcrecoverS_prefixed0.json OK + CallEcrecoverUnrecoverableKey.json OK + CallEcrecoverV_prefixed0.json OK -+ CallEcrecover_Overflow.json OK + CallIdentitiy_0.json OK + CallIdentitiy_1.json OK + CallIdentity_1_nonzeroValue.json OK @@ -1111,15 +1103,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallSha256_4.json OK + CallSha256_4_gas99.json OK + CallSha256_5.json OK -+ ecrecoverShortBuff.json OK -+ ecrecoverWeirdV.json OK + modexpRandomInput.json OK + modexp_0_0_0_20500.json OK + modexp_0_0_0_22000.json OK + modexp_0_0_0_25000.json OK + modexp_0_0_0_35000.json OK ``` -OK: 102/102 Fail: 0/102 Skip: 0/102 +OK: 99/99 Fail: 0/99 Skip: 0/99 ## stQuadraticComplexityTest ```diff Call1MB1024Calldepth.json Skip @@ -2323,7 +2313,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + currentAccountBalance.json OK + doubleSelfdestructTest.json OK + doubleSelfdestructTest2.json OK -+ doubleSelfdestructTouch.json OK + extcodecopy.json OK + return0.json OK + return1.json OK @@ -2338,7 +2327,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + suicideSendEtherToMe.json OK + testRandomTest.json OK ``` -OK: 57/67 Fail: 0/67 Skip: 10/67 +OK: 56/66 Fail: 0/66 Skip: 10/66 ## stTimeConsuming ```diff CALLBlake2f_MaxRounds.json Skip @@ -2858,4 +2847,4 @@ OK: 1/3 Fail: 0/3 Skip: 2/3 OK: 11/11 Fail: 0/11 Skip: 0/11 ---TOTAL--- -OK: 2506/2608 Fail: 0/2608 Skip: 102/2608 +OK: 2495/2597 Fail: 0/2597 Skip: 102/2597 diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index b6b1afec9..607465e2d 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -240,11 +240,11 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = buddy.ctrl.zombie = true # Initialise/re-initialise this worker - elif await 
buddy.data.pivot.bestPivotNegotiate(buddy.data.bestNumber): + elif await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber): buddy.ctrl.multiOk = true # Update/activate `bestNumber` for local use buddy.data.bestNumber = - some(buddy.data.pivot.bestPivotHeader.value.blockNumber) + some(buddy.data.pivot.pivotHeader.value.blockNumber) elif not buddy.ctrl.stopped: await sleepAsync(2.seconds) diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim index 622355f7f..7421a260d 100644 --- a/nimbus/sync/misc/best_pivot.nim +++ b/nimbus/sync/misc/best_pivot.nim @@ -242,7 +242,7 @@ proc clear*(bp: BestPivotWorkerRef) = # Public functions # ------------------------------------------------------------------------------ -proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = +proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = ## Returns cached block header if available and the buddy `peer` is trusted. if bp.header.isSome and bp.peer notin bp.global.untrusted and @@ -251,10 +251,11 @@ proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = return ok(bp.header.unsafeGet) err() -proc bestPivotNegotiate*( - bp: BestPivotWorkerRef; ## Worker peer +proc pivotNegotiate*( + bp: BestPivotWorkerRef; ## Worker peer minBlockNumber: Option[BlockNumber]; ## Minimum block number to expect - ): Future[bool] {.async.} = + ): Future[bool] + {.async.} = ## Negotiate best header pivot. This function must be run in *single mode* at ## the beginning of a running worker peer. If the function returns `true`, ## the current `buddy` can be used for syncing and the function diff --git a/nimbus/sync/misc/block_queue.nim b/nimbus/sync/misc/block_queue.nim index d072dbdcf..12092faa1 100644 --- a/nimbus/sync/misc/block_queue.nim +++ b/nimbus/sync/misc/block_queue.nim @@ -13,21 +13,21 @@ ## ## Worker items state diagram and sketch of sync algorithm: ## :: -## set of unprocessed | peer workers | list of work items ready -## block ranges | | for persistent database -## ================================================================== +## unprocessed | | ready for | store into +## block ranges | peer workers | persistent database | database +## ======================================================================= ## -## +---------------------------------------------+ -## | | -## | +---------------------------------+ | -## | | | | -## V v | | -## ---+-----> -----+---> ---> OUTPUT -## | | -## +-----> -----+ -## | | -## +-----> -----+ -## : : +## +------------------------------------------+ +## | | +## | +----------------------------+ | +## | | | | +## V v | | +## ---+---> ---+-----> -------> OUTPUT +## | | +## +---> ---+ +## | | +## +---> ---+ +## : : ## ## A work item is created from a range of block numbers extracted from the ## `` set of block ranges. diff --git a/nimbus/sync/misc/snap_pivot.nim b/nimbus/sync/misc/snap_pivot.nim index 3f92dd931..8ba2b2175 100644 --- a/nimbus/sync/misc/snap_pivot.nim +++ b/nimbus/sync/misc/snap_pivot.nim @@ -122,16 +122,16 @@ type ## Statistics counters for events associated with this peer. ## These may be used to recognise errors and select good peers. 
ok: tuple[ - reorgDetected: BuddyStat, - getBlockHeaders: BuddyStat, - getNodeData: BuddyStat] + reorgDetected: uint, + getBlockHeaders: uint, + getNodeData: uint] minor: tuple[ - timeoutBlockHeaders: BuddyStat, - unexpectedBlockHash: BuddyStat] + timeoutBlockHeaders: uint, + unexpectedBlockHash: uint] major: tuple[ - networkErrors: BuddyStat, - excessBlockHeaders: BuddyStat, - wrongBlockHeader: BuddyStat] + networkErrors: uint, + excessBlockHeaders: uint, + wrongBlockHeader: uint] SnapPivotCtxRef* = ref object of RootRef stats*: SnapWorkerStats ## Statistics counters @@ -550,14 +550,19 @@ proc init*( # Public functions # ------------------------------------------------------------------------------ -proc snapPivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] = +proc pivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] = ## Returns cached block header if available if sp.header.isSome: - ok(sp.header.unsafeGet) - else: - err() + let header = sp.header.unsafeGet + if header.blockNumber != 0: + return ok(header) + err() -proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = +proc pivotNegotiate*( + sp: SnapPivotWorkerRef; + ign: Option[BlockNumber]; ## Minimum block number to expect,ignored for now + ): Future[bool] + {.async.} = ## Query a peer to update our knowledge of its canonical chain and its best ## block, which is its canonical chain head. This can be called at any time ## after a peer has negotiated the connection. @@ -588,13 +593,13 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = inc sp.global.stats.major.networkErrors # Just try another peer sp.ctrl.zombie = true - return + return false if reply.isNone: trace trEthRecvTimeoutWaiting & "for GetBlockHeaders reply", peer # TODO: Should disconnect? inc sp.global.stats.minor.timeoutBlockHeaders - return + return false let nHeaders = reply.get.headers.len if nHeaders == 0: @@ -611,7 +616,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = peer, got=nHeaders, requested=request.maxResults # TODO: Should disconnect. inc sp.global.stats.major.excessBlockHeaders - return + return false if 0 < nHeaders: # TODO: Check this is not copying the `headers`. @@ -620,6 +625,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = sp.peerSyncChainEmptyReply(request) trace "Done pivotExec()", peer + return sp.header.isSome # ------------------------------------------------------------------------------ # Debugging diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 295ebe7e0..3df2ec432 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -1,5 +1,4 @@ -# Nimbus - Types, data structures and shared utilities used in network sync -# +# Nimbus # Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -162,25 +161,46 @@ proc digestTo*(data: Blob; T: type NodeTag): T = ## Hash the `data` argument keccakHash(data).to(T) -proc freeFactor*(lrs: LeafRangeSet): float = - ## Free factor, ie. `#items-free / 2^256` to be used in statistics + +proc emptyFactor*(lrs: LeafRangeSet): float = + ## Relative uncovered total, i.e. `#points-not-covered / 2^256` to be used + ## in statistics or triggers. 
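  ## Worked example: if `lrs` holds a single interval covering `2^255`
  ## points, then `lrs.total == 2^255` and the result is
  ## `((2^256 - 1) - 2^255 + 1) / 2^256 == 0.5`, i.e. half of the account
  ## hash space is still uncovered.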
if 0 < lrs.total: ((high(NodeTag) - lrs.total).u256 + 1).to(float) / (2.0^256) elif lrs.chunks == 0: 1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` else: - 0.0 + 0.0 # number of points in `lrs` is `2^256 + 1` + +proc emptyFactor*(lrs: openArray[LeafRangeSet]): float = + ## Variant of `emptyFactor()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + var accu: Nodetag + for ivSet in lrs: + if 0 < ivSet.total: + if high(NodeTag) - ivSet.total < accu: + return 0.0 + accu = accu + ivSet.total + elif ivSet.chunks == 0: + discard + else: # number of points in `ivSet` is `2^256 + 1` + return 0.0 + ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) proc fullFactor*(lrs: LeafRangeSet): float = - ## Free factor, ie. `#items-contained / 2^256` to be used in statistics + ## Relative covered total, i.e. `#points-covered / 2^256` to be used + ## in statistics or triggers if 0 < lrs.total: lrs.total.u256.to(float) / (2.0^256) elif lrs.chunks == 0: - 0.0 + 0.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` else: - 1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` + 1.0 # number of points in `lrs` is `2^256 + 1` + # Printing & pretty printing + proc `$`*(nt: NodeTag): string = if nt == high(NodeTag): "high(NodeTag)" diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 66ae16ad0..d55c6eacb 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -1,5 +1,4 @@ # Nimbus -# # Copyright (c) 2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -17,30 +16,41 @@ import stew/[interval_set, keyed_queue], ../../db/select_backend, ".."/[protocol, sync_desc], - ./worker/[accounts_db, fetch_accounts, ticker], + ./worker/[heal_accounts, store_accounts, store_storages, ticker], + ./worker/com/[com_error, get_block_header], + ./worker/db/snap_db, "."/[range_desc, worker_desc] const usePivot2ok = false or true when usePivot2ok: - import ../misc/best_pivot + import + ../misc/best_pivot + type + PivotCtxRef = BestPivotCtxRef + PivotWorkerRef = BestPivotWorkerRef else: - import ../misc/snap_pivot, ../../p2p/chain/chain_desc + import + ../../p2p/chain/chain_desc, + ../misc/snap_pivot + type + PivotCtxRef = SnapPivotCtxRef + PivotWorkerRef = SnapPivotWorkerRef {.push raises: [Defect].} logScope: topics = "snap-sync" +const + extraTraceMessages = false or true + ## Enabled additional logging noise + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -proc hash(h: Hash256): Hash = - ## Mixin for `Table` or `keyedQueue` - h.data.hash - proc meanStdDev(sum, sqSum: float; length: int): (float,float) = if 0 < length: result[0] = sum / length.float @@ -57,29 +67,24 @@ template noExceptionOops(info: static[string]; code: untyped) = raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg # ------------------------------------------------------------------------------ -# Private helpers: integration of pivot negotiator +# Private helpers: integration of pivot finder # ------------------------------------------------------------------------------ -when usePivot2ok: - type - PivotCtxRef = BestPivotCtxRef - PivotWorkerRef = BestPivotWorkerRef -else: - type - PivotCtxRef = SnapPivotCtxRef - 
PivotWorkerRef = SnapPivotWorkerRef - proc pivot(ctx: SnapCtxRef): PivotCtxRef = - ctx.data.pivotData.PivotCtxRef + # Getter + ctx.data.pivotFinderCtx.PivotCtxRef proc `pivot=`(ctx: SnapCtxRef; val: PivotCtxRef) = - ctx.data.pivotData = val + # Setter + ctx.data.pivotFinderCtx = val proc pivot(buddy: SnapBuddyRef): PivotWorkerRef = - buddy.data.workerPivot.PivotWorkerRef + # Getter + buddy.data.pivotFinder.PivotWorkerRef proc `pivot=`(buddy: SnapBuddyRef; val: PivotWorkerRef) = - buddy.data.workerPivot = val + # Setter + buddy.data.pivotFinder = val # -------------------- @@ -92,104 +97,139 @@ proc pivotSetup(ctx: SnapCtxRef) = proc pivotRelease(ctx: SnapCtxRef) = ctx.pivot = nil - proc pivotStart(buddy: SnapBuddyRef) = buddy.pivot = PivotWorkerRef.init(buddy.ctx.pivot, buddy.ctrl, buddy.peer) proc pivotStop(buddy: SnapBuddyRef) = buddy.pivot.clear() -# -------------------- - -proc pivotHeader(buddy: SnapBuddyRef): Result[BlockHeader,void] = - when usePivot2ok: - buddy.pivot.bestPivotHeader() - else: - buddy.pivot.snapPivotHeader() - -when usePivot2ok: - proc envPivotNumber(buddy: SnapBuddyRef): Option[BlockNumber] = - let env = buddy.ctx.data.pivotEnv - if env.isNil: - none(BlockNumber) - else: - some(env.stateHeader.blockNumber) - - template pivotNegotiate(buddy: SnapBuddyRef; n: Option[BlockNumber]): auto = - buddy.pivot.bestPivotNegotiate(n) -else: - template pivotNegotiate(buddy: SnapBuddyRef): auto = - buddy.pivot.snapPivotNegotiate() - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc rndNodeTag(buddy: SnapBuddyRef): NodeTag = - ## Create random node tag - var data: array[32,byte] - buddy.ctx.data.rng[].generate(data) - UInt256.fromBytesBE(data).NodeTag +proc init(T: type SnapAccountRanges; ctx: SnapCtxRef): T = + ## Return a pair of account hash range lists with the whole range of + ## smartly spread `[low(NodeTag),high(NodeTag)]` across the mutually + ## disjunct interval sets. + result = [LeafRangeSet.init(),LeafRangeSet.init()] + + # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` + # range sets. + if ctx.data.coveredAccounts.total == 0 and + ctx.data.coveredAccounts.chunks == 1: + # 100% of accounts covered by range fetch batches for the total + # of pivot environments. Do a random split distributing the range + # `[low(NodeTag),high(NodeTag)]` across the pair of range sats. + var nodeKey: NodeKey + ctx.data.rng[].generate(nodeKey.ByteArray32) + + let partition = nodeKey.to(NodeTag) + discard result[0].merge(partition, high(NodeTag)) + if low(NodeTag) < partition: + discard result[1].merge(low(NodeTag), partition - 1.u256) + else: + # Not all account hashes are covered, yet. So keep the uncovered + # account hashes in the first range set, and the other account hashes + # in the second range set. + + # Pre-filled with the first range set with largest possible interval + discard result[0].merge(low(NodeTag),high(NodeTag)) + + # Move covered account ranges (aka intervals) to the second set. + for iv in ctx.data.coveredAccounts.increasing: + discard result[0].reduce(iv) + discard result[1].merge(iv) -proc setPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = - ## Activate environment for state root implied by `header` argument +proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = + ## Activate environment for state root implied by `header` argument. 
This + ## function appends a new environment unless there was any not far enough + ## apart. + ## + ## Note that this function relies on a queue sorted by the block numbers of + ## the pivot header. To maintain the sort order, the function `lruFetch()` + ## must not be called and only records appended with increasing block + ## numbers. let ctx = buddy.ctx - key = header.stateRoot - rc = ctx.data.pivotTable.lruFetch(key) - if rc.isOk: - ctx.data.pivotEnv = rc.value - return + minNumber = block: + let rc = ctx.data.pivotTable.lastValue + if rc.isOk: rc.value.stateHeader.blockNumber + minPivotBlockDistance + else: 1.toBlockNumber - let env = SnapPivotRef( - stateHeader: header, - pivotAccount: buddy.rndNodeTag, - availAccounts: LeafRangeSet.init()) - # Pre-filled with the largest possible interval - discard env.availAccounts.merge(low(NodeTag),high(NodeTag)) + # Check whether the new header follows minimum depth requirement. This is + # where the queue is assumed to have increasing block numbers. + if minNumber <= header.blockNumber: + # Ok, append a new environment + let env = SnapPivotRef( + stateHeader: header, + fetchAccounts: SnapAccountRanges.init(ctx)) - # Statistics - ctx.data.pivotCount.inc + # Append per-state root environment to LRU queue + discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) - # Activate per-state root environment (and hold previous one) - ctx.data.prevEnv = ctx.data.pivotEnv - ctx.data.pivotEnv = ctx.data.pivotTable.lruAppend(key, env, ctx.buddiesMax) + # Debugging, will go away + block: + let ivSet = env.fetchAccounts[0].clone + for iv in env.fetchAccounts[1].increasing: + doAssert ivSet.merge(iv) == iv.len + doAssert ivSet.chunks == 1 + doAssert ivSet.total == 0 -proc updatePivotEnv(buddy: SnapBuddyRef): bool = - ## Update global state root environment from local `pivotHeader`. Choose the - ## latest block number. 
Returns `true` if the environment was changed - let rc = buddy.pivotHeader - if rc.isOk: - let - peer = buddy.peer - ctx = buddy.ctx - env = ctx.data.pivotEnv - pivotHeader = rc.value - newStateNumber = pivotHeader.blockNumber - stateNumber = if env.isNil: 0.toBlockNumber - else: env.stateHeader.blockNumber - stateWindow = stateNumber + maxPivotBlockWindow - - block keepCurrent: - if env.isNil: - break keepCurrent # => new pivot - if stateNumber < newStateNumber: - when switchPivotAfterCoverage < 1.0: - if env.minCoverageReachedOk: - break keepCurrent # => new pivot - if stateWindow < newStateNumber: - break keepCurrent # => new pivot - if newStateNumber <= maxPivotBlockWindow: - break keepCurrent # => new pivot - # keep current - return false - - # set new block - buddy.setPivotEnv(pivotHeader) +proc updatePivotImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = + ## Helper, negotiate pivot unless present + if buddy.pivot.pivotHeader.isOk: return true + let + ctx = buddy.ctx + peer = buddy.peer + env = ctx.data.pivotTable.lastValue.get(otherwise = nil) + nMin = if env.isNil: none(BlockNumber) + else: some(env.stateHeader.blockNumber) + + if await buddy.pivot.pivotNegotiate(nMin): + var header = buddy.pivot.pivotHeader.value + + # Check whether there is no environment change needed + when noPivotEnvChangeIfComplete: + let rc = ctx.data.pivotTable.lastValue + if rc.isOk and rc.value.serialSync: + # No neede to change + if extraTraceMessages: + trace "No need to change snap pivot", peer, + pivot=("#" & $rc.value.stateHeader.blockNumber), + multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state + return true + + when 0 < backPivotBlockDistance: + # Backtrack, do not use the very latest pivot header + if backPivotBlockThreshold.toBlockNumber < header.blockNumber: + let + backNum = header.blockNumber - backPivotBlockDistance.toBlockNumber + rc = await buddy.getBlockHeader(backNum) + if rc.isErr: + if rc.error in {ComNoHeaderAvailable, ComTooManyHeaders}: + buddy.ctrl.zombie = true + return false + header = rc.value + + buddy.appendPivotEnv(header) + + trace "Snap pivot initialised", peer, pivot=("#" & $header.blockNumber), + multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state + + return true + +# Syntactic sugar +when usePivot2ok: + template updateSinglePivot(buddy: SnapBuddyRef): auto = + buddy.updatePivotImpl() +else: + template updateMultiPivot(buddy: SnapBuddyRef): auto = + buddy.updatePivotImpl() + proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = result = proc: TickerStats = @@ -206,7 +246,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = aSqSum += aLen * aLen # Fill utilisation mean & variance - let fill = kvp.data.availAccounts.freeFactor + let fill = kvp.data.fetchAccounts.emptyFactor uSum += fill uSqSum += fill * fill @@ -215,77 +255,35 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = sSqSum += sLen * sLen let - tabLen = ctx.data.pivotTable.len - pivotBlock = if ctx.data.pivotEnv.isNil: none(BlockNumber) - else: some(ctx.data.pivotEnv.stateHeader.blockNumber) + env = ctx.data.pivotTable.lastValue.get(otherwise = nil) + pivotBlock = if env.isNil: none(BlockNumber) + else: some(env.stateHeader.blockNumber) accCoverage = ctx.data.coveredAccounts.fullFactor accFill = meanStdDev(uSum, uSqSum, count) - when snapAccountsDumpEnable: - if snapAccountsDumpCoverageStop < accCoverage: - trace " Snap proofs dump stop", - threshold=snapAccountsDumpCoverageStop, coverage=accCoverage.toPC - ctx.data.proofDumpOk = false - TickerStats( pivotBlock: pivotBlock, - 
activeQueues: tabLen, - flushedQueues: ctx.data.pivotCount.int64 - tabLen, + nQueues: ctx.data.pivotTable.len, nAccounts: meanStdDev(aSum, aSqSum, count), nStorage: meanStdDev(sSum, sSqSum, count), accountsFill: (accFill[0], accFill[1], accCoverage)) - -proc havePivot(buddy: SnapBuddyRef): bool = - ## ... - let rc = buddy.pivotHeader - if rc.isOk and rc.value.blockNumber != 0: - - # So there is a `ctx.data.pivotEnv` - when 1.0 <= switchPivotAfterCoverage: - return true - else: - let - ctx = buddy.ctx - env = ctx.data.pivotEnv - - # Force fetching new pivot if coverage reached by returning `false` - if not env.minCoverageReachedOk: - - # Not sure yet, so check whether coverage has been reached at all - let cov = env.availAccounts.freeFactor - if switchPivotAfterCoverage <= cov: - trace " Snap accounts coverage reached", peer, - threshold=switchPivotAfterCoverage, coverage=cov.toPC - - # Need to reset pivot handlers - buddy.ctx.poolMode = true - buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) = - b.ctx.data.pivotEnv.minCoverageReachedOk = true - b.pivotRestart - return true - # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up - ctx.data.accountRangeMax = high(UInt256) div ctx.buddiesMax.u256 ctx.data.coveredAccounts = LeafRangeSet.init() - ctx.data.accountsDb = - if ctx.data.dbBackend.isNil: AccountsDbRef.init(ctx.chain.getTrieDB) - else: AccountsDbRef.init(ctx.data.dbBackend) + ctx.data.snapDb = + if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB) + else: SnapDbRef.init(ctx.data.dbBackend) ctx.pivotSetup() if tickerOK: ctx.data.ticker = TickerRef.init(ctx.tickerUpdate) else: trace "Ticker is disabled" result = true - # ----------------------- - when snapAccountsDumpEnable: - doAssert ctx.data.proofDumpFile.open("./dump-stream.out", fmWrite) - ctx.data.proofDumpOk = true proc release*(ctx: SnapCtxRef) = ## Global clean up @@ -303,6 +301,7 @@ proc start*(buddy: SnapBuddyRef): bool = peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: buddy.pivotStart() + buddy.data.errors = ComErrorStatsRef() if not ctx.data.ticker.isNil: ctx.data.ticker.startBuddy() return true @@ -334,28 +333,17 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## Note that this function runs in `async` mode. ## when usePivot2ok: - # # Run alternative pivot finder. This one harmonises difficulties of at # least two peers. The can only be one instance active/unfinished of the # `pivot2Exec()` functions. - # let peer = buddy.peer - if buddy.pivotHeader.isErr: - if await buddy.pivotNegotiate(buddy.envPivotNumber): - discard buddy.updatePivotEnv() - else: - # Wait if needed, then return => repeat - if not buddy.ctrl.stopped: - await sleepAsync(2.seconds) - return + if not await buddy.updateSinglePivot(): + # Wait if needed, then return => repeat + if not buddy.ctrl.stopped: + await sleepAsync(2.seconds) + return - buddy.ctrl.multiOk = true - - trace "Snap pivot initialised", peer, - multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state - else: - # Default pivot finder runs in multi mode => nothing to do here. 
- buddy.ctrl.multiOk = true + buddy.ctrl.multiOk = true proc runPool*(buddy: SnapBuddyRef, last: bool) = @@ -375,11 +363,21 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = let ctx = buddy.ctx if ctx.poolMode: ctx.poolMode = false - if not ctx.data.runPoolHook.isNil: - noExceptionOops("runPool"): - ctx.data.runPoolHook(buddy) - if last: - ctx.data.runPoolHook = nil + + let rc = ctx.data.pivotTable.lastValue + if rc.isOk: + # Check whether accounts and storage might be complete. + let env = rc.value + if not env.serialSync: + # Check whether accounts download is complete + block checkAccountsComplete: + for ivSet in env.fetchAccounts: + if ivSet.chunks != 0: + break checkAccountsComplete + env.accountsDone = true + # Check whether storage slots are complete + if env.fetchStorage.len == 0: + env.serialSync = true proc runMulti*(buddy: SnapBuddyRef) {.async.} = @@ -392,23 +390,44 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = peer = buddy.peer when not usePivot2ok: - if not buddy.havePivot: - await buddy.pivotNegotiate() - if not buddy.updatePivotEnv(): - return + discard await buddy.updateMultiPivot() - # Ignore rest if the pivot is still acceptably covered - when switchPivotAfterCoverage < 1.0: - if ctx.data.pivotEnv.minCoverageReachedOk: - await sleepAsync(50.milliseconds) - return + # Set up current state root environment for accounts snapshot + let env = block: + let rc = ctx.data.pivotTable.lastValue + if rc.isErr: + return # nothing to do + rc.value - await buddy.fetchAccounts() + buddy.data.pivotEnv = env - if ctx.data.pivotEnv.repairState == Done: - buddy.ctrl.multiOk = false - buddy.pivotStop() - buddy.pivotStart() + if env.serialSync: + trace "Snap serial sync -- not implemented yet", peer + await sleepAsync(5.seconds) + + else: + # Snapshot sync processing. Note that *serialSync => accountsDone*. + await buddy.storeStorages() # always pre-clean the queue + await buddy.storeAccounts() + await buddy.storeStorages() + + # If the current database is not complete yet + if 0 < env.fetchAccounts[0].chunks or + 0 < env.fetchAccounts[1].chunks: + + # Healing applies to the latest pivot only. The pivot might have changed + # in the background (while netwoking) due to a new peer worker that has + # negotiated another, newer pivot. + if env == ctx.data.pivotTable.lastValue.value: + await buddy.healAccountsDb() + + # TODO: use/apply storage healer + + # Check whether accounts might be complete. + if env.fetchStorage.len == 0: + # Possibly done but some buddies might wait for an account range to be + # received from the network. So we need to sync. + buddy.ctx.poolMode = true # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/com/com_error.nim b/nimbus/sync/snap/worker/com/com_error.nim new file mode 100644 index 000000000..0165f694e --- /dev/null +++ b/nimbus/sync/snap/worker/com/com_error.nim @@ -0,0 +1,102 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + chronos, + ../../../sync_desc + +const + comErrorsTimeoutMax* = 2 + ## Maximal number of non-resonses accepted in a row. 
If there are more than + ## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded + ## as zombie. + +type + ComErrorStatsRef* = ref object + ## particular error counters so connections will not be cut immediately + ## after a particular error. + nTimeouts*: uint + nNetwork*: uint + + ComError* = enum + ComNothingSerious + ComAccountsMaxTooLarge + ComAccountsMinTooSmall + ComEmptyAccountsArguments + ComEmptyRequestArguments + ComMissingProof + ComNetworkProblem + ComNoAccountsForStateRoot + ComNoByteCodesAvailable + ComNoDataForProof + ComNoHeaderAvailable + ComNoStorageForAccounts + ComNoTrieNodesAvailable + ComResponseTimeout + ComTooManyByteCodes + ComTooManyHeaders + ComTooManyStorageSlots + ComTooManyTrieNodes + + # Other errors not directly related to communication + ComInspectDbFailed + ComImportAccountsFailed + + +proc stopAfterSeriousComError*( + ctrl: BuddyCtrlRef; + error: ComError; + stats: ComErrorStatsRef; + ): Future[bool] + {.async.} = + ## Error handling after data protocol failed. + case error: + of ComResponseTimeout: + stats.nTimeouts.inc + if comErrorsTimeoutMax < stats.nTimeouts: + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + else: + # Otherwise try again some time later. Nevertheless, stop the + # current action. + await sleepAsync(5.seconds) + return true + + of ComNetworkProblem, + ComMissingProof, + ComAccountsMinTooSmall, + ComAccountsMaxTooLarge: + stats.nNetwork.inc + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + return true + + of ComEmptyAccountsArguments, + ComEmptyRequestArguments, + ComInspectDbFailed, + ComImportAccountsFailed, + ComNoDataForProof, + ComNothingSerious: + discard + + of ComNoAccountsForStateRoot, + ComNoStorageForAccounts, + ComNoByteCodesAvailable, + ComNoHeaderAvailable, + ComNoTrieNodesAvailable, + ComTooManyByteCodes, + ComTooManyHeaders, + ComTooManyStorageSlots, + ComTooManyTrieNodes: + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + return true + +# End diff --git a/nimbus/sync/snap/worker/com/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim index bbcdb6e10..68c8922c6 100644 --- a/nimbus/sync/snap/worker/com/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -19,7 +19,7 @@ import stew/interval_set, "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_block_header.nim b/nimbus/sync/snap/worker/com/get_block_header.nim new file mode 100644 index 000000000..817622d1d --- /dev/null +++ b/nimbus/sync/snap/worker/com/get_block_header.nim @@ -0,0 +1,74 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
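# Usage sketch (assumptions noted): a fetch routine would typically pair
# `ComErrorStatsRef` with `stopAfterSeriousComError()` roughly as below,
# assuming a helper such as `getAccountRange()` returning
# `Result[...,ComError]`:
#
#   let rc = await buddy.getAccountRange(stateRoot, iv)
#   if rc.isErr:
#     if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.data.errors):
#       return            # give up this round; the peer may now be a zombie
#   # otherwise process `rc.value` ...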
+ +import + std/options, + chronos, + eth/[common/eth_types, p2p], + ../../../protocol, + ../../worker_desc, + ./com_error + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getBlockHeader*( + buddy: SnapBuddyRef; + num: BlockNumber; + ): Future[Result[BlockHeader,ComError]] + {.async.} = + ## Get single block header + let + peer = buddy.peer + reqLen = 1u + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: false, + number: num), + maxResults: reqLen, + skip: 0, + reverse: false) + + trace trEthSendSendingGetBlockHeaders, peer, startBlock=num, reqLen + + var hdrResp: Option[blockHeadersObj] + try: + hdrResp = await peer.getBlockHeaders(hdrReq) + except CatchableError as e: + trace trSnapRecvError & "waiting for GetByteCodes reply", peer, + error=e.msg + return err(ComNetworkProblem) + + var hdrRespLen = 0 + if hdrResp.isSome: + hdrRespLen = hdrResp.get.headers.len + if hdrRespLen == 0: + trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + return err(ComNoHeaderAvailable) + + if hdrRespLen == 1: + let + header = hdrResp.get.headers[0] + blockNumber = header.blockNumber + trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber + return ok(header) + + trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen + return err(ComTooManyHeaders) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/com/get_byte_codes.nim b/nimbus/sync/snap/worker/com/get_byte_codes.nim index 6a272f13a..992ff0810 100644 --- a/nimbus/sync/snap/worker/com/get_byte_codes.nim +++ b/nimbus/sync/snap/worker/com/get_byte_codes.nim @@ -16,7 +16,7 @@ import eth/[common/eth_types, p2p], "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_error.nim b/nimbus/sync/snap/worker/com/get_error.nim deleted file mode 100644 index e649d3b9c..000000000 --- a/nimbus/sync/snap/worker/com/get_error.nim +++ /dev/null @@ -1,34 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
- -type - ComError* = enum - ComNothingSerious - ComAccountsMaxTooLarge - ComAccountsMinTooSmall - ComEmptyAccountsArguments - ComEmptyRequestArguments - ComMissingProof - ComNetworkProblem - ComNoAccountsForStateRoot - ComNoByteCodesAvailable - ComNoDataForProof - ComNoStorageForAccounts - ComNoTrieNodesAvailable - ComResponseTimeout - ComTooManyByteCodes - ComTooManyStorageSlots - ComTooManyTrieNodes - - # Other errors not directly related to communication - ComInspectDbFailed - ComImportAccountsFailed - -# End diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index d0b09f922..1bc8dc318 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -16,7 +16,7 @@ import stew/interval_set, "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim index 7a2aacc48..7f52f59f6 100644 --- a/nimbus/sync/snap/worker/com/get_trie_nodes.nim +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -1,3 +1,6 @@ +# Nimbus +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) # * MIT license ([LICENSE-MIT](LICENSE-MIT) or @@ -11,7 +14,7 @@ import eth/[common/eth_types, p2p], "../../.."/[protocol, protocol/trace_config], ../../worker_desc, - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 8dfd8e44d..4e816f5ff 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -136,6 +136,7 @@ type TrieNodeStat* = object ## Trie inspection report dangling*: seq[Blob] ## Paths from nodes with incomplete refs + leaves*: seq[NodeKey] ## Paths to leave nodes (if any) level*: int ## Maximim nesting depth of dangling nodes stopped*: bool ## Potential loop detected if `true` @@ -281,7 +282,13 @@ proc ppDangling(a: seq[Blob]; maxItems = 30): string = proc ppBlob(w: Blob): string = w.mapIt(it.toHex(2)).join.toLowerAscii let - q = a.toSeq.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)] + q = a.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)] + andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: "" + "{" & q.join(",") & andMore & "}" + +proc ppLeaves(a: openArray[NodeKey]; db: HexaryTreeDbRef; maxItems=30): string = + let + q = a.mapIt(it.ppImpl(db))[0 ..< min(maxItems,a.len)] andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." 
else: "" "{" & q.join(",") & andMore & "}" @@ -328,10 +335,12 @@ proc pp*(db: HexaryTreeDbRef; indent=4): string = db.ppImpl(NodeKey.default).join(indent.toPfx) proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = - result = "(" & $a.level & "," + result = "(" & $a.level if a.stopped: result &= "stopped," - result &= $a.dangling.len & "," & a.dangling.ppDangling(maxItems) & ")" + result &= $a.dangling.len & "," & + a.dangling.ppDangling(maxItems) & "," & + a.leaves.ppLeaves(db, maxItems) & ")" # ------------------------------------------------------------------------------ # Public constructor (or similar) diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index 5418c19b2..790756c9e 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -16,6 +16,7 @@ type AccountSmallerThanBase AccountsNotSrictlyIncreasing AccountRangesOverlap + NodeNotFound RlpEncoding SlotsNotSrictlyIncreasing TrieLoopAlert diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index 7e701a64a..9ca1edd2c 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -24,6 +24,9 @@ logScope: const extraTraceMessages = false # or true +when extraTraceMessages: + import stew/byteutils + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -63,7 +66,7 @@ proc doStepLink(step: XPathStep): Result[NodeKey,bool] = err(true) # fully fail -proc hexaryInspectPath( +proc hexaryInspectPathImpl( db: HexaryTreeDbRef; ## Database rootKey: RepairKey; ## State root path: NibblesSeq; ## Starting path @@ -84,8 +87,8 @@ proc hexaryInspectPath( return ok(rc.value) err() -proc hexaryInspectPath( - getFn: HexaryGetFn; ## Database retrival function +proc hexaryInspectPathImpl( + getFn: HexaryGetFn; ## Database retrieval function root: NodeKey; ## State root path: NibblesSeq; ## Starting path ): Result[NodeKey,void] @@ -170,11 +173,25 @@ proc hexaryInspectPath*( ## Returns the `NodeKey` for a given path if there is any. let (isLeaf,nibbles) = hexPrefixDecode path if not isLeaf: - let rc = db.hexaryInspectPath(root.to(RepairKey), nibbles) + let rc = db.hexaryInspectPathImpl(root.to(RepairKey), nibbles) if rc.isOk and rc.value.isNodeKey: return ok(rc.value.convertTo(NodeKey)) err() +proc hexaryInspectPath*( + getFn: HexaryGetFn; ## Database abstraction + root: NodeKey; ## State root + path: Blob; ## Partial database path + ): Result[NodeKey,void] + {.gcsafe, raises: [Defect,RlpError]} = + ## Variant of `hexaryInspectPath()` for persistent database. + let (isLeaf,nibbles) = hexPrefixDecode path + if not isLeaf: + let rc = getFn.hexaryInspectPathImpl(root, nibbles) + if rc.isOk: + return ok(rc.value) + err() + proc hexaryInspectToKeys*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root @@ -193,25 +210,37 @@ proc hexaryInspectTrie*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,KeyError]} = ## Starting with the argument list `paths`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in - ## the trie database. 
+ ## the trie database. The references for these nodes are collected and + ## returned. + ## * At most `maxLeafPaths` leaf node references are collected along the way. + ## * Search list `paths` argument entries that do not refer to a hexary node + ## are ignored. + ## * For any search list `paths` argument entry, this function stops if + ## the search depth exceeds `stopAtLevel` levels of linked sub-nodes. + ## * Argument `paths` list entries that do not refer to a valid node are + ## silently ignored. + ## let rootKey = root.to(RepairKey) if not db.tab.hasKey(rootKey): return TrieNodeStat() # Initialise TODO list - var reVisit = newTable[RepairKey,NibblesSeq]() + var + leafSlots = maxLeafPaths + reVisit = newTable[RepairKey,NibblesSeq]() if paths.len == 0: reVisit[rootKey] = EmptyNibbleRange else: for w in paths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: - let rc = db.hexaryInspectPath(rootKey, nibbles) + let rc = db.hexaryInspectPathImpl(rootKey, nibbles) if rc.isOk: reVisit[rc.value] = nibbles @@ -240,8 +269,13 @@ proc hexaryInspectTrie*( child = node.bLink[n] db.processLink(stats=result, inspect=again, parent, trail, child) of Leaf: - # Done with this link, forget the key - discard + if 0 < leafSlots: + let trail = parentTrail & node.lPfx + if trail.len == 64: + result.leaves.add trail.getBytes.convertTo(NodeKey) + leafSlots.dec + # Done with this link + # End `for` result.level.inc @@ -250,53 +284,70 @@ proc hexaryInspectTrie*( proc hexaryInspectTrie*( - getFn: HexaryGetFn; - root: NodeKey; ## State root + getFn: HexaryGetFn; ## Database abstraction + rootKey: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,RlpError,KeyError]} = ## Variant of `hexaryInspectTrie()` for persistent database. 
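  ## Usage sketch (the `maxLeafPaths` value below is only an assumption):
  ## healing would call something like
  ##   `let stats = getFn.hexaryInspectTrie(rootKey, paths, maxLeafPaths = 100)`
  ## and then fetch the returned `stats.dangling` paths via the `snap/1`
  ## `GetTrieNodes` message, while `stats.leaves` entries would be fed back
  ## into the storage-slots batch queue (see `heal_accounts.nim`).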
- ## - if root.to(Blob).getFn().len == 0: + when extraTraceMessages: + let nPaths = paths.len + + let root = rootKey.to(Blob) + if root.getFn().len == 0: + when extraTraceMessages: + trace "Hexary inspect: missing root", nPaths, maxLeafPaths, + rootKey=root.toHex return TrieNodeStat() # Initialise TODO list - var reVisit = newTable[NodeKey,NibblesSeq]() + var + leafSlots = maxLeafPaths + reVisit = newTable[NodeKey,NibblesSeq]() if paths.len == 0: - reVisit[root] = EmptyNibbleRange + reVisit[rootKey] = EmptyNibbleRange else: for w in paths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: - let rc = getFn.hexaryInspectPath(root, nibbles) + let rc = getFn.hexaryInspectPathImpl(rootKey, nibbles) if rc.isOk: reVisit[rc.value] = nibbles - when extraTraceMessages: - trace "Hexary inspect start", nPaths=paths.len, reVisit=reVisit.len - while 0 < reVisit.len: + when extraTraceMessages: + trace "Hexary inspect processing", nPaths, maxLeafPaths, + level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len + if stopAtLevel < result.level: result.stopped = true break - when extraTraceMessages: - trace "Hexary inspect processing", level=result.level, - reVisit=reVisit.len, dangling=result.dangling.len - let again = newTable[NodeKey,NibblesSeq]() for parent,parentTrail in reVisit.pairs: - let nodeRlp = rlpFromBytes parent.to(Blob).getFn() + + let parentBlob = parent.to(Blob).getFn() + if parentBlob.len == 0: + # Ooops, forget node and key + continue + + let nodeRlp = rlpFromBytes parentBlob case nodeRlp.listLen: of 2: - let (isLeaf,ePfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes + let (isLeaf,xPfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes if not isleaf: let - trail = parentTrail & ePfx + trail = parentTrail & xPfx child = nodeRlp.listElem(1) getFn.processLink(stats=result, inspect=again, parent, trail, child) + elif 0 < leafSlots: + let trail = parentTrail & xPfx + if trail.len == 64: + result.leaves.add trail.getBytes.convertTo(NodeKey) + leafSlots.dec of 17: for n in 0 ..< 16: let @@ -304,7 +355,7 @@ proc hexaryInspectTrie*( child = nodeRlp.listElem(n) getFn.processLink(stats=result, inspect=again, parent, trail, child) else: - # Done with this link, forget the key + # Ooops, forget node and key discard # End `for` @@ -313,8 +364,9 @@ proc hexaryInspectTrie*( # End while when extraTraceMessages: - trace "Hexary inspect finished", level=result.level, maxLevel=stopAtLevel, - reVisit=reVisit.len, dangling=result.dangling.len, stopped=result.stopped + trace "Hexary inspect finished", nPaths, maxLeafPaths, + level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len, + maxLevel=stopAtLevel, stopped=result.stopped # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/db/hexary_paths.nim b/nimbus/sync/snap/worker/db/hexary_paths.nim index 280aca059..9d5cf66d4 100644 --- a/nimbus/sync/snap/worker/db/hexary_paths.nim +++ b/nimbus/sync/snap/worker/db/hexary_paths.nim @@ -373,6 +373,18 @@ proc leafData*(path: XPath): Blob = of Extension: discard +proc leafData*(path: RPath): Blob = + ## Return the leaf data from a successful `RPath` computation (if any.) 
+ if path.tail.len == 0 and 0 < path.path.len: + let node = path.path[^1].node + case node.kind: + of Branch: + return node.bData + of Leaf: + return node.lData + of Extension: + discard + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/accounts_db.nim b/nimbus/sync/snap/worker/db/snap_db.nim similarity index 76% rename from nimbus/sync/snap/worker/accounts_db.nim rename to nimbus/sync/snap/worker/db/snap_db.nim index 6226bb2fe..79fc24a2f 100644 --- a/nimbus/sync/snap/worker/accounts_db.nim +++ b/nimbus/sync/snap/worker/db/snap_db.nim @@ -16,32 +16,35 @@ import stew/byteutils, stint, rocksdb, - ../../../constants, - ../../../db/[kvstore_rocksdb, select_backend], - "../.."/[protocol, types], - ../range_desc, - ./db/[bulk_storage, hexary_desc, hexary_error, hexary_import, - hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] + ../../../../constants, + ../../../../db/[kvstore_rocksdb, select_backend], + "../../.."/[protocol, types], + ../../range_desc, + "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, + hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] {.push raises: [Defect].} logScope: - topics = "snap-proof" + topics = "snap-db" export - HexaryDbError + HexaryDbError, + TrieNodeStat const extraTraceMessages = false or true type - AccountsDbRef* = ref object + SnapDbRef* = ref object + ## Global, re-usable descriptor db: TrieDatabaseRef ## General database rocky: RocksStoreRef ## Set if rocksdb is available - AccountsDbSessionRef* = ref object + SnapDbSessionRef* = ref object + ## Database session descriptor keyMap: Table[RepairKey,uint] ## For debugging only (will go away) - base: AccountsDbRef ## Back reference to common parameters + base: SnapDbRef ## Back reference to common parameters peer: Peer ## For log messages accRoot: NodeKey ## Current accounts root node accDb: HexaryTreeDbRef ## Accounts database @@ -51,7 +54,7 @@ type # Private helpers # ------------------------------------------------------------------------------ -proc newHexaryTreeDbRef(ps: AccountsDbSessionRef): HexaryTreeDbRef = +proc newHexaryTreeDbRef(ps: SnapDbSessionRef): HexaryTreeDbRef = HexaryTreeDbRef(keyPp: ps.stoDb.keyPp) # for debugging, will go away proc to(h: Hash256; T: type NodeKey): T = @@ -94,27 +97,27 @@ template noPpError(info: static[string]; code: untyped) = except Exception as e: raiseAssert "Ooops (" & info & ") " & $e.name & ": " & e.msg -proc toKey(a: RepairKey; ps: AccountsDbSessionRef): uint = +proc toKey(a: RepairKey; ps: SnapDbSessionRef): uint = if not a.isZero: noPpError("pp(RepairKey)"): if not ps.keyMap.hasKey(a): ps.keyMap[a] = ps.keyMap.len.uint + 1 result = ps.keyMap[a] -proc toKey(a: NodeKey; ps: AccountsDbSessionRef): uint = +proc toKey(a: NodeKey; ps: SnapDbSessionRef): uint = a.to(RepairKey).toKey(ps) -proc toKey(a: NodeTag; ps: AccountsDbSessionRef): uint = +proc toKey(a: NodeTag; ps: SnapDbSessionRef): uint = a.to(NodeKey).toKey(ps) -proc pp(a: NodeKey; ps: AccountsDbSessionRef): string = +proc pp(a: NodeKey; ps: SnapDbSessionRef): string = if a.isZero: "ø" else:"$" & $a.toKey(ps) -proc pp(a: RepairKey; ps: AccountsDbSessionRef): string = +proc pp(a: RepairKey; ps: SnapDbSessionRef): string = if a.isZero: "ø" elif a.isNodeKey: "$" & $a.toKey(ps) else: "@" & $a.toKey(ps) -proc pp(a: NodeTag; ps: AccountsDbSessionRef): string = +proc pp(a: NodeTag; ps: SnapDbSessionRef): 
string = a.to(NodeKey).pp(ps) # ------------------------------------------------------------------------------ @@ -122,11 +125,11 @@ proc pp(a: NodeTag; ps: AccountsDbSessionRef): string = # ------------------------------------------------------------------------------ proc mergeProofs( - peer: Peer, ## For log messages - db: HexaryTreeDbRef; ## Database table - root: NodeKey; ## Root for checking nodes - proof: seq[Blob]; ## Node records - freeStandingOk = false; ## Remove freestanding nodes + peer: Peer, ## For log messages + db: HexaryTreeDbRef; ## Database table + root: NodeKey; ## Root for checking nodes + proof: seq[Blob]; ## Node records + freeStandingOk = false; ## Remove freestanding nodes ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect, RlpError, KeyError].} = ## Import proof records (as received with snap message) into a hexary trie @@ -160,8 +163,8 @@ proc mergeProofs( proc persistentAccounts( - db: HexaryTreeDbRef; ## Current table - pv: AccountsDbRef; ## Persistent database + db: HexaryTreeDbRef; ## Current table + pv: SnapDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse @@ -174,8 +177,8 @@ proc persistentAccounts( ok() proc persistentStorages( - db: HexaryTreeDbRef; ## Current table - pv: AccountsDbRef; ## Persistent database + db: HexaryTreeDbRef; ## Current table + pv: SnapDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse @@ -268,9 +271,9 @@ proc collectStorageSlots( proc importStorageSlots*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor - data: AccountSlots; ## account storage descriptor - proof: SnapStorageProof; ## account storage proof + ps: SnapDbSessionRef; ## Re-usable session descriptor + data: AccountSlots; ## Account storage descriptor + proof: SnapStorageProof; ## Account storage proof ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect, RlpError,KeyError].} = ## Preocess storage slots for a particular storage root @@ -307,14 +310,14 @@ proc importStorageSlots*( # ------------------------------------------------------------------------------ proc init*( - T: type AccountsDbRef; + T: type SnapDbRef; db: TrieDatabaseRef ): T = ## Main object constructor T(db: db) proc init*( - T: type AccountsDbRef; + T: type SnapDbRef; db: ChainDb ): T = ## Variant of `init()` allowing bulk import on rocksdb backend @@ -323,14 +326,14 @@ proc init*( result.rocky = nil proc init*( - T: type AccountsDbSessionRef; - pv: AccountsDbRef; + T: type SnapDbSessionRef; + pv: SnapDbRef; root: Hash256; peer: Peer = nil ): T = ## Start a new session, do some actions an then discard the session ## descriptor (probably after commiting data.) - let desc = AccountsDbSessionRef( + let desc = SnapDbSessionRef( base: pv, peer: peer, accRoot: root.to(NodeKey), @@ -344,13 +347,13 @@ proc init*( return desc proc dup*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; root: Hash256; peer: Peer; - ): AccountsDbSessionRef = + ): SnapDbSessionRef = ## Resume a session with different `root` key and `peer`. This new session ## will access the same memory database as the `ps` argument session. 
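The split between one long-lived descriptor and cheap per-peer sessions sharing its storage might be sketched like this; `KvDb` and `Session` are simplified stand-ins, not the real `SnapDbRef`/`SnapDbSessionRef`.

```nim
import std/tables

type
  KvDb = ref object                # stand-in for the shared backend database
    tab: Table[string,string]
  Session = ref object             # stand-in for a per-peer session descriptor
    base: KvDb                     # back reference to the common database
    root: string                   # state root this session works on
    peer: string                   # only used for log messages

proc init(T: type Session; base: KvDb; root, peer: string): T =
  ## Start a new session; discard it when done.
  T(base: base, root: root, peer: peer)

proc dup(s: Session; root: string; peer = ""): Session =
  ## Resume with a different root (and maybe peer) on the same backend.
  Session(base: s.base, root: root,
          peer: (if peer.len == 0: s.peer else: peer))

when isMainModule:
  let
    db = KvDb()
    s1 = Session.init(db, "0xaaaa", "peer-1")
    s2 = s1.dup("0xbbbb")          # shares the very same memory database
  s1.base.tab["key"] = "node"
  assert s2.base.tab["key"] == "node"
```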
- AccountsDbSessionRef( + SnapDbSessionRef( base: ps.base, peer: peer, accRoot: root.to(NodeKey), @@ -358,9 +361,9 @@ proc dup*( stoDb: ps.stoDb) proc dup*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; root: Hash256; - ): AccountsDbSessionRef = + ): SnapDbSessionRef = ## Variant of `dup()` without the `peer` argument. ps.dup(root, ps.peer) @@ -368,16 +371,16 @@ proc dup*( # Public functions # ------------------------------------------------------------------------------ -proc dbBackendRocksDb*(pv: AccountsDbRef): bool = +proc dbBackendRocksDb*(pv: SnapDbRef): bool = ## Returns `true` if rocksdb features are available not pv.rocky.isNil -proc dbBackendRocksDb*(ps: AccountsDbSessionRef): bool = +proc dbBackendRocksDb*(ps: SnapDbSessionRef): bool = ## Returns `true` if rocksdb features are available not ps.base.rocky.isNil proc importAccounts*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data persistent = false; ## store data on disk @@ -425,20 +428,20 @@ proc importAccounts*( ok() proc importAccounts*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## for log messages root: Hash256; ## state root base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data ): Result[void,HexaryDbError] = ## Variant of `importAccounts()` - AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, root, peer).importAccounts(base, data, persistent=true) proc importStorages*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol persistent = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = @@ -506,18 +509,18 @@ proc importStorages*( ok() proc importStorages*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only data: AccountStorageRange; ## Account storage reply from `snap/1` protocol ): Result[void,seq[(int,HexaryDbError)]] = ## Variant of `importStorages()` - AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, Hash256(), peer).importStorages(data, persistent=true) proc importRawNodes*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor nodes: openArray[Blob]; ## Node records persistent = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = @@ -559,64 +562,133 @@ proc importRawNodes*( ok() proc importRawNodes*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only nodes: openArray[Blob]; ## Node records ): Result[void,seq[(int,HexaryDbError)]] = ## Variant of `importRawNodes()` for persistent storage. 
- AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, Hash256(), peer).importRawNodes(nodes, persistent=true) proc inspectAccountsTrie*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor pathList = seq[Blob].default; ## Starting nodes for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path persistent = false; ## Read data from disk - ignoreError = false; ## Return partial results if available + ignoreError = false; ## Always return partial results if available ): Result[TrieNodeStat, HexaryDbError] = ## Starting with the argument list `pathSet`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in - ## the trie database. + ## the trie database. Argument `pathSet` list entries that do not refer to a + ## valid node are silently ignored. ## let peer = ps.peer var stats: TrieNodeStat noRlpExceptionOops("inspectAccountsTrie()"): if persistent: let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - stats = getFn.hexaryInspectTrie(ps.accRoot, pathList) + stats = getFn.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) else: - stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList) + stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) block checkForError: let error = block: if stats.stopped: TrieLoopAlert - elif stats.level <= 1: + elif stats.level == 0: TrieIsEmpty else: break checkForError trace "Inspect account trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, stoppedAt=stats.level, error + nDangling=stats.dangling.len, leaves=stats.leaves.len, + maxleaves=maxLeafPaths, stoppedAt=stats.level, error return err(error) when extraTraceMessages: trace "Inspect account trie ok", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, level=stats.level + nDangling=stats.dangling.len, leaves=stats.leaves.len, + maxleaves=maxLeafPaths, level=stats.level return ok(stats) proc inspectAccountsTrie*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only root: Hash256; ## state root pathList = seq[Blob].default; ## Starting paths for search - ignoreError = false; ## Return partial results if available + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path + ignoreError = false; ## Always return partial results when avail. ): Result[TrieNodeStat, HexaryDbError] = ## Variant of `inspectAccountsTrie()` for persistent storage. - AccountsDbSessionRef.init( - pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError) + SnapDbSessionRef.init( + pv, root, peer).inspectAccountsTrie( + pathList, maxLeafPaths, persistent=true, ignoreError) + + +proc getAccountNodeKey*( + ps: SnapDbSessionRef; ## Re-usable session descriptor + path: Blob; ## Partial node path + persistent = false; ## Read data from disk + ): Result[NodeKey,HexaryDbError] = + ## For a partial node path argument `path`, return the raw node key. 
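The `HexaryGetFn` closure used by the persistent variants above decouples trie traversal from the storage backend. A reduced version of the idea, with `GetFn` and the toy backends being assumptions of the sketch:

```nim
import std/tables

type
  GetFn = proc(key: string): string    # read abstraction over some backend

proc resolve(getFn: GetFn; keys: openArray[string]): seq[string] =
  ## Look up node keys through whatever backend `getFn` wraps; an empty
  ## result means the node is missing from that backend.
  for key in keys:
    result.add getFn(key)

when isMainModule:
  var memDb = {"a": "node-a"}.toTable
  let memGet: GetFn = proc(key: string): string =
    memDb.getOrDefault(key)            # in-memory table variant
  let nullGet: GetFn = proc(key: string): string =
    ""                                 # a persistent variant would wrap a db read
  assert resolve(memGet, ["a", "b"]) == @["node-a", ""]
  assert resolve(nullGet, ["a"]) == @[""]
```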
+ var rc: Result[NodeKey,void] + noRlpExceptionOops("inspectAccountsPath()"): + if persistent: + let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) + rc = getFn.hexaryInspectPath(ps.accRoot, path) + else: + rc = ps.accDb.hexaryInspectPath(ps.accRoot, path) + if rc.isOk: + return ok(rc.value) + err(NodeNotFound) + +proc getAccountNodeKey*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + root: Hash256; ## state root + path: Blob; ## Partial node path + ): Result[NodeKey,HexaryDbError] = + ## Variant of `inspectAccountsPath()` for persistent storage. + SnapDbSessionRef.init( + pv, root, peer).getAccountNodeKey(path, persistent=true) + + +proc getAccountData*( + ps: SnapDbSessionRef; ## Re-usable session descriptor + path: NodeKey; ## Account to visit + persistent = false; ## Read data from disk + ): Result[Account,HexaryDbError] = + ## Fetch account data. + ## + ## Caveat: There is no unit test yet for the non-persistent version + let peer = ps.peer + var acc: Account + + noRlpExceptionOops("getAccountData()"): + var leaf: Blob + if persistent: + let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) + leaf = path.hexaryPath(ps.accRoot, getFn).leafData + else: + leaf = path.hexaryPath(ps.accRoot.to(RepairKey),ps.accDb).leafData + + if leaf.len == 0: + return err(AccountNotFound) + acc = rlp.decode(leaf,Account) + + return ok(acc) + +proc getAccountData*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + root: Hash256; ## state root + path: NodeKey; ## Account to visit + ): Result[Account,HexaryDbError] = + ## Variant of `getAccount()` for persistent storage. + SnapDbSessionRef.init(pv, root, peer).getAccountData(path, persistent=true) # ------------------------------------------------------------------------------ -# Debugging (and playing with the hexary database) +# Public functions: additional helpers # ------------------------------------------------------------------------------ proc sortMerge*(base: openArray[NodeTag]): NodeTag = @@ -642,22 +714,14 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it]) proc getChainDbAccount*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Account,HexaryDbError] = ## Fetch account via `BaseChainDB` - noRlpExceptionOops("getChainDbAccount()"): - let - getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - leaf = accHash.to(NodeKey).hexaryPath(ps.accRoot, getFn).leafData - if 0 < leaf.len: - let acc = rlp.decode(leaf,Account) - return ok(acc) - - err(AccountNotFound) + ps.getAccountData(accHash.to(NodeKey),persistent=true) proc nextChainDbKey*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one next to the @@ -675,7 +739,7 @@ proc nextChainDbKey*( err(AccountNotFound) proc prevChainDbKey*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one before to the @@ -692,7 +756,11 @@ proc prevChainDbKey*( err(AccountNotFound) -proc assignPrettyKeys*(ps: AccountsDbSessionRef) = +# ------------------------------------------------------------------------------ +# Debugging (and playing with the hexary database) +# ------------------------------------------------------------------------------ + +proc 
assignPrettyKeys*(ps: SnapDbSessionRef) = ## Prepare foe pretty pringing/debugging. Run early enough this function ## sets the root key to `"$"`, for instance. noPpError("validate(1)"): @@ -710,22 +778,22 @@ proc assignPrettyKeys*(ps: AccountsDbSessionRef) = of Extension: discard node.eLink.toKey(ps) of Leaf: discard -proc dumpPath*(ps: AccountsDbSessionRef; key: NodeTag): seq[string] = +proc dumpPath*(ps: SnapDbSessionRef; key: NodeTag): seq[string] = ## Pretty print helper compiling the path into the repair tree for the ## argument `key`. noPpError("dumpPath"): let rPath= key.to(NodeKey).hexaryPath(ps.accRoot.to(RepairKey), ps.accDb) result = rPath.path.mapIt(it.pp(ps.accDb)) & @["(" & rPath.tail.pp & ")"] -proc dumpAccDB*(ps: AccountsDbSessionRef; indent = 4): string = +proc dumpAccDB*(ps: SnapDbSessionRef; indent = 4): string = ## Dump the entries from the a generic accounts trie. ps.accDb.pp(ps.accRoot,indent) -proc getAcc*(ps: AccountsDbSessionRef): HexaryTreeDbRef = +proc getAcc*(ps: SnapDbSessionRef): HexaryTreeDbRef = ## Low level access to accounts DB ps.accDb -proc hexaryPpFn*(ps: AccountsDbSessionRef): HexaryPpFn = +proc hexaryPpFn*(ps: SnapDbSessionRef): HexaryPpFn = ## Key mapping function used in `HexaryTreeDB` ps.accDb.keyPp diff --git a/nimbus/sync/snap/worker/fetch_accounts.nim b/nimbus/sync/snap/worker/fetch_accounts.nim deleted file mode 100644 index e25685c53..000000000 --- a/nimbus/sync/snap/worker/fetch_accounts.nim +++ /dev/null @@ -1,566 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
- -import - std/sequtils, - chronicles, - chronos, - eth/[common/eth_types, p2p], - stew/[interval_set, keyed_queue], - stint, - ../../sync_desc, - ".."/[range_desc, worker_desc], - ./com/[get_account_range, get_error, get_storage_ranges, get_trie_nodes], - ./accounts_db - -when snapAccountsDumpEnable: - import ../../../tests/replay/[undump_accounts, undump_storages] - -{.push raises: [Defect].} - -logScope: - topics = "snap-fetch" - -const - extraTraceMessages = false or true - ## Enabled additional logging noise - - maxTimeoutErrors = 2 - ## maximal number of non-resonses accepted in a row - -# ------------------------------------------------------------------------------ -# Private debugging -# ------------------------------------------------------------------------------ - -proc dumpBegin( - buddy: SnapBuddyRef; - iv: LeafRange; - dd: GetAccountRange; - error = NothingSerious) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - let - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - trace " Snap proofs dump", peer, enabled=ctx.data.proofDumpOk, iv - var - fd = ctx.data.proofDumpFile - try: - if error != NothingSerious: - fd.write " # Error: base=" & $iv.minPt & " msg=" & $error & "\n" - fd.write "# count ", $ctx.data.proofDumpInx & "\n" - fd.write stateRoot.dumpAccounts(iv.minPt, dd.data) & "\n" - except CatchableError: - discard - ctx.data.proofDumpInx.inc - -proc dumpStorage(buddy: SnapBuddyRef; data: AccountStorageRange) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - let - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - var - fd = ctx.data.proofDumpFile - try: - fd.write stateRoot.dumpStorages(data) & "\n" - except CatchableError: - discard - -proc dumpEnd(buddy: SnapBuddyRef) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - var fd = ctx.data.proofDumpFile - fd.flushFile - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc withMaxLen(buddy: SnapBuddyRef; iv: LeafRange): LeafRange = - ## Reduce accounts interval to maximal size - let maxlen = buddy.ctx.data.accountRangeMax - if 0 < iv.len and iv.len <= maxLen: - iv - else: - LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = - ## Fetch an interval from the account range list. Use the `pivotAccount` - ## value as a start entry to fetch data from, wrapping around if necessary. 
- let - ctx = buddy.ctx - env = ctx.data.pivotEnv - pivotPt = env.pivotAccount - - block: - # Take the next interval to the right (aka ge) `pivotPt` - let rc = env.availAccounts.ge(pivotPt) - if rc.isOk: - let iv = buddy.withMaxLen(rc.value) - discard env.availAccounts.reduce(iv) - return ok(iv) - - block: - # Check whether the `pivotPt` is in the middle of an interval - let rc = env.availAccounts.envelope(pivotPt) - if rc.isOk: - let iv = buddy.withMaxLen(LeafRange.new(pivotPt, rc.value.maxPt)) - discard env.availAccounts.reduce(iv) - return ok(iv) - - block: - # Otherwise wrap around - let rc = env.availAccounts.ge() - if rc.isOk: - let iv = buddy.withMaxLen(rc.value) - discard env.availAccounts.reduce(iv) - return ok(iv) - - err() - -proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = - ## Shortcut - discard buddy.ctx.data.pivotEnv.availAccounts.merge(iv) - -proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = - ## Shortcut - discard buddy.ctx.data.pivotEnv.availAccounts.reduce(iv) - - -proc stopAfterError( - buddy: SnapBuddyRef; - error: ComError; - ): Future[bool] - {.async.} = - ## Error handling after data protocol failed. - case error: - of ComResponseTimeout: - if maxTimeoutErrors <= buddy.data.errors.nTimeouts: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - buddy.ctrl.zombie = true - else: - # Otherwise try again some time later. Nevertheless, stop the - # current action. - buddy.data.errors.nTimeouts.inc - await sleepAsync(5.seconds) - return true - - of ComNetworkProblem, - ComMissingProof, - ComAccountsMinTooSmall, - ComAccountsMaxTooLarge: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - # buddy.data.stats.major.networkErrors.inc() - buddy.ctrl.zombie = true - return true - - of ComEmptyAccountsArguments, - ComEmptyRequestArguments, - ComInspectDbFailed, - ComImportAccountsFailed, - ComNoDataForProof, - ComNothingSerious: - discard - - of ComNoAccountsForStateRoot, - ComNoStorageForAccounts, - ComNoByteCodesAvailable, - ComNoTrieNodesAvailable, - ComTooManyByteCodes, - ComTooManyStorageSlots, - ComTooManyTrieNodes: - # Mark this peer dead, i.e. 
avoid fetching from this peer for a while - buddy.ctrl.zombie = true - return true - -# ------------------------------------------------------------------------------ -# Private functions: accounts -# ------------------------------------------------------------------------------ - -proc processAccounts( - buddy: SnapBuddyRef; - iv: LeafRange; ## Accounts range to process - ): Future[Result[void,ComError]] - {.async.} = - ## Process accounts and storage by bulk download on the current envirinment - # Reset error counts for detecting repeated timeouts - buddy.data.errors.nTimeouts = 0 - - # Process accounts - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Fetch data for this range delegated to `fetchAccounts()` - let dd = block: - let rc = await buddy.getAccountRange(stateRoot, iv) - if rc.isErr: - buddy.putUnprocessed(iv) # fail => interval back to pool - return err(rc.error) - rc.value - - let - nAccounts = dd.data.accounts.len - nStorage = dd.withStorage.len - - block: - let rc = ctx.data.accountsDb.importAccounts( - peer, stateRoot, iv.minPt, dd.data) - if rc.isErr: - # Bad data, just try another peer - buddy.putUnprocessed(iv) - buddy.ctrl.zombie = true - trace "Import failed, restoring unprocessed accounts", peer, stateRoot, - range=dd.consumed, nAccounts, nStorage, error=rc.error - - buddy.dumpBegin(iv, dd, rc.error) # FIXME: Debugging (will go away) - buddy.dumpEnd() # FIXME: Debugging (will go away) - return err(ComImportAccountsFailed) - - buddy.dumpBegin(iv, dd) # FIXME: Debugging (will go away) - - # Statistics - env.nAccounts.inc(nAccounts) - env.nStorage.inc(nStorage) - - # Register consumed intervals on the accumulator over all state roots - discard buddy.ctx.data.coveredAccounts.merge(dd.consumed) - - # Register consumed and bulk-imported (well, not yet) accounts range - block registerConsumed: - block: - # Both intervals `min(iv)` and `min(dd.consumed)` are equal - let rc = iv - dd.consumed - if rc.isOk: - # Now, `dd.consumed` < `iv`, return some unused range - buddy.putUnprocessed(rc.value) - break registerConsumed - block: - # The processed interval might be a bit larger - let rc = dd.consumed - iv - if rc.isOk: - # Remove from unprocessed data. If it is not unprocessed, anymore - # then it was doubly processed which is ok. - buddy.delUnprocessed(rc.value) - break registerConsumed - # End registerConsumed - - # Store accounts on the storage TODO list. - discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage) - - return ok() - -# ------------------------------------------------------------------------------ -# Private functions: accounts storage -# ------------------------------------------------------------------------------ - -proc fetchAndImportStorageSlots( - buddy: SnapBuddyRef; - reqSpecs: seq[AccountSlotsHeader]; - ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] - {.async.} = - ## Fetch storage slots data from the network, store it on disk and - ## return yet unprocessed data. 
- let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Get storage slots - var stoRange = block: - let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) - if rc.isErr: - return err(rc.error) - rc.value - - if 0 < stoRange.data.storages.len: - # ------------------------------ - buddy.dumpStorage(stoRange.data) - # ------------------------------ - - # Verify/process data and save to disk - block: - let rc = ctx.data.accountsDb.importStorages(peer, stoRange.data) - - if rc.isErr: - # Push back parts of the error item - var once = false - for w in rc.error: - if 0 <= w[0]: - # Reset any partial requests by not copying the `firstSlot` field. - # So all the storage slots are re-fetched completely for this - # account. - stoRange.addLeftOver( - @[AccountSlotsHeader( - accHash: stoRange.data.storages[w[0]].account.accHash, - storageRoot: stoRange.data.storages[w[0]].account.storageRoot)], - forceNew = not once) - once = true - # Do not ask for the same entries again on this `peer` - if once: - buddy.data.vetoSlots.incl stoRange.leftOver[^1] - - if rc.error[^1][0] < 0: - discard - # TODO: disk storage failed or something else happend, so what? - - # Return the remaining part to be processed later - return ok(stoRange.leftOver) - -proc processStorageSlots( - buddy: SnapBuddyRef; - ): Future[Result[void,ComError]] - {.async.} = - ## Fetch storage data and save it on disk. Storage requests are managed by - ## a request queue for handling partioal replies and re-fetch issues. For - ## all practical puroses, this request queue should mostly be empty. - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - while true: - # Pull out the next request item from the queue - var req: SnapSlotQueueItemRef - block getNextAvailableItem: - for w in env.leftOver.nextKeys: - # Make sure that this item was not fetched for, already - if w notin buddy.data.vetoSlots: - env.leftOver.del(w) - req = w - break getNextAvailableItem - return ok() - - block: - # Fetch and store account storage slots. On some sort of success, - # the `rc` return value contains a list of left-over items to be - # re-processed. - let rc = await buddy.fetchAndImportStorageSlots(req.q) - - if rc.isErr: - # Save accounts/storage list to be processed later, then stop - discard env.leftOver.append req - return err(rc.error) - - for qLo in rc.value: - # Handle queue left-overs for processing in the next cycle - if qLo.q[0].firstSlot == Hash256.default and 0 < env.leftOver.len: - # Appending to last queue item is preferred over adding new item - let item = env.leftOver.first.value - item.q = item.q & qLo.q - else: - # Put back as-is. - discard env.leftOver.append qLo - # End while - - return ok() - -# ------------------------------------------------------------------------------ -# Private functions: healing -# ------------------------------------------------------------------------------ - -proc accountsTrieHealing( - buddy: SnapBuddyRef; - env: SnapPivotRef; - envSource: string; - ): Future[Result[void,ComError]] - {.async.} = - ## ... - # Starting with a given set of potentially dangling nodes, this set is - # updated. 
- let - ctx = buddy.ctx - peer = buddy.peer - stateRoot = env.stateHeader.stateRoot - - while env.repairState != Done and - (env.dangling.len != 0 or env.repairState == Pristine): - - trace "Accounts healing loop", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len - - var needNodes: seq[Blob] - block getDanglingNodes: - let rc = ctx.data.accountsDb.inspectAccountsTrie( - peer, stateRoot, env.dangling) - if rc.isOk: - needNodes = rc.value.dangling - break getDanglingNodes - let error = rc.error - if error == TrieIsEmpty and env.dangling.len == 0: - when extraTraceMessages: - trace "Accounts healing on empty trie", peer, - repairState=env.repairState, envSource - return ok() - trace "Accounts healing failed", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len, error - return err(ComInspectDbFailed) - - # Update dangling nodes register so that other processes would not fetch - # the same list simultaneously. - if maxTrieNodeFetch < needNodes.len: - # No point in processing more at the same time. So leave the rest on - # the `dangling` queue. - env.dangling = needNodes[maxTrieNodeFetch ..< needNodes.len] - needNodes.setLen(maxTrieNodeFetch) - else: - env.dangling.setLen(0) - - # Noting to anymore - if needNodes.len == 0: - if env.repairState != Pristine: - env.repairState = Done - when extraTraceMessages: - trace "Done accounts healing for now", peer, - repairState=env.repairState, envSource, nDangling=env.dangling.len - return ok() - - let lastState = env.repairState - env.repairState = KeepGoing - - when extraTraceMessages: - trace "Need nodes for healing", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len, nNodes=needNodes.len - - # Fetch nodes - let dd = block: - let rc = await buddy.getTrieNodes(stateRoot, needNodes.mapIt(@[it])) - if rc.isErr: - env.dangling = needNodes - env.repairState = lastState - return err(rc.error) - rc.value - - # Store to disk and register left overs for the next pass - block: - let rc = ctx.data.accountsDb.importRawNodes(peer, dd.nodes) - if rc.isOk: - env.dangling = dd.leftOver.mapIt(it[0]) - elif 0 < rc.error.len and rc.error[^1][0] < 0: - # negative index => storage error - env.dangling = needNodes - else: - let nodeKeys = rc.error.mapIt(dd.nodes[it[0]]) - env.dangling = dd.leftOver.mapIt(it[0]) & nodeKeys - # End while - - return ok() - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc fetchAccounts*(buddy: SnapBuddyRef) {.async.} = - ## Fetch accounts and data and store them in the database. - ## - ## TODO: Healing for storages. Currently, healing in only run for accounts. - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - var - # Complete the previous environment by trie database healing (if any) - healingEnvs = if not ctx.data.prevEnv.isNil: @[ctx.data.prevEnv] else: @[] - - block processAccountsFrame: - # Get a range of accounts to fetch from - let iv = block: - let rc = buddy.getUnprocessed() - if rc.isErr: - # Although there are no accounts left to process, the other peer might - # still work on some accounts. As a general rule, not all from an - # account range gets served so the remaining range will magically - # reappear on the unprocessed ranges database. - trace "No more unprocessed accounts (maybe)", peer, stateRoot - - # Complete healing for sporadic nodes missing. 
- healingEnvs.add env - break processAccountsFrame - rc.value - - when extraTraceMessages: - trace "Start fetching accounts", peer, stateRoot, iv, - repairState=env.repairState - - # Process received accounts and stash storage slots to fetch later - block: - let rc = await buddy.processAccounts(iv) - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="accounts", error - return - break processAccountsFrame - - # End `block processAccountsFrame` - - when extraTraceMessages: - trace "Start fetching storages", peer, nAccounts=env.leftOver.len, - nVetoSlots=buddy.data.vetoSlots.len, repairState=env.repairState - - # Process storage slots from environment batch - block: - let rc = await buddy.processStorageSlots() - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="storage", error - return - - # Check whether there is some environment that can be completed by - # Patricia Merkle Tree healing. - for w in healingEnvs: - let envSource = if env == ctx.data.pivotEnv: "pivot" else: "retro" - when extraTraceMessages: - trace "Start accounts healing", peer, repairState=env.repairState, - envSource, dangling=w.dangling.len - - let rc = await buddy.accountsTrieHealing(w, envSource) - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - when extraTraceMessages: - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="healing", dangling=w.dangling.len, error - return - - buddy.dumpEnd() # FIXME: Debugging (will go away) - when extraTraceMessages: - trace "Done fetching cycle", peer, repairState=env.repairState - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim new file mode 100644 index 000000000..59acd96c5 --- /dev/null +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -0,0 +1,380 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Heal accounts DB: +## ================= +## +## Flow chart for healing algorithm +## -------------------------------- +## :: +## START with {state-root} +## | +## | +------------------------------------------------+ +## | | | +## v v | +## --> {missing-account-nodes} | +## | | | +## v v | +## {leaf-nodes} | +## | | | +## v v | +## | +## | | | +## v v | +## {storage-roots} {check-account-nodes} ---------+ +## | +## v +## +## +## Legend: +## * `<..>` some action, process, etc. +## * `{..}` some data set, list, or queue etc. +## +## Discussion of flow chart +## ------------------------ +## * Input nodes for `` are checked for dangling child +## node links which in turn are collected as output. 
+## +## * Nodes of the `{missing-account-nodes}` list are fetched from the network +## and merged into the accounts trie database. Successfully processed nodes +## are collected in the `{check-account-nodes}` list which is fed back into +## the `` process. +## +## * If there is a problem with a node travelling from the source list +## `{missing-account-nodes}` towards the target list `{check-account-nodes}`, +## this problem node will simply held back in the source list. +## +## In order to avoid unnecessary stale entries, the `{missing-account-nodes}` +## list is regularly checked for whether nodes are still missing or some +## other process has done the magic work of merging some of then into the +## trie database. +## +## Competing with other trie algorithms +## ------------------------------------ +## * Healing runs (semi-)parallel to processing `GetAccountRange` network +## messages from the `snap/1` protocol. This is more network bandwidth +## efficient in comparison with the healing algorithm. Here, leaf nodes are +## transferred wholesale while with the healing algorithm, only the top node +## of a sub-trie can be transferred at once (but for multiple sub-tries). +## +## * The healing algorithm visits all nodes of a complete trie unless it is +## stopped in between. +## +## * If a trie node is missing, it can be fetched directly by the healing +## algorithm or one can wait for another process to do the job. Waiting for +## other processes to do the job also applies to problem nodes as indicated +## in the last bullet item of the previous chapter. +## +## * Network bandwidth can be saved if nodes are fetched by a more efficient +## process (if that is available.) This suggests that fetching missing trie +## nodes by the healing algorithm should kick in very late when the trie +## database is nearly complete. +## +## * Healing applies to a trie database associated with the currently latest +## *state root*, which may change occasionally. It suggests to start the +## healing algorithm very late altogether (not fetching nodes, only) because +## most trie databases will never be completed by healing. +## + +import + std/sequtils, + chronicles, + chronos, + eth/[common/eth_types, p2p, trie/trie_defs], + stew/[interval_set, keyed_queue], + ../../../utils/prettify, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/get_trie_nodes, + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Helpers +# ------------------------------------------------------------------------------ + +proc coverageInfo(buddy: SnapBuddyRef): string = + ## Logging helper ... + let + ctx = buddy.ctx + env = buddy.data.pivotEnv + env.fetchAccounts.emptyFactor.toPC(0) & + "/" & + ctx.data.coveredAccounts.fullFactor.toPC(0) + +proc getCoveringLeafRangeSet(buddy: SnapBuddyRef; pt: NodeTag): LeafRangeSet = + ## Helper ... + let env = buddy.data.pivotEnv + for ivSet in env.fetchAccounts: + if 0 < ivSet.covered(pt,pt): + return ivSet + +proc commitLeafAccount(buddy: SnapBuddyRef; ivSet: LeafRangeSet; pt: NodeTag) = + ## Helper ... 
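Reduced to its data flow, the healing loop from the flow chart above shuttles node references between a check list and a missing list. The sketch below fakes the inspection, network and database steps with plain sets and tables; none of the names are the worker's real types.

```nim
import std/[sets, tables]

type
  HealState = object
    db: HashSet[string]            # node keys already present in the trie db
    checkNodes: seq[string]        # nodes to inspect for dangling children
    missingNodes: seq[string]      # dangling children still to be fetched

proc inspect(s: HealState; children: Table[string,seq[string]]): seq[string] =
  ## Collect child references of the check list that are not in the database.
  for key in s.checkNodes:
    for child in children.getOrDefault(key):
      if child notin s.db:
        result.add child

proc healOnce(s: var HealState; children: Table[string,seq[string]];
              fetched: HashSet[string]) =
  ## One lap: inspect, try to fetch the dangling nodes, merge what arrived.
  s.missingNodes.add s.inspect(children)
  s.checkNodes.setLen(0)
  var stillMissing: seq[string]
  for key in s.missingNodes:
    if key in fetched:
      s.db.incl key                # merged into the database ...
      s.checkNodes.add key         # ... so its own children get checked next
    else:
      stillMissing.add key         # held back for a later lap
  s.missingNodes = stillMissing

when isMainModule:
  var st = HealState(db: ["root"].toHashSet, checkNodes: @["root"])
  let kids = {"root": @["a", "b"]}.toTable
  st.healOnce(kids, fetched = ["a"].toHashSet)
  assert st.checkNodes == @["a"] and st.missingNodes == @["b"]
```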
+ discard ivSet.reduce(pt,pt) + discard buddy.ctx.data.coveredAccounts.merge(pt,pt) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc updateMissingNodesList(buddy: SnapBuddyRef) = + ## Check whether previously missing nodes from the `missingAccountNodes` list + ## have been magically added to the database since it was checked last time. + ## These nodes will me moved to `checkAccountNodes` for further processing. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + var + nodes: seq[Blob] + + for accKey in env.missingAccountNodes: + let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey) + if rc.isOk: + # Check nodes for dangling links + env.checkAccountNodes.add acckey + else: + # Node is still missing + nodes.add acckey + + env.missingAccountNodes = nodes + + +proc mergeIsolatedAccounts( + buddy: SnapBuddyRef; + paths: openArray[NodeKey]; + ): seq[AccountSlotsHeader] = + ## Process leaves found with nodes inspection, returns a list of + ## storage slots for these nodes. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Remove reported leaf paths from the accounts interval + for accKey in paths: + let + pt = accKey.to(NodeTag) + ivSet = buddy.getCoveringLeafRangeSet(pt) + if not ivSet.isNil: + let + rc = ctx.data.snapDb.getAccountData(peer, stateRoot, accKey) + accountHash = Hash256(data: accKey.ByteArray32) + if rc.isOk: + let storageRoot = rc.value.storageRoot + when extraTraceMessages: + let stRootStr = if storageRoot != emptyRlpHash: $storageRoot + else: "emptyRlpHash" + trace "Registered isolated persistent account", peer, accountHash, + storageRoot=stRootStr + if storageRoot != emptyRlpHash: + result.add AccountSlotsHeader( + accHash: accountHash, + storageRoot: storageRoot) + buddy.commitLeafAccount(ivSet, pt) + env.nAccounts.inc + continue + + when extraTraceMessages: + let error = rc.error + trace "Get persistent account problem", peer, accountHash, error + + +proc fetchDanglingNodesList( + buddy: SnapBuddyRef + ): Result[TrieNodeStat,HexaryDbError] = + ## Starting with a given set of potentially dangling account nodes + ## `checkAccountNodes`, this set is filtered and processed. The outcome + ## is fed back to the vey same list `checkAccountNodes` + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + maxLeaves = if env.checkAccountNodes.len == 0: 0 + else: maxHealingLeafPaths + + rc = ctx.data.snapDb.inspectAccountsTrie( + peer, stateRoot, env.checkAccountNodes, maxLeaves) + + if rc.isErr: + # Attempt to switch peers, there is not much else we can do here + buddy.ctrl.zombie = true + return err(rc.error) + + # Global/env batch list to be replaced by by `rc.value.leaves` return value + env.checkAccountNodes.setLen(0) + + # Store accounts leaves on the storage batch list. 
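When healing surfaces a complete leaf, that account no longer needs to be downloaded: it is removed from the per-pivot fetch schedule, added to the global coverage accumulator, and its storage root is queued. A toy version of this bookkeeping, with integer account points instead of `NodeTag` interval sets:

```nim
import std/sets

type
  Pivot = object
    fetchAccounts: HashSet[int]    # accounts still scheduled for download
    covered: HashSet[int]          # accounts obtained by any means
    storageQueue: seq[int]         # storage roots still to be fetched

proc commitLeaf(env: var Pivot; account: int; hasStorage: bool) =
  ## A healed leaf is proof this account is owned: unschedule it, mark it
  ## covered, and remember its storage if there is any.
  env.fetchAccounts.excl account
  env.covered.incl account
  if hasStorage:
    env.storageQueue.add account

when isMainModule:
  var env = Pivot(fetchAccounts: [1, 2, 3].toHashSet)
  env.commitLeaf(2, hasStorage = true)
  assert 2 notin env.fetchAccounts and 2 in env.covered
  assert env.storageQueue == @[2]
```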
+ let withStorage = buddy.mergeIsolatedAccounts(rc.value.leaves) + if 0 < withStorage.len: + discard env.fetchStorage.append SnapSlotQueueItemRef(q: withStorage) + when extraTraceMessages: + trace "Accounts healing storage nodes", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nWithStorage=withStorage.len, + nDangling=rc.value.dangling + + return ok(rc.value) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = + ## Fetching and merging missing account trie database nodes. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Only start healing if there is some completion level, already. + # + # We check against the global coverage factor, i.e. a measure for how + # much of the total of all accounts have been processed. Even if the trie + # database for the current pivot state root is sparsely filled, there + # is a good chance that it can inherit some unchanged sub-trie from an + # earlier pivor state root download. The healing process then works like + # sort of glue. + # + if env.nAccounts == 0 or + ctx.data.coveredAccounts.fullFactor < healAccountsTrigger: + when extraTraceMessages: + trace "Accounts healing postponed", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len + return + + when extraTraceMessages: + trace "Start accounts healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len + + # Update for changes since last visit + buddy.updateMissingNodesList() + + # If `checkAccountNodes` is empty, healing is at the very start or + # was postponed in which case `missingAccountNodes` is non-empty. + var + nodesMissing: seq[Blob] # Nodes to process by this instance + nLeaves = 0 # For logging + if 0 < env.checkAccountNodes.len or env.missingAccountNodes.len == 0: + let rc = buddy.fetchDanglingNodesList() + if rc.isErr: + error "Accounts healing failed => stop", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + error=rc.error + return + + nodesMissing = rc.value.dangling + nLeaves = rc.value.leaves.len + + # Check whether the trie is complete. + if nodesMissing.len == 0 and env.missingAccountNodes.len == 0: + when extraTraceMessages: + trace "Accounts healing complete", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=0, + nMissingAccountNodes=0, + nNodesMissing=0, + nLeaves + return # nothing to do + + # Ok, clear global `env.missingAccountNodes` list and process `nodesMissing`. + nodesMissing = nodesMissing & env.missingAccountNodes + env.missingAccountNodes.setlen(0) + + # Fetch nodes, merge it into database and feed back results + while 0 < nodesMissing.len: + var fetchNodes: seq[Blob] + # There is no point in processing too many nodes at the same time. So + # leave the rest on the `nodesMissing` queue for a moment. 
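The loop below drains the backlog in slices of at most `maxTrieNodeFetch` entries, peeling them off the tail so the rest stays queued. A standalone sketch of that slicing (not the worker code itself):

```nim
proc takeSlice(backlog: var seq[string]; maxFetch: int): seq[string] =
  ## Peel at most `maxFetch` entries off the tail of the backlog; the rest
  ## stays queued for a later lap.
  if maxFetch < backlog.len:
    let inxLeft = backlog.len - maxFetch
    result = backlog[inxLeft ..< backlog.len]
    backlog.setLen(inxLeft)
  else:
    result = backlog
    backlog.setLen(0)

when isMainModule:
  var backlog = @["n1", "n2", "n3", "n4", "n5"]
  assert backlog.takeSlice(2) == @["n4", "n5"]
  assert backlog == @["n1", "n2", "n3"]
```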
+ if maxTrieNodeFetch < nodesMissing.len: + let inxLeft = nodesMissing.len - maxTrieNodeFetch + fetchNodes = nodesMissing[inxLeft ..< nodesMissing.len] + nodesMissing.setLen(inxLeft) + else: + fetchNodes = nodesMissing + nodesMissing.setLen(0) + + when extraTraceMessages: + trace "Accounts healing loop", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nNodesMissing=nodesMissing.len, + nLeaves + + # Fetch nodes from the network + let dd = block: + let rc = await buddy.getTrieNodes(stateRoot, fetchNodes.mapIt(@[it])) + if rc.isErr: + env.missingAccountNodes = env.missingAccountNodes & fetchNodes + when extraTraceMessages: + trace "Error fetching account nodes for healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nNodesMissing=nodesMissing.len, + nLeaves, + error=rc.error + # Just run the next lap + continue + rc.value + + # Store to disk and register left overs for the next pass + block: + let rc = ctx.data.snapDb.importRawNodes(peer, dd.nodes) + if rc.isOk: + env.checkAccountNodes = env.checkAccountNodes & dd.leftOver.mapIt(it[0]) + elif 0 < rc.error.len and rc.error[^1][0] < 0: + # negative index => storage error + env.missingAccountNodes = env.missingAccountNodes & fetchNodes + else: + env.missingAccountNodes = env.missingAccountNodes & + dd.leftOver.mapIt(it[0]) & rc.error.mapIt(dd.nodes[it[0]]) + + # End while + + when extraTraceMessages: + trace "Done accounts healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nLeaves + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/store_accounts.nim b/nimbus/sync/snap/worker/store_accounts.nim new file mode 100644 index 000000000..64ae92202 --- /dev/null +++ b/nimbus/sync/snap/worker/store_accounts.nim @@ -0,0 +1,186 @@ + +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +## Fetch accounts stapshot +## ======================= +## +## Worker items state diagram: +## :: +## unprocessed | peer workers + | +## account ranges | account database update | unprocessed storage slots +## ======================================================================== +## +## +---------------------------------------+ +## | | +## v | +## -----+------> ------+-----> OUTPUT +## | | +## +------> ------+ +## | | +## +------> ------+ +## : : +## + +import + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/[interval_set, keyed_queue], + stint, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/[com_error, get_account_range], + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc withMaxLen( + buddy: SnapBuddyRef; + iv: LeafRange; + maxlen: UInt256; + ): LeafRange = + ## Reduce accounts interval to maximal size + if 0 < iv.len and iv.len <= maxLen: + iv + else: + LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = + ## Fetch an interval from one of the account range lists. + let + env = buddy.data.pivotEnv + accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256 + + for ivSet in env.fetchAccounts: + let rc = ivSet.ge() + if rc.isOk: + let iv = buddy.withMaxLen(rc.value, accountRangeMax) + discard ivSet.reduce(iv) + return ok(iv) + + err() + +proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.data.pivotEnv.fetchAccounts[1].merge(iv) + +proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.data.pivotEnv.fetchAccounts[1].reduce(iv) + +proc markGloballyProcessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.ctx.data.coveredAccounts.merge(iv) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = + ## Fetch accounts and store them in the database. 
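`getUnprocessed()` above clips every interval so that a single peer never requests more than an even share of the account address space. With plain `uint64` arithmetic standing in for `UInt256` (an assumption made only for the sketch):

```nim
proc clipRange(minPt, maxPt, buddiesMax: uint64): (uint64, uint64) =
  ## Reduce an account interval so it covers at most 1/buddiesMax of the
  ## whole address space (the zero check catches full-range wrap-around).
  let
    rangeMax = high(uint64) div buddiesMax
    ivLen = maxPt - minPt + 1
  if 0'u64 < ivLen and ivLen <= rangeMax:
    (minPt, maxPt)
  else:
    (minPt, minPt + rangeMax - 1)

when isMainModule:
  # Four workers => a single request covers at most a quarter of the space.
  let (lo, hi) = clipRange(0'u64, high(uint64), 4'u64)
  assert hi - lo + 1 == high(uint64) div 4
```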
+ let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Get a range of accounts to fetch from + let iv = block: + let rc = buddy.getUnprocessed() + if rc.isErr: + trace "Currently no unprocessed accounts", peer, stateRoot + return + rc.value + + when extraTraceMessages: + trace "Start fetching accounts", peer, stateRoot, iv + + # Process received accounts and stash storage slots to fetch later + let dd = block: + let rc = await buddy.getAccountRange(stateRoot, iv) + if rc.isErr: + buddy.putUnprocessed(iv) # fail => interval back to pool + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + when extraTraceMessages: + trace "Error fetching accounts => stop", peer, error + return + # Reset error counts for detecting repeated timeouts + buddy.data.errors.nTimeouts = 0 + rc.value + + let + nAccounts = dd.data.accounts.len + nStorage = dd.withStorage.len + + block: + let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data) + if rc.isErr: + # Bad data, just try another peer + buddy.putUnprocessed(iv) + buddy.ctrl.zombie = true + when extraTraceMessages: + let error = ComImportAccountsFailed + trace "Accounts import failed => stop", peer, stateRoot, + range=dd.consumed, nAccounts, nStorage, error + return + + # Statistics + env.nAccounts.inc(nAccounts) + env.nStorage.inc(nStorage) + + # Register consumed intervals on the accumulator over all state roots + buddy.markGloballyProcessed(dd.consumed) + + # Register consumed and bulk-imported (well, not yet) accounts range + block registerConsumed: + block: + # Both intervals `min(iv)` and `min(dd.consumed)` are equal + let rc = iv - dd.consumed + if rc.isOk: + # Now, `dd.consumed` < `iv`, return some unused range + buddy.putUnprocessed(rc.value) + break registerConsumed + block: + # The processed interval might be a bit larger + let rc = dd.consumed - iv + if rc.isOk: + # Remove from unprocessed data. If it is not unprocessed, anymore + # then it was doubly processed which is ok. + buddy.delUnprocessed(rc.value) + break registerConsumed + # End registerConsumed + + # Store accounts on the storage TODO list. + discard env.fetchStorage.append SnapSlotQueueItemRef(q: dd.withStorage) + + when extraTraceMessages: + let withStorage = dd.withStorage.len + trace "Done fetching accounts", peer, stateRoot, nAccounts, withStorage, iv + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/store_storages.nim b/nimbus/sync/snap/worker/store_storages.nim new file mode 100644 index 000000000..0ee607168 --- /dev/null +++ b/nimbus/sync/snap/worker/store_storages.nim @@ -0,0 +1,177 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
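The `registerConsumed` block in `storeAccounts()` above reconciles the range that was requested with the range the peer actually served. With closed integer intervals standing in for `LeafRange` (purely illustrative):

```nim
type Iv = tuple[lo, hi: int]       # closed interval standing in for LeafRange

proc reconcile(requested, consumed: Iv): tuple[backToPool, extraDone: seq[Iv]] =
  ## Both intervals start at the same point. If the peer served less than was
  ## asked for, the tail goes back to the unprocessed pool; if it served more,
  ## the surplus is marked as processed too.
  if consumed.hi < requested.hi:
    result.backToPool.add((lo: consumed.hi + 1, hi: requested.hi))
  elif requested.hi < consumed.hi:
    result.extraDone.add((lo: requested.hi + 1, hi: consumed.hi))

when isMainModule:
  assert reconcile((lo: 0, hi: 99), (lo: 0, hi: 49)).backToPool ==
    @[(lo: 50, hi: 99)]
  assert reconcile((lo: 0, hi: 99), (lo: 0, hi: 149)).extraDone ==
    @[(lo: 100, hi: 149)]
```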
+ +## Fetch accounts stapshot +## ======================= +## +## Worker items state diagram: +## :: +## unprocessed slot requests | peer workers + storages database update +## =================================================================== +## +## +-----------------------------------------------+ +## | | +## v | +## ------------+-------> ------+ +## | | +## +-------> ------+ +## | | +## +-------> ------+ +## : : +## + +import + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/keyed_queue, + stint, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/[com_error, get_storage_ranges], + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getNextSlotItem(buddy: SnapBuddyRef): Result[SnapSlotQueueItemRef,void] = + let env = buddy.data.pivotEnv + for w in env.fetchStorage.nextKeys: + # Make sure that this item was not fetched and rejected earlier + if w notin buddy.data.vetoSlots: + env.fetchStorage.del(w) + return ok(w) + err() + +proc fetchAndImportStorageSlots( + buddy: SnapBuddyRef; + reqSpecs: seq[AccountSlotsHeader]; + ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] + {.async.} = + ## Fetch storage slots data from the network, store it on disk and + ## return data to process in the next cycle. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Get storage slots + var stoRange = block: + let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) + if rc.isErr: + return err(rc.error) + rc.value + + if 0 < stoRange.data.storages.len: + # Verify/process data and save to disk + block: + let rc = ctx.data.snapDb.importStorages(peer, stoRange.data) + + if rc.isErr: + # Push back parts of the error item + var once = false + for w in rc.error: + if 0 <= w[0]: + # Reset any partial requests by not copying the `firstSlot` field. + # So all the storage slots are re-fetched completely for this + # account. + stoRange.addLeftOver( + @[AccountSlotsHeader( + accHash: stoRange.data.storages[w[0]].account.accHash, + storageRoot: stoRange.data.storages[w[0]].account.storageRoot)], + forceNew = not once) + once = true + # Do not ask for the same entries again on this `peer` + if once: + buddy.data.vetoSlots.incl stoRange.leftOver[^1] + + if rc.error[^1][0] < 0: + discard + # TODO: disk storage failed or something else happend, so what? + + # Return the remaining part to be processed later + return ok(stoRange.leftOver) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc storeStorages*(buddy: SnapBuddyRef) {.async.} = + ## Fetch account storage slots and store them in the database. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + var + once = true # for logging + + # Fetch storage data and save it on disk. Storage requests are managed by + # a request queue for handling partioal replies and re-fetch issues. For + # all practical puroses, this request queue should mostly be empty. 
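Left-over requests coming back from a partial reply are folded back into this queue; a left-over without a partial-slot marker is glued onto an existing entry rather than opening a new one. A simplified model using plain sequences instead of the keyed queue:

```nim
type
  SlotReq = object
    accounts: seq[string]          # accounts whose storage is still wanted
    partial: bool                  # true if the first account was only partly served

proc requeue(queue: var seq[SlotReq]; leftOver: SlotReq) =
  ## Prefer extending an existing queue item over creating a new one,
  ## unless the left-over must restart from a partial slot boundary.
  if not leftOver.partial and 0 < queue.len:
    queue[^1].accounts.add leftOver.accounts
  else:
    queue.add leftOver

when isMainModule:
  var queue = @[SlotReq(accounts: @["acc-1"])]
  queue.requeue(SlotReq(accounts: @["acc-2"]))                # merged into last item
  queue.requeue(SlotReq(accounts: @["acc-3"], partial: true)) # kept separate
  assert queue.len == 2 and queue[0].accounts == @["acc-1", "acc-2"]
```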
+  while true:
+    # Pull out the next request item from the queue
+    let req = block:
+      let rc = buddy.getNextSlotItem()
+      if rc.isErr:
+        return # currently nothing to do
+      rc.value
+
+    when extraTraceMessages:
+      if once:
+        once = false
+        let nAccounts = 1 + env.fetchStorage.len
+        trace "Start fetching storage slots", peer,
+          nAccounts, nVetoSlots=buddy.data.vetoSlots.len
+
+    block:
+      # Fetch and store account storage slots. On success, the `rc` value will
+      # contain a list of left-over items to be re-processed.
+      let rc = await buddy.fetchAndImportStorageSlots(req.q)
+      if rc.isErr:
+        # Save accounts/storage list to be processed later, then stop
+        discard env.fetchStorage.append req
+        let error = rc.error
+        if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
+          trace "Error fetching storage slots => stop", peer, error
+          discard
+        return
+
+      # Reset error counts for detecting repeated timeouts
+      buddy.data.errors.nTimeouts = 0
+
+      for qLo in rc.value:
+        # Handle queue left-overs for processing in the next cycle
+        if qLo.q[0].firstSlot == Hash256() and 0 < env.fetchStorage.len:
+          # Appending to last queue item is preferred over adding new item
+          let item = env.fetchStorage.first.value
+          item.q = item.q & qLo.q
+        else:
+          # Put back as-is.
+          discard env.fetchStorage.append qLo
+    # End while
+
+  when extraTraceMessages:
+    trace "Done fetching storage slots", peer, nAccounts=env.fetchStorage.len
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim
index 615d5af56..4c1f59b4c 100644
--- a/nimbus/sync/snap/worker/ticker.nim
+++ b/nimbus/sync/snap/worker/ticker.nim
@@ -30,8 +30,7 @@ type
     nStorage*: (float,float)           ## mean and standard deviation
     accountsFill*: (float,float,float) ## mean, standard deviation, merged total
     accCoverage*: float                ## as factor
-    activeQueues*: int
-    flushedQueues*: int64
+    nQueues*: int
 
   TickerStatsUpdater* =
     proc: TickerStats {.gcsafe, raises: [Defect].}
@@ -119,19 +118,18 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
       accCov = data.accountsFill[0].toPC(1) &
        "(" & data.accountsFill[1].toPC(1) & ")" &
        "/" & data.accountsFill[2].toPC(0)
-      flushed = data.flushedQueues
       buddies = t.nBuddies
      tick = t.tick.toSI
      mem = getTotalMem().uint.toSI
 
    noFmtError("runLogTicker"):
      if data.pivotBlock.isSome:
-        pivot = &"#{data.pivotBlock.get}/{data.activeQueues}"
+        pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
      nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})"
      nStore = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})"
 
    info "Snap sync statistics",
-      tick, buddies, pivot, nAcc, accCov, nStore, flushed, mem
+      tick, buddies, pivot, nAcc, accCov, nStore, mem
 
    t.tick.inc
    t.setLogTicker(Moment.fromNow(tickerLogInterval))
diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim
index 5053f0b5f..dd702de9a 100644
--- a/nimbus/sync/snap/worker_desc.nim
+++ b/nimbus/sync/snap/worker_desc.nim
@@ -1,5 +1,4 @@
-# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
-#
+# Nimbus
 # Copyright (c) 2021 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@@ -13,10 +12,9 @@ import
   std/[hashes, sequtils, strutils],
   eth/[common/eth_types, p2p],
   stew/[byteutils, keyed_queue],
-  ../../db/select_backend,
-  ../../constants,
+  "../.."/[constants, db/select_backend],
   ".."/[sync_desc, types],
-  ./worker/[accounts_db, ticker],
+  ./worker/[com/com_error, db/snap_db, ticker],
   ./range_desc
 
 {.push raises: [Defect].}
@@ -25,14 +23,40 @@ const
   snapRequestBytesLimit* = 2 * 1024 * 1024
     ## Soft bytes limit to request in `snap` protocol calls.
 
-  maxPivotBlockWindow* = 128
-    ## The maximal depth of two block headers. If the pivot block header
-    ## (containing the current state root) is more than this many blocks
-    ## away from a new pivot block header candidate, then the latter one
-    ## replaces the current block header.
+  minPivotBlockDistance* = 128
+    ## The minimal distance between two block headers needed to activate a
+    ## new state root pivot.
     ##
-    ## This mechanism applies to new worker buddies which start by finding
-    ## a new pivot.
+    ## Effects on assembling the state via `snap/1` protocol:
+    ##
+    ## * A small value of this constant increases the propensity to update the
+    ##   pivot header more often. This is so because each new peer negotiates a
+    ##   pivot block number at least as large as the current one.
+    ##
+    ## * A large value keeps the current pivot more stable but some experiments
+    ##   suggest that the `snap/1` protocol is answered only for later block
+    ##   numbers (aka pivot blocks.) So a large value tends to keep the pivot
+    ##   farther away from the chain head.
+    ##
+    ## Note that 128 is the magic distance for snapshots used by *Geth*.
+
+  backPivotBlockDistance* = 64
+    ## When a pivot header is found, move pivot back `backPivotBlockDistance`
+    ## blocks so that the pivot is guaranteed to have some distance from the
+    ## chain head.
+    ##
+    ## Set `backPivotBlockDistance` to zero for disabling this feature.
+
+  backPivotBlockThreshold* = backPivotBlockDistance + minPivotBlockDistance
+    ## Ignore `backPivotBlockDistance` unless the current block number is
+    ## larger than this constant (which must be at least
+    ## `backPivotBlockDistance`.)
+
+  healAccountsTrigger* = 0.95
+    ## Apply accounts healing if the global snap download coverage factor
+    ## exceeds this setting. The global coverage factor is derived by merging
+    ## all account ranges retrieved for all pivot state roots (see
+    ## `coveredAccounts` in `CtxData`.)
 
   maxTrieNodeFetch* = 1024
     ## Informal maximal number of trie nodes to fetch at once. This is nor
@@ -41,42 +65,22 @@ const
     ## Resticting the fetch list length early allows to better paralellise
     ## healing.
 
-  switchPivotAfterCoverage* = 1.0 # * 0.30
-    ## Stop fetching from the same pivot state root with this much coverage
-    ## and try to find a new one. Setting this value to `1.0`, this feature
-    ## is disabled. Note that settting low coverage levels is primarily for
-    ## testing/debugging (may produce stress conditions.)
-    ##
-    ## If this setting is active, it typically supersedes the pivot update
-    ## mechainsm implied by the `maxPivotBlockWindow`. This for the simple
-    ## reason that the pivot state root is typically near the head of the
-    ## block chain.
-    ##
-    ## This mechanism applies to running worker buddies. When triggered, all
-    ## pivot handlers are reset so they will start from scratch finding a
-    ## better pivot.
+  maxHealingLeafPaths* = 1024
+    ## Retrieve this many leaf nodes with a proper 32-byte path when inspecting
+    ## for dangling nodes. This allows healing to run in parallel with the
+    ## accounts or storage download without requesting again an account or
+    ## storage slot already found by healing.
-  # ---
+  noPivotEnvChangeIfComplete* = true
+    ## If set `true`, new peers will not change the pivot even if the
+    ## negotiated pivot would be newer. This should be the default.
 
-  snapAccountsDumpEnable* = false # or true
-    ## Enable data dump
-
-  snapAccountsDumpCoverageStop* = 0.99999
-    ## Stop dumping if most accounts are covered
+  # -------
 
   seenBlocksMax = 500
     ## Internal size of LRU cache (for debugging)
 
 type
-  BuddyStat* = distinct uint
-
-  SnapBuddyErrors* = tuple
-    ## particular error counters so connections will not be cut immediately
-    ## after a particular error.
-    nTimeouts: uint
-
-  # -------
-
   WorkerSeenBlocks = KeyedQueue[array[32,byte],BlockNumber]
     ## Temporary for pretty debugging, `BlockHash` keyed lru cache
 
@@ -94,23 +98,28 @@ type
   SnapSlotsSet* = HashSet[SnapSlotQueueItemRef]
     ## Ditto but without order, to be used as veto set
 
-  SnapRepairState* = enum
-    Pristine                          ## Not initialised yet
-    KeepGoing                         ## Unfinished repair process
-    Done                              ## Stop repairing
+  SnapAccountRanges* = array[2,LeafRangeSet]
+    ## Pair of account hash range lists. The first entry must be processed
+    ## first. This allows coordinating peers working on different state roots
+    ## to avoid overlapping accounts as long as they fetch from the first entry.
 
   SnapPivotRef* = ref object
     ## Per-state root cache for particular snap data environment
     stateHeader*: BlockHeader          ## Pivot state, containg state root
-    pivotAccount*: NodeTag             ## Random account
-    availAccounts*: LeafRangeSet       ## Accounts to fetch (as ranges)
+
+    # Accounts download
+    fetchAccounts*: SnapAccountRanges  ## Sets of account ranges to fetch
+    checkAccountNodes*: seq[Blob]      ## Nodes with prob. dangling child links
+    missingAccountNodes*: seq[Blob]    ## Dangling links to fetch and merge
+    accountsDone*: bool                ## All accounts have been processed
+
+    # Storage slots download
+    fetchStorage*: SnapSlotsQueue      ## Fetch storage for these accounts
+    serialSync*: bool                  ## Done with storage, block sync next
+
+    # Info
     nAccounts*: uint64                 ## Number of accounts imported
     nStorage*: uint64                  ## Number of storage spaces imported
-    leftOver*: SnapSlotsQueue          ## Re-fetch storage for these accounts
-    dangling*: seq[Blob]               ## Missing nodes for healing process
-    repairState*: SnapRepairState      ## State of healing process
-    when switchPivotAfterCoverage < 1.0:
-      minCoverageReachedOk*: bool      ## Stop filling this pivot
 
   SnapPivotTable* = ##\
     ## LRU table, indexed by state root
@@ -118,33 +127,23 @@ type
 
   BuddyData* = object
     ## Per-worker local descriptor data extension
-    errors*: SnapBuddyErrors           ## For error handling
-    workerPivot*: RootRef              ## Opaque object reference for sub-module
+    errors*: ComErrorStatsRef          ## For error handling
+    pivotFinder*: RootRef              ## Opaque object reference for sub-module
+    pivotEnv*: SnapPivotRef            ## Environment containing state root
     vetoSlots*: SnapSlotsSet           ## Do not ask for these slots, again
 
-  BuddyPoolHookFn* = proc(buddy: BuddyRef[CtxData,BuddyData]) {.gcsafe.}
-    ## All buddies call back (the argument type is defined below with
-    ## pretty name `SnapBuddyRef`.)
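To make the interplay of the new tuning constants easier to follow, here is a small self-contained sketch. The constant values are copied from the `const` section above; the procs `proposePivot`, `acceptNewPivot` and `shouldHeal` are illustrative names only, not the worker's API.

```nim
const
  minPivotBlockDistance = 128'u64
  backPivotBlockDistance = 64'u64
  backPivotBlockThreshold = backPivotBlockDistance + minPivotBlockDistance
  healAccountsTrigger = 0.95

proc proposePivot(negotiated: uint64): uint64 =
  ## Move the pivot back a bit from the negotiated header so that more peers
  ## are likely to hold a matching snapshot, but only once the chain is long
  ## enough for the back-off to make sense.
  if backPivotBlockThreshold <= negotiated:
    negotiated - backPivotBlockDistance
  else:
    negotiated

proc acceptNewPivot(current, candidate: uint64): bool =
  ## A candidate pivot only replaces the current one if it is at least
  ## `minPivotBlockDistance` blocks ahead.
  current + minPivotBlockDistance <= candidate

proc shouldHeal(coverage: float): bool =
  ## Healing starts once the merged account range coverage exceeds the trigger.
  healAccountsTrigger < coverage

when isMainModule:
  doAssert proposePivot(100) == 100        # below threshold: no back-off
  doAssert proposePivot(10_000) == 9_936   # 10_000 - 64
  doAssert not acceptNewPivot(10_000, 10_100)
  doAssert shouldHeal(0.96)
```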
-
   CtxData* = object
     ## Globally shared data extension
     seenBlock: WorkerSeenBlocks        ## Temporary, debugging, pretty logs
     rng*: ref HmacDrbgContext          ## Random generator
-    coveredAccounts*: LeafRangeSet     ## Derived from all available accounts
     dbBackend*: ChainDB                ## Low level DB driver access (if any)
-    ticker*: TickerRef                 ## Ticker, logger
     pivotTable*: SnapPivotTable        ## Per state root environment
-    pivotCount*: uint64                ## Total of all created tab entries
-    pivotEnv*: SnapPivotRef            ## Environment containing state root
-    prevEnv*: SnapPivotRef             ## Previous state root environment
-    pivotData*: RootRef                ## Opaque object reference for sub-module
-    accountRangeMax*: UInt256          ## Maximal length, high(u256)/#peers
-    accountsDb*: AccountsDbRef         ## Proof processing for accounts
-    runPoolHook*: BuddyPoolHookFn      ## Callback for `runPool()`
-    when snapAccountsDumpEnable:
-      proofDumpOk*: bool
-      proofDumpFile*: File
-      proofDumpInx*: int
+    pivotFinderCtx*: RootRef           ## Opaque object reference for sub-module
+    snapDb*: SnapDbRef                 ## Accounts snapshot DB
+    coveredAccounts*: LeafRangeSet     ## Derived from all available accounts
+
+    # Info
+    ticker*: TickerRef                 ## Ticker, logger
 
   SnapBuddyRef* = BuddyRef[CtxData,BuddyData]
     ## Extended worker peer descriptor
@@ -152,16 +151,22 @@ type
   SnapCtxRef* = CtxRef[CtxData]
     ## Extended global descriptor
 
+static:
+  doAssert healAccountsTrigger < 1.0 # larger values make no sense
+  doAssert backPivotBlockDistance <= backPivotBlockThreshold
+
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
 
-proc inc(stat: var BuddyStat) {.borrow.}
-
 proc hash*(a: SnapSlotQueueItemRef): Hash =
   ## Table/KeyedQueue mixin
   cast[pointer](a).hash
 
+proc hash*(a: Hash256): Hash =
+  ## Table/KeyedQueue mixin
+  a.data.hash
+
 # ------------------------------------------------------------------------------
 # Public functions, debugging helpers (will go away eventually)
 # ------------------------------------------------------------------------------
diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim
index fa3ec7578..4616a276e 100644
--- a/tests/test_sync_snap.nim
+++ b/tests/test_sync_snap.nim
@@ -25,8 +25,8 @@ import
   ../nimbus/p2p/chain,
   ../nimbus/sync/types,
   ../nimbus/sync/snap/range_desc,
-  ../nimbus/sync/snap/worker/[accounts_db, db/hexary_desc,
-    db/hexary_inspect, db/rocky_bulk_load],
+  ../nimbus/sync/snap/worker/db/[hexary_desc, hexary_inspect,
+    rocky_bulk_load, snap_db,],
   ../nimbus/utils/prettify,
   ./replay/[pp, undump_blocks, undump_accounts, undump_storages],
   ./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types]
@@ -117,7 +117,7 @@ proc pp(rc: Result[Hash256,HexaryDbError]): string =
 
 proc pp(
     rc: Result[TrieNodeStat,HexaryDbError];
-    db: AccountsDbSessionRef
+    db: SnapDbSessionRef
      ): string =
   if rc.isErr: $rc.error
   else: rc.value.pp(db.getAcc)
@@ -284,21 +284,21 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
 
   suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":
     var
-      desc: AccountsDbSessionRef
+      desc: SnapDbSessionRef
       accKeys: seq[Hash256]
 
     test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
       let
-        dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
-                 else: AccountsDbRef.init(newMemoryDB())
-        dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
+        dbBase = if persistent: SnapDbRef.init(db.cdb[0])
+                 else: SnapDbRef.init(newMemoryDB())
+        dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
       for n,w in accountsList:
        check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb
 
     test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
-      let dbBase = if persistent: AccountsDbRef.init(db.cdb[1])
-                   else: AccountsDbRef.init(newMemoryDB())
-      desc = AccountsDbSessionRef.init(dbBase, root, peer)
+      let dbBase = if persistent: SnapDbRef.init(db.cdb[1])
+                   else: SnapDbRef.init(newMemoryDB())
+      desc = SnapDbSessionRef.init(dbBase, root, peer)
 
       # Load/accumulate data from several samples (needs some particular sort)
       let
@@ -417,19 +417,19 @@ proc storagesRunner(
 
   suite &"SyncSnap: {fileInfo} accounts storage for {info}":
     let
-      dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
-               else: AccountsDbRef.init(newMemoryDB())
+      dbBase = if persistent: SnapDbRef.init(db.cdb[0])
+               else: SnapDbRef.init(newMemoryDB())
     var
-      desc = AccountsDbSessionRef.init(dbBase, root, peer)
+      desc = SnapDbSessionRef.init(dbBase, root, peer)
 
     test &"Merging {accountsList.len} accounts for state root ..{root.pp}":
       for w in accountsList:
-        let desc = AccountsDbSessionRef.init(dbBase, root, peer)
+        let desc = SnapDbSessionRef.init(dbBase, root, peer)
         check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
 
     test &"Merging {storagesList.len} storages lists":
       let
-        dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
+        dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
         ignore = knownFailures.toTable
       for n,w in storagesList:
         let
@@ -466,18 +466,18 @@ proc inspectionRunner(
 
   suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
     let
-      memBase = AccountsDbRef.init(newMemoryDB())
-      memDesc = AccountsDbSessionRef.init(memBase, Hash256(), peer)
+      memBase = SnapDbRef.init(newMemoryDB())
+      memDesc = SnapDbSessionRef.init(memBase, Hash256(), peer)
     var
       singleStats: seq[(int,TrieNodeStat)]
      accuStats: seq[(int,TrieNodeStat)]
-      perBase,altBase: AccountsDbRef
-      perDesc,altDesc: AccountsDbSessionRef
+      perBase,altBase: SnapDbRef
+      perDesc,altDesc: SnapDbSessionRef
     if persistent:
-      perBase = AccountsDbRef.init(db.cdb[0])
-      perDesc = AccountsDbSessionRef.init(perBase, Hash256(), peer)
-      altBase = AccountsDbRef.init(db.cdb[1])
-      altDesc = AccountsDbSessionRef.init(altBase, Hash256(), peer)
+      perBase = SnapDbRef.init(db.cdb[0])
+      perDesc = SnapDbSessionRef.init(perBase, Hash256(), peer)
+      altBase = SnapDbRef.init(db.cdb[1])
+      altDesc = SnapDbSessionRef.init(altBase, Hash256(), peer)
 
     test &"Fingerprinting {inspectList.len} single accounts lists " &
        "for in-memory-db":
@@ -486,7 +486,7 @@ proc inspectionRunner(
        let
          root = accList[0].root
          rootKey = root.to(NodeKey)
-          desc = AccountsDbSessionRef.init(memBase, root, peer)
+          desc = SnapDbSessionRef.init(memBase, root, peer)
        for w in accList:
          check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
        let rc = desc.inspectAccountsTrie(persistent=false)
@@ -510,8 +510,8 @@ proc inspectionRunner(
        let
          root = accList[0].root
          rootKey = root.to(NodeKey)
-          dbBase = AccountsDbRef.init(db.cdb[2+n])
-          desc = AccountsDbSessionRef.init(dbBase, root, peer)
+          dbBase = SnapDbRef.init(db.cdb[2+n])
+          desc = SnapDbSessionRef.init(dbBase, root, peer)
        for w in accList:
          check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
        let rc = desc.inspectAccountsTrie(persistent=false)
@@ -572,8 +572,8 @@ proc inspectionRunner(
        skip()
      else:
        let
-          cscBase = AccountsDbRef.init(newMemoryDB())
-          cscDesc = AccountsDbSessionRef.init(cscBase, Hash256(), peer)
+          cscBase = SnapDbRef.init(newMemoryDB())
+          cscDesc = SnapDbSessionRef.init(cscBase, Hash256(), peer)
        var cscStep: Table[NodeKey,(int,seq[Blob])]
 
        for n,accList in inspectList:
@@ -588,7 +588,7 @@ proc inspectionRunner(
          cscStep[rootKey][0].inc
          let
            r0 = desc.inspectAccountsTrie(persistent=false)
-            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],false)
+            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=false)
          check rc.isOk
 
          let accumulated = r0.value.dangling.toHashSet
@@ -620,7 +620,7 @@ proc inspectionRunner(
          cscStep[rootKey][0].inc
          let
            r0 = desc.inspectAccountsTrie(persistent=true)
-            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],true)
+            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=true)
          check rc.isOk
 
          let accumulated = r0.value.dangling.toHashSet
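For readers tracking the `accounts_db` → `snap_db` rename through the test changes above, a condensed usage sketch follows. It assumes the nimbus-eth1 source tree (import paths follow `tests/test_sync_snap.nim`), a prepared `root`, `peer` and account dump list, and the generic helper `importAndInspect` is illustrative only; the individual calls mirror those visible in the diff.

```nim
import
  eth/[common/eth_types, p2p, trie/db],
  ../nimbus/sync/snap/worker/db/[hexary_desc, hexary_inspect,
    rocky_bulk_load, snap_db]

proc importAndInspect[T](root: Hash256; peer: Peer; accountsList: seq[T]) =
  ## `T` stands for the undumped account packets used by the test; each item
  ## is only assumed to expose `base` and `data` as in the test code above.
  let
    dbBase = SnapDbRef.init(newMemoryDB())            # was AccountsDbRef
    desc = SnapDbSessionRef.init(dbBase, root, peer)  # was AccountsDbSessionRef

  for w in accountsList:
    doAssert desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb

  let rc = desc.inspectAccountsTrie(persistent=false)
  doAssert rc.isOk
  echo "dangling nodes: ", rc.value.dangling.len
```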