diff --git a/newBlockchainTests.md b/newBlockchainTests.md index 4214f5e56..da49b9e91 100644 --- a/newBlockchainTests.md +++ b/newBlockchainTests.md @@ -1,12 +1,5 @@ newBlockchainTests === -## bcArrowGlacierToMerge -```diff -+ difficultyFormula.json OK -+ powToPosBlockRejection.json OK -+ powToPosTest.json OK -``` -OK: 3/3 Fail: 0/3 Skip: 0/3 ## bcBerlinToLondon ```diff + BerlinToLondonTransition.json OK @@ -36,7 +29,6 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 + baseFee.json OK + besuBaseFeeBug.json OK + burnVerify.json OK -+ burnVerifyLondon.json OK + checkGasLimit.json OK + feeCap.json OK + gasLimit20m.json OK @@ -48,29 +40,21 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 + lowDemand.json OK + medDemand.json OK + tips.json OK -+ tipsLondon.json OK + transFail.json OK + transType.json OK + valCausesOOF.json OK ``` -OK: 21/21 Fail: 0/21 Skip: 0/21 +OK: 19/19 Fail: 0/19 Skip: 0/19 ## bcEIP158ToByzantium ```diff + ByzantiumTransition.json OK ``` OK: 1/1 Fail: 0/1 Skip: 0/1 -## bcEIP3675 -```diff -+ timestampPerBlock.json OK -+ tipInsideBlock.json OK -``` -OK: 2/2 Fail: 0/2 Skip: 0/2 ## bcExample ```diff + basefeeExample.json OK -+ mergeExample.json OK ``` -OK: 2/2 Fail: 0/2 Skip: 0/2 +OK: 1/1 Fail: 0/1 Skip: 0/1 ## bcExploitTest ```diff DelegateCallSpam.json Skip @@ -282,7 +266,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + RefundOverflow.json OK + RefundOverflow2.json OK + SuicidesMixingCoinbase.json OK -+ SuicidesMixingCoinbase2.json OK + TransactionFromCoinbaseHittingBlockGasLimit1.json OK + TransactionFromCoinbaseNotEnoughFounds.json OK + TransactionNonceCheck.json OK @@ -316,7 +299,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + extcodehashEmptySuicide.json OK + logRevert.json OK + multimpleBalanceInstruction.json OK -+ random.json OK + randomStatetest123.json OK + randomStatetest136.json OK + randomStatetest160.json OK @@ -362,7 +344,7 @@ OK: 96/96 Fail: 0/96 Skip: 0/96 + transactionFromSelfDestructedContract.json OK + txCost-sec73.json OK ``` -OK: 89/90 Fail: 0/90 Skip: 1/90 +OK: 87/88 Fail: 0/88 Skip: 1/88 ## bcTotalDifficultyTest ```diff + lotsOfBranchesOverrideAtTheEnd.json OK @@ -423,7 +405,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 ## bcUncleTest ```diff + EqualUncleInTwoDifferentBlocks.json OK -+ EqualUncleInTwoDifferentBlocks2.json OK + InChainUncle.json OK + InChainUncleFather.json OK + InChainUncleGrandPa.json OK @@ -446,7 +427,7 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + uncleHeaderWithGeneration0.json OK + uncleWithSameBlockNumber.json OK ``` -OK: 23/23 Fail: 0/23 Skip: 0/23 +OK: 22/22 Fail: 0/22 Skip: 0/22 ## bcValidBlockTest ```diff + ExtraData32.json OK @@ -749,9 +730,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + callcodecallcodecallcode_111_SuicideEnd.json OK + callcodecallcodecallcode_111_SuicideMiddle.json OK callcodecallcodecallcode_ABCB_RECURSIVE.json Skip -+ touchAndGo.json OK ``` -OK: 73/80 Fail: 0/80 Skip: 7/80 +OK: 72/79 Fail: 0/79 Skip: 7/79 ## stCallCreateCallCodeTest ```diff Call1024BalanceTooLow.json Skip @@ -1026,7 +1006,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CREATE_HighNonceMinus1.json OK + CREATE_empty000CreateinInitCode_Transaction.json OK + CodeInConstructor.json OK -+ CreateAddressWarmAfterFail.json OK + CreateCollisionResults.json OK + CreateCollisionToEmpty.json OK + CreateOOGFromCallRefunds.json OK @@ -1040,14 +1019,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CreateOOGafterInitCodeRevert2.json OK + CreateOOGafterMaxCodesize.json OK + CreateResults.json OK -+ CreateTransactionHighNonce.json OK + TransactionCollisionToEmpty.json OK + TransactionCollisionToEmptyButCode.json OK + TransactionCollisionToEmptyButNonce.json OK + 
createFailResult.json OK -+ createLargeResult.json OK ``` -OK: 44/44 Fail: 0/44 Skip: 0/44 +OK: 41/41 Fail: 0/41 Skip: 0/41 ## stDelegatecallTestHomestead ```diff Call1024BalanceTooLow.json Skip @@ -1151,13 +1128,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40 + lowGasPriceOldTypes.json OK + outOfFunds.json OK + outOfFundsOldTypes.json OK -+ senderBalance.json OK + tipTooHigh.json OK + transactionIntinsicBug.json OK + typeTwoBerlin.json OK + valCausesOOF.json OK ``` -OK: 13/13 Fail: 0/13 Skip: 0/13 +OK: 12/12 Fail: 0/12 Skip: 0/12 ## stEIP158Specific ```diff + CALL_OneVCallSuicide.json OK @@ -1199,12 +1175,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + indexesOmitExample.json OK + invalidTr.json OK + labelsExample.json OK -+ mergeTest.json OK + rangesExample.json OK + solidityExample.json OK + yulExample.json OK ``` -OK: 12/12 Fail: 0/12 Skip: 0/12 +OK: 11/11 Fail: 0/11 Skip: 0/11 ## stExtCodeHash ```diff + callToNonExistent.json OK @@ -1482,7 +1457,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 ## stPreCompiledContracts ```diff + blake2B.json OK -+ delegatecall09Undefined.json OK + idPrecomps.json OK + identity_to_bigger.json OK + identity_to_smaller.json OK @@ -1491,7 +1465,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 + precompsEIP2929.json OK + sec80.json OK ``` -OK: 9/9 Fail: 0/9 Skip: 0/9 +OK: 8/8 Fail: 0/8 Skip: 0/8 ## stPreCompiledContracts2 ```diff + CALLBlake2f.json OK @@ -1559,7 +1533,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallEcrecoverS_prefixed0.json OK + CallEcrecoverUnrecoverableKey.json OK + CallEcrecoverV_prefixed0.json OK -+ CallEcrecover_Overflow.json OK + CallIdentitiy_0.json OK + CallIdentitiy_1.json OK + CallIdentity_1_nonzeroValue.json OK @@ -1589,15 +1562,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallSha256_4.json OK + CallSha256_4_gas99.json OK + CallSha256_5.json OK -+ ecrecoverShortBuff.json OK -+ ecrecoverWeirdV.json OK + modexpRandomInput.json OK + modexp_0_0_0_20500.json OK + modexp_0_0_0_22000.json OK + modexp_0_0_0_25000.json OK + modexp_0_0_0_35000.json OK ``` -OK: 102/102 Fail: 0/102 Skip: 0/102 +OK: 99/99 Fail: 0/99 Skip: 0/99 ## stQuadraticComplexityTest ```diff Call1MB1024Calldepth.json Skip @@ -2801,7 +2772,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + currentAccountBalance.json OK + doubleSelfdestructTest.json OK + doubleSelfdestructTest2.json OK -+ doubleSelfdestructTouch.json OK + extcodecopy.json OK + return0.json OK + return1.json OK @@ -2816,7 +2786,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + suicideSendEtherToMe.json OK + testRandomTest.json OK ``` -OK: 57/67 Fail: 0/67 Skip: 10/67 +OK: 56/66 Fail: 0/66 Skip: 10/66 ## stTimeConsuming ```diff CALLBlake2f_MaxRounds.json Skip @@ -3336,4 +3306,4 @@ OK: 0/3 Fail: 0/3 Skip: 3/3 OK: 11/11 Fail: 0/11 Skip: 0/11 ---TOTAL--- -OK: 2881/2986 Fail: 0/2986 Skip: 105/2986 +OK: 2859/2964 Fail: 0/2964 Skip: 105/2964 diff --git a/newGeneralStateTests.md b/newGeneralStateTests.md index ed4e7bec6..8be5311b7 100644 --- a/newGeneralStateTests.md +++ b/newGeneralStateTests.md @@ -271,9 +271,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + callcodecallcodecallcode_111_SuicideEnd.json OK + callcodecallcodecallcode_111_SuicideMiddle.json OK callcodecallcodecallcode_ABCB_RECURSIVE.json Skip -+ touchAndGo.json OK ``` -OK: 73/80 Fail: 0/80 Skip: 7/80 +OK: 72/79 Fail: 0/79 Skip: 7/79 ## stCallCreateCallCodeTest ```diff Call1024BalanceTooLow.json Skip @@ -548,7 +547,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CREATE_HighNonceMinus1.json OK + CREATE_empty000CreateinInitCode_Transaction.json OK + CodeInConstructor.json OK -+ CreateAddressWarmAfterFail.json OK + CreateCollisionResults.json OK + 
CreateCollisionToEmpty.json OK + CreateOOGFromCallRefunds.json OK @@ -562,14 +560,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52 + CreateOOGafterInitCodeRevert2.json OK + CreateOOGafterMaxCodesize.json OK + CreateResults.json OK -+ CreateTransactionHighNonce.json OK + TransactionCollisionToEmpty.json OK + TransactionCollisionToEmptyButCode.json OK + TransactionCollisionToEmptyButNonce.json OK + createFailResult.json OK -+ createLargeResult.json OK ``` -OK: 44/44 Fail: 0/44 Skip: 0/44 +OK: 41/41 Fail: 0/41 Skip: 0/41 ## stDelegatecallTestHomestead ```diff Call1024BalanceTooLow.json Skip @@ -673,13 +669,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40 + lowGasPriceOldTypes.json OK + outOfFunds.json OK + outOfFundsOldTypes.json OK -+ senderBalance.json OK + tipTooHigh.json OK + transactionIntinsicBug.json OK + typeTwoBerlin.json OK + valCausesOOF.json OK ``` -OK: 13/13 Fail: 0/13 Skip: 0/13 +OK: 12/12 Fail: 0/12 Skip: 0/12 ## stEIP158Specific ```diff + CALL_OneVCallSuicide.json OK @@ -721,12 +716,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5 + indexesOmitExample.json OK + invalidTr.json OK + labelsExample.json OK -+ mergeTest.json OK + rangesExample.json OK + solidityExample.json OK + yulExample.json OK ``` -OK: 12/12 Fail: 0/12 Skip: 0/12 +OK: 11/11 Fail: 0/11 Skip: 0/11 ## stExtCodeHash ```diff + callToNonExistent.json OK @@ -1004,7 +998,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 ## stPreCompiledContracts ```diff + blake2B.json OK -+ delegatecall09Undefined.json OK + idPrecomps.json OK + identity_to_bigger.json OK + identity_to_smaller.json OK @@ -1013,7 +1006,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 + precompsEIP2929.json OK + sec80.json OK ``` -OK: 9/9 Fail: 0/9 Skip: 0/9 +OK: 8/8 Fail: 0/8 Skip: 0/8 ## stPreCompiledContracts2 ```diff + CALLBlake2f.json OK @@ -1081,7 +1074,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallEcrecoverS_prefixed0.json OK + CallEcrecoverUnrecoverableKey.json OK + CallEcrecoverV_prefixed0.json OK -+ CallEcrecover_Overflow.json OK + CallIdentitiy_0.json OK + CallIdentitiy_1.json OK + CallIdentity_1_nonzeroValue.json OK @@ -1111,15 +1103,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9 + CallSha256_4.json OK + CallSha256_4_gas99.json OK + CallSha256_5.json OK -+ ecrecoverShortBuff.json OK -+ ecrecoverWeirdV.json OK + modexpRandomInput.json OK + modexp_0_0_0_20500.json OK + modexp_0_0_0_22000.json OK + modexp_0_0_0_25000.json OK + modexp_0_0_0_35000.json OK ``` -OK: 102/102 Fail: 0/102 Skip: 0/102 +OK: 99/99 Fail: 0/99 Skip: 0/99 ## stQuadraticComplexityTest ```diff Call1MB1024Calldepth.json Skip @@ -2323,7 +2313,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + currentAccountBalance.json OK + doubleSelfdestructTest.json OK + doubleSelfdestructTest2.json OK -+ doubleSelfdestructTouch.json OK + extcodecopy.json OK + return0.json OK + return1.json OK @@ -2338,7 +2327,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13 + suicideSendEtherToMe.json OK + testRandomTest.json OK ``` -OK: 57/67 Fail: 0/67 Skip: 10/67 +OK: 56/66 Fail: 0/66 Skip: 10/66 ## stTimeConsuming ```diff CALLBlake2f_MaxRounds.json Skip @@ -2858,4 +2847,4 @@ OK: 1/3 Fail: 0/3 Skip: 2/3 OK: 11/11 Fail: 0/11 Skip: 0/11 ---TOTAL--- -OK: 2506/2608 Fail: 0/2608 Skip: 102/2608 +OK: 2495/2597 Fail: 0/2597 Skip: 102/2597 diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index b6b1afec9..607465e2d 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -240,11 +240,11 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = buddy.ctrl.zombie = true # Initialise/re-initialise this worker - elif await buddy.data.pivot.bestPivotNegotiate(buddy.data.bestNumber): + elif 
await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber): buddy.ctrl.multiOk = true # Update/activate `bestNumber` for local use buddy.data.bestNumber = - some(buddy.data.pivot.bestPivotHeader.value.blockNumber) + some(buddy.data.pivot.pivotHeader.value.blockNumber) elif not buddy.ctrl.stopped: await sleepAsync(2.seconds) diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim index 622355f7f..7421a260d 100644 --- a/nimbus/sync/misc/best_pivot.nim +++ b/nimbus/sync/misc/best_pivot.nim @@ -242,7 +242,7 @@ proc clear*(bp: BestPivotWorkerRef) = # Public functions # ------------------------------------------------------------------------------ -proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = +proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = ## Returns cached block header if available and the buddy `peer` is trusted. if bp.header.isSome and bp.peer notin bp.global.untrusted and @@ -251,10 +251,11 @@ proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] = return ok(bp.header.unsafeGet) err() -proc bestPivotNegotiate*( - bp: BestPivotWorkerRef; ## Worker peer +proc pivotNegotiate*( + bp: BestPivotWorkerRef; ## Worker peer minBlockNumber: Option[BlockNumber]; ## Minimum block number to expect - ): Future[bool] {.async.} = + ): Future[bool] + {.async.} = ## Negotiate best header pivot. This function must be run in *single mode* at ## the beginning of a running worker peer. If the function returns `true`, ## the current `buddy` can be used for syncing and the function diff --git a/nimbus/sync/misc/block_queue.nim b/nimbus/sync/misc/block_queue.nim index d072dbdcf..12092faa1 100644 --- a/nimbus/sync/misc/block_queue.nim +++ b/nimbus/sync/misc/block_queue.nim @@ -13,21 +13,21 @@ ## ## Worker items state diagram and sketch of sync algorithm: ## :: -## set of unprocessed | peer workers | list of work items ready -## block ranges | | for persistent database -## ================================================================== +## unprocessed | | ready for | store into +## block ranges | peer workers | persistent database | database +## ======================================================================= ## -## +---------------------------------------------+ -## | | -## | +---------------------------------+ | -## | | | | -## V v | | -## ---+-----> -----+---> ---> OUTPUT -## | | -## +-----> -----+ -## | | -## +-----> -----+ -## : : +## +------------------------------------------+ +## | | +## | +----------------------------+ | +## | | | | +## V v | | +## ---+---> ---+-----> -------> OUTPUT +## | | +## +---> ---+ +## | | +## +---> ---+ +## : : ## ## A work item is created from a range of block numbers extracted from the ## `` set of block ranges. diff --git a/nimbus/sync/misc/snap_pivot.nim b/nimbus/sync/misc/snap_pivot.nim index 3f92dd931..8ba2b2175 100644 --- a/nimbus/sync/misc/snap_pivot.nim +++ b/nimbus/sync/misc/snap_pivot.nim @@ -122,16 +122,16 @@ type ## Statistics counters for events associated with this peer. ## These may be used to recognise errors and select good peers. 
ok: tuple[ - reorgDetected: BuddyStat, - getBlockHeaders: BuddyStat, - getNodeData: BuddyStat] + reorgDetected: uint, + getBlockHeaders: uint, + getNodeData: uint] minor: tuple[ - timeoutBlockHeaders: BuddyStat, - unexpectedBlockHash: BuddyStat] + timeoutBlockHeaders: uint, + unexpectedBlockHash: uint] major: tuple[ - networkErrors: BuddyStat, - excessBlockHeaders: BuddyStat, - wrongBlockHeader: BuddyStat] + networkErrors: uint, + excessBlockHeaders: uint, + wrongBlockHeader: uint] SnapPivotCtxRef* = ref object of RootRef stats*: SnapWorkerStats ## Statistics counters @@ -550,14 +550,19 @@ proc init*( # Public functions # ------------------------------------------------------------------------------ -proc snapPivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] = +proc pivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] = ## Returns cached block header if available if sp.header.isSome: - ok(sp.header.unsafeGet) - else: - err() + let header = sp.header.unsafeGet + if header.blockNumber != 0: + return ok(header) + err() -proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = +proc pivotNegotiate*( + sp: SnapPivotWorkerRef; + ign: Option[BlockNumber]; ## Minimum block number to expect,ignored for now + ): Future[bool] + {.async.} = ## Query a peer to update our knowledge of its canonical chain and its best ## block, which is its canonical chain head. This can be called at any time ## after a peer has negotiated the connection. @@ -588,13 +593,13 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = inc sp.global.stats.major.networkErrors # Just try another peer sp.ctrl.zombie = true - return + return false if reply.isNone: trace trEthRecvTimeoutWaiting & "for GetBlockHeaders reply", peer # TODO: Should disconnect? inc sp.global.stats.minor.timeoutBlockHeaders - return + return false let nHeaders = reply.get.headers.len if nHeaders == 0: @@ -611,7 +616,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = peer, got=nHeaders, requested=request.maxResults # TODO: Should disconnect. inc sp.global.stats.major.excessBlockHeaders - return + return false if 0 < nHeaders: # TODO: Check this is not copying the `headers`. @@ -620,6 +625,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} = sp.peerSyncChainEmptyReply(request) trace "Done pivotExec()", peer + return sp.header.isSome # ------------------------------------------------------------------------------ # Debugging diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 295ebe7e0..3df2ec432 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -1,5 +1,4 @@ -# Nimbus - Types, data structures and shared utilities used in network sync -# +# Nimbus # Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -162,25 +161,46 @@ proc digestTo*(data: Blob; T: type NodeTag): T = ## Hash the `data` argument keccakHash(data).to(T) -proc freeFactor*(lrs: LeafRangeSet): float = - ## Free factor, ie. `#items-free / 2^256` to be used in statistics + +proc emptyFactor*(lrs: LeafRangeSet): float = + ## Relative uncovered total, i.e. `#points-not-covered / 2^256` to be used + ## in statistics or triggers. 
if 0 < lrs.total: ((high(NodeTag) - lrs.total).u256 + 1).to(float) / (2.0^256) elif lrs.chunks == 0: 1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` else: - 0.0 + 0.0 # number of points in `lrs` is `2^256 + 1` + +proc emptyFactor*(lrs: openArray[LeafRangeSet]): float = + ## Variant of `emptyFactor()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + var accu: Nodetag + for ivSet in lrs: + if 0 < ivSet.total: + if high(NodeTag) - ivSet.total < accu: + return 0.0 + accu = accu + ivSet.total + elif ivSet.chunks == 0: + discard + else: # number of points in `ivSet` is `2^256 + 1` + return 0.0 + ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) proc fullFactor*(lrs: LeafRangeSet): float = - ## Free factor, ie. `#items-contained / 2^256` to be used in statistics + ## Relative covered total, i.e. `#points-covered / 2^256` to be used + ## in statistics or triggers if 0 < lrs.total: lrs.total.u256.to(float) / (2.0^256) elif lrs.chunks == 0: - 0.0 + 0.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` else: - 1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)` + 1.0 # number of points in `lrs` is `2^256 + 1` + # Printing & pretty printing + proc `$`*(nt: NodeTag): string = if nt == high(NodeTag): "high(NodeTag)" diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 66ae16ad0..d55c6eacb 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -1,5 +1,4 @@ # Nimbus -# # Copyright (c) 2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -17,30 +16,41 @@ import stew/[interval_set, keyed_queue], ../../db/select_backend, ".."/[protocol, sync_desc], - ./worker/[accounts_db, fetch_accounts, ticker], + ./worker/[heal_accounts, store_accounts, store_storages, ticker], + ./worker/com/[com_error, get_block_header], + ./worker/db/snap_db, "."/[range_desc, worker_desc] const usePivot2ok = false or true when usePivot2ok: - import ../misc/best_pivot + import + ../misc/best_pivot + type + PivotCtxRef = BestPivotCtxRef + PivotWorkerRef = BestPivotWorkerRef else: - import ../misc/snap_pivot, ../../p2p/chain/chain_desc + import + ../../p2p/chain/chain_desc, + ../misc/snap_pivot + type + PivotCtxRef = SnapPivotCtxRef + PivotWorkerRef = SnapPivotWorkerRef {.push raises: [Defect].} logScope: topics = "snap-sync" +const + extraTraceMessages = false or true + ## Enabled additional logging noise + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -proc hash(h: Hash256): Hash = - ## Mixin for `Table` or `keyedQueue` - h.data.hash - proc meanStdDev(sum, sqSum: float; length: int): (float,float) = if 0 < length: result[0] = sum / length.float @@ -57,29 +67,24 @@ template noExceptionOops(info: static[string]; code: untyped) = raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg # ------------------------------------------------------------------------------ -# Private helpers: integration of pivot negotiator +# Private helpers: integration of pivot finder # ------------------------------------------------------------------------------ -when usePivot2ok: - type - PivotCtxRef = BestPivotCtxRef - PivotWorkerRef = BestPivotWorkerRef -else: - type - PivotCtxRef = SnapPivotCtxRef - 
PivotWorkerRef = SnapPivotWorkerRef - proc pivot(ctx: SnapCtxRef): PivotCtxRef = - ctx.data.pivotData.PivotCtxRef + # Getter + ctx.data.pivotFinderCtx.PivotCtxRef proc `pivot=`(ctx: SnapCtxRef; val: PivotCtxRef) = - ctx.data.pivotData = val + # Setter + ctx.data.pivotFinderCtx = val proc pivot(buddy: SnapBuddyRef): PivotWorkerRef = - buddy.data.workerPivot.PivotWorkerRef + # Getter + buddy.data.pivotFinder.PivotWorkerRef proc `pivot=`(buddy: SnapBuddyRef; val: PivotWorkerRef) = - buddy.data.workerPivot = val + # Setter + buddy.data.pivotFinder = val # -------------------- @@ -92,104 +97,139 @@ proc pivotSetup(ctx: SnapCtxRef) = proc pivotRelease(ctx: SnapCtxRef) = ctx.pivot = nil - proc pivotStart(buddy: SnapBuddyRef) = buddy.pivot = PivotWorkerRef.init(buddy.ctx.pivot, buddy.ctrl, buddy.peer) proc pivotStop(buddy: SnapBuddyRef) = buddy.pivot.clear() -# -------------------- - -proc pivotHeader(buddy: SnapBuddyRef): Result[BlockHeader,void] = - when usePivot2ok: - buddy.pivot.bestPivotHeader() - else: - buddy.pivot.snapPivotHeader() - -when usePivot2ok: - proc envPivotNumber(buddy: SnapBuddyRef): Option[BlockNumber] = - let env = buddy.ctx.data.pivotEnv - if env.isNil: - none(BlockNumber) - else: - some(env.stateHeader.blockNumber) - - template pivotNegotiate(buddy: SnapBuddyRef; n: Option[BlockNumber]): auto = - buddy.pivot.bestPivotNegotiate(n) -else: - template pivotNegotiate(buddy: SnapBuddyRef): auto = - buddy.pivot.snapPivotNegotiate() - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc rndNodeTag(buddy: SnapBuddyRef): NodeTag = - ## Create random node tag - var data: array[32,byte] - buddy.ctx.data.rng[].generate(data) - UInt256.fromBytesBE(data).NodeTag +proc init(T: type SnapAccountRanges; ctx: SnapCtxRef): T = + ## Return a pair of account hash range lists with the whole range of + ## smartly spread `[low(NodeTag),high(NodeTag)]` across the mutually + ## disjunct interval sets. + result = [LeafRangeSet.init(),LeafRangeSet.init()] + + # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` + # range sets. + if ctx.data.coveredAccounts.total == 0 and + ctx.data.coveredAccounts.chunks == 1: + # 100% of accounts covered by range fetch batches for the total + # of pivot environments. Do a random split distributing the range + # `[low(NodeTag),high(NodeTag)]` across the pair of range sats. + var nodeKey: NodeKey + ctx.data.rng[].generate(nodeKey.ByteArray32) + + let partition = nodeKey.to(NodeTag) + discard result[0].merge(partition, high(NodeTag)) + if low(NodeTag) < partition: + discard result[1].merge(low(NodeTag), partition - 1.u256) + else: + # Not all account hashes are covered, yet. So keep the uncovered + # account hashes in the first range set, and the other account hashes + # in the second range set. + + # Pre-filled with the first range set with largest possible interval + discard result[0].merge(low(NodeTag),high(NodeTag)) + + # Move covered account ranges (aka intervals) to the second set. + for iv in ctx.data.coveredAccounts.increasing: + discard result[0].reduce(iv) + discard result[1].merge(iv) -proc setPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = - ## Activate environment for state root implied by `header` argument +proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = + ## Activate environment for state root implied by `header` argument. 
This + ## function appends a new environment unless there was any not far enough + ## apart. + ## + ## Note that this function relies on a queue sorted by the block numbers of + ## the pivot header. To maintain the sort order, the function `lruFetch()` + ## must not be called and only records appended with increasing block + ## numbers. let ctx = buddy.ctx - key = header.stateRoot - rc = ctx.data.pivotTable.lruFetch(key) - if rc.isOk: - ctx.data.pivotEnv = rc.value - return + minNumber = block: + let rc = ctx.data.pivotTable.lastValue + if rc.isOk: rc.value.stateHeader.blockNumber + minPivotBlockDistance + else: 1.toBlockNumber - let env = SnapPivotRef( - stateHeader: header, - pivotAccount: buddy.rndNodeTag, - availAccounts: LeafRangeSet.init()) - # Pre-filled with the largest possible interval - discard env.availAccounts.merge(low(NodeTag),high(NodeTag)) + # Check whether the new header follows minimum depth requirement. This is + # where the queue is assumed to have increasing block numbers. + if minNumber <= header.blockNumber: + # Ok, append a new environment + let env = SnapPivotRef( + stateHeader: header, + fetchAccounts: SnapAccountRanges.init(ctx)) - # Statistics - ctx.data.pivotCount.inc + # Append per-state root environment to LRU queue + discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) - # Activate per-state root environment (and hold previous one) - ctx.data.prevEnv = ctx.data.pivotEnv - ctx.data.pivotEnv = ctx.data.pivotTable.lruAppend(key, env, ctx.buddiesMax) + # Debugging, will go away + block: + let ivSet = env.fetchAccounts[0].clone + for iv in env.fetchAccounts[1].increasing: + doAssert ivSet.merge(iv) == iv.len + doAssert ivSet.chunks == 1 + doAssert ivSet.total == 0 -proc updatePivotEnv(buddy: SnapBuddyRef): bool = - ## Update global state root environment from local `pivotHeader`. Choose the - ## latest block number. 
Returns `true` if the environment was changed - let rc = buddy.pivotHeader - if rc.isOk: - let - peer = buddy.peer - ctx = buddy.ctx - env = ctx.data.pivotEnv - pivotHeader = rc.value - newStateNumber = pivotHeader.blockNumber - stateNumber = if env.isNil: 0.toBlockNumber - else: env.stateHeader.blockNumber - stateWindow = stateNumber + maxPivotBlockWindow - - block keepCurrent: - if env.isNil: - break keepCurrent # => new pivot - if stateNumber < newStateNumber: - when switchPivotAfterCoverage < 1.0: - if env.minCoverageReachedOk: - break keepCurrent # => new pivot - if stateWindow < newStateNumber: - break keepCurrent # => new pivot - if newStateNumber <= maxPivotBlockWindow: - break keepCurrent # => new pivot - # keep current - return false - - # set new block - buddy.setPivotEnv(pivotHeader) +proc updatePivotImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = + ## Helper, negotiate pivot unless present + if buddy.pivot.pivotHeader.isOk: return true + let + ctx = buddy.ctx + peer = buddy.peer + env = ctx.data.pivotTable.lastValue.get(otherwise = nil) + nMin = if env.isNil: none(BlockNumber) + else: some(env.stateHeader.blockNumber) + + if await buddy.pivot.pivotNegotiate(nMin): + var header = buddy.pivot.pivotHeader.value + + # Check whether there is no environment change needed + when noPivotEnvChangeIfComplete: + let rc = ctx.data.pivotTable.lastValue + if rc.isOk and rc.value.serialSync: + # No neede to change + if extraTraceMessages: + trace "No need to change snap pivot", peer, + pivot=("#" & $rc.value.stateHeader.blockNumber), + multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state + return true + + when 0 < backPivotBlockDistance: + # Backtrack, do not use the very latest pivot header + if backPivotBlockThreshold.toBlockNumber < header.blockNumber: + let + backNum = header.blockNumber - backPivotBlockDistance.toBlockNumber + rc = await buddy.getBlockHeader(backNum) + if rc.isErr: + if rc.error in {ComNoHeaderAvailable, ComTooManyHeaders}: + buddy.ctrl.zombie = true + return false + header = rc.value + + buddy.appendPivotEnv(header) + + trace "Snap pivot initialised", peer, pivot=("#" & $header.blockNumber), + multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state + + return true + +# Syntactic sugar +when usePivot2ok: + template updateSinglePivot(buddy: SnapBuddyRef): auto = + buddy.updatePivotImpl() +else: + template updateMultiPivot(buddy: SnapBuddyRef): auto = + buddy.updatePivotImpl() + proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = result = proc: TickerStats = @@ -206,7 +246,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = aSqSum += aLen * aLen # Fill utilisation mean & variance - let fill = kvp.data.availAccounts.freeFactor + let fill = kvp.data.fetchAccounts.emptyFactor uSum += fill uSqSum += fill * fill @@ -215,77 +255,35 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = sSqSum += sLen * sLen let - tabLen = ctx.data.pivotTable.len - pivotBlock = if ctx.data.pivotEnv.isNil: none(BlockNumber) - else: some(ctx.data.pivotEnv.stateHeader.blockNumber) + env = ctx.data.pivotTable.lastValue.get(otherwise = nil) + pivotBlock = if env.isNil: none(BlockNumber) + else: some(env.stateHeader.blockNumber) accCoverage = ctx.data.coveredAccounts.fullFactor accFill = meanStdDev(uSum, uSqSum, count) - when snapAccountsDumpEnable: - if snapAccountsDumpCoverageStop < accCoverage: - trace " Snap proofs dump stop", - threshold=snapAccountsDumpCoverageStop, coverage=accCoverage.toPC - ctx.data.proofDumpOk = false - TickerStats( pivotBlock: pivotBlock, - 
activeQueues: tabLen, - flushedQueues: ctx.data.pivotCount.int64 - tabLen, + nQueues: ctx.data.pivotTable.len, nAccounts: meanStdDev(aSum, aSqSum, count), nStorage: meanStdDev(sSum, sSqSum, count), accountsFill: (accFill[0], accFill[1], accCoverage)) - -proc havePivot(buddy: SnapBuddyRef): bool = - ## ... - let rc = buddy.pivotHeader - if rc.isOk and rc.value.blockNumber != 0: - - # So there is a `ctx.data.pivotEnv` - when 1.0 <= switchPivotAfterCoverage: - return true - else: - let - ctx = buddy.ctx - env = ctx.data.pivotEnv - - # Force fetching new pivot if coverage reached by returning `false` - if not env.minCoverageReachedOk: - - # Not sure yet, so check whether coverage has been reached at all - let cov = env.availAccounts.freeFactor - if switchPivotAfterCoverage <= cov: - trace " Snap accounts coverage reached", peer, - threshold=switchPivotAfterCoverage, coverage=cov.toPC - - # Need to reset pivot handlers - buddy.ctx.poolMode = true - buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) = - b.ctx.data.pivotEnv.minCoverageReachedOk = true - b.pivotRestart - return true - # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up - ctx.data.accountRangeMax = high(UInt256) div ctx.buddiesMax.u256 ctx.data.coveredAccounts = LeafRangeSet.init() - ctx.data.accountsDb = - if ctx.data.dbBackend.isNil: AccountsDbRef.init(ctx.chain.getTrieDB) - else: AccountsDbRef.init(ctx.data.dbBackend) + ctx.data.snapDb = + if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB) + else: SnapDbRef.init(ctx.data.dbBackend) ctx.pivotSetup() if tickerOK: ctx.data.ticker = TickerRef.init(ctx.tickerUpdate) else: trace "Ticker is disabled" result = true - # ----------------------- - when snapAccountsDumpEnable: - doAssert ctx.data.proofDumpFile.open("./dump-stream.out", fmWrite) - ctx.data.proofDumpOk = true proc release*(ctx: SnapCtxRef) = ## Global clean up @@ -303,6 +301,7 @@ proc start*(buddy: SnapBuddyRef): bool = peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: buddy.pivotStart() + buddy.data.errors = ComErrorStatsRef() if not ctx.data.ticker.isNil: ctx.data.ticker.startBuddy() return true @@ -334,28 +333,17 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## Note that this function runs in `async` mode. ## when usePivot2ok: - # # Run alternative pivot finder. This one harmonises difficulties of at # least two peers. The can only be one instance active/unfinished of the # `pivot2Exec()` functions. - # let peer = buddy.peer - if buddy.pivotHeader.isErr: - if await buddy.pivotNegotiate(buddy.envPivotNumber): - discard buddy.updatePivotEnv() - else: - # Wait if needed, then return => repeat - if not buddy.ctrl.stopped: - await sleepAsync(2.seconds) - return + if not await buddy.updateSinglePivot(): + # Wait if needed, then return => repeat + if not buddy.ctrl.stopped: + await sleepAsync(2.seconds) + return - buddy.ctrl.multiOk = true - - trace "Snap pivot initialised", peer, - multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state - else: - # Default pivot finder runs in multi mode => nothing to do here. 
- buddy.ctrl.multiOk = true + buddy.ctrl.multiOk = true proc runPool*(buddy: SnapBuddyRef, last: bool) = @@ -375,11 +363,21 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = let ctx = buddy.ctx if ctx.poolMode: ctx.poolMode = false - if not ctx.data.runPoolHook.isNil: - noExceptionOops("runPool"): - ctx.data.runPoolHook(buddy) - if last: - ctx.data.runPoolHook = nil + + let rc = ctx.data.pivotTable.lastValue + if rc.isOk: + # Check whether accounts and storage might be complete. + let env = rc.value + if not env.serialSync: + # Check whether accounts download is complete + block checkAccountsComplete: + for ivSet in env.fetchAccounts: + if ivSet.chunks != 0: + break checkAccountsComplete + env.accountsDone = true + # Check whether storage slots are complete + if env.fetchStorage.len == 0: + env.serialSync = true proc runMulti*(buddy: SnapBuddyRef) {.async.} = @@ -392,23 +390,44 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = peer = buddy.peer when not usePivot2ok: - if not buddy.havePivot: - await buddy.pivotNegotiate() - if not buddy.updatePivotEnv(): - return + discard await buddy.updateMultiPivot() - # Ignore rest if the pivot is still acceptably covered - when switchPivotAfterCoverage < 1.0: - if ctx.data.pivotEnv.minCoverageReachedOk: - await sleepAsync(50.milliseconds) - return + # Set up current state root environment for accounts snapshot + let env = block: + let rc = ctx.data.pivotTable.lastValue + if rc.isErr: + return # nothing to do + rc.value - await buddy.fetchAccounts() + buddy.data.pivotEnv = env - if ctx.data.pivotEnv.repairState == Done: - buddy.ctrl.multiOk = false - buddy.pivotStop() - buddy.pivotStart() + if env.serialSync: + trace "Snap serial sync -- not implemented yet", peer + await sleepAsync(5.seconds) + + else: + # Snapshot sync processing. Note that *serialSync => accountsDone*. + await buddy.storeStorages() # always pre-clean the queue + await buddy.storeAccounts() + await buddy.storeStorages() + + # If the current database is not complete yet + if 0 < env.fetchAccounts[0].chunks or + 0 < env.fetchAccounts[1].chunks: + + # Healing applies to the latest pivot only. The pivot might have changed + # in the background (while netwoking) due to a new peer worker that has + # negotiated another, newer pivot. + if env == ctx.data.pivotTable.lastValue.value: + await buddy.healAccountsDb() + + # TODO: use/apply storage healer + + # Check whether accounts might be complete. + if env.fetchStorage.len == 0: + # Possibly done but some buddies might wait for an account range to be + # received from the network. So we need to sync. + buddy.ctx.poolMode = true # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/com/com_error.nim b/nimbus/sync/snap/worker/com/com_error.nim new file mode 100644 index 000000000..0165f694e --- /dev/null +++ b/nimbus/sync/snap/worker/com/com_error.nim @@ -0,0 +1,102 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + chronos, + ../../../sync_desc + +const + comErrorsTimeoutMax* = 2 + ## Maximal number of non-resonses accepted in a row. 
If there are more than + ## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded + ## as zombie. + +type + ComErrorStatsRef* = ref object + ## particular error counters so connections will not be cut immediately + ## after a particular error. + nTimeouts*: uint + nNetwork*: uint + + ComError* = enum + ComNothingSerious + ComAccountsMaxTooLarge + ComAccountsMinTooSmall + ComEmptyAccountsArguments + ComEmptyRequestArguments + ComMissingProof + ComNetworkProblem + ComNoAccountsForStateRoot + ComNoByteCodesAvailable + ComNoDataForProof + ComNoHeaderAvailable + ComNoStorageForAccounts + ComNoTrieNodesAvailable + ComResponseTimeout + ComTooManyByteCodes + ComTooManyHeaders + ComTooManyStorageSlots + ComTooManyTrieNodes + + # Other errors not directly related to communication + ComInspectDbFailed + ComImportAccountsFailed + + +proc stopAfterSeriousComError*( + ctrl: BuddyCtrlRef; + error: ComError; + stats: ComErrorStatsRef; + ): Future[bool] + {.async.} = + ## Error handling after data protocol failed. + case error: + of ComResponseTimeout: + stats.nTimeouts.inc + if comErrorsTimeoutMax < stats.nTimeouts: + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + else: + # Otherwise try again some time later. Nevertheless, stop the + # current action. + await sleepAsync(5.seconds) + return true + + of ComNetworkProblem, + ComMissingProof, + ComAccountsMinTooSmall, + ComAccountsMaxTooLarge: + stats.nNetwork.inc + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + return true + + of ComEmptyAccountsArguments, + ComEmptyRequestArguments, + ComInspectDbFailed, + ComImportAccountsFailed, + ComNoDataForProof, + ComNothingSerious: + discard + + of ComNoAccountsForStateRoot, + ComNoStorageForAccounts, + ComNoByteCodesAvailable, + ComNoHeaderAvailable, + ComNoTrieNodesAvailable, + ComTooManyByteCodes, + ComTooManyHeaders, + ComTooManyStorageSlots, + ComTooManyTrieNodes: + # Mark this peer dead, i.e. avoid fetching from this peer for a while + ctrl.zombie = true + return true + +# End diff --git a/nimbus/sync/snap/worker/com/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim index bbcdb6e10..68c8922c6 100644 --- a/nimbus/sync/snap/worker/com/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -19,7 +19,7 @@ import stew/interval_set, "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_block_header.nim b/nimbus/sync/snap/worker/com/get_block_header.nim new file mode 100644 index 000000000..817622d1d --- /dev/null +++ b/nimbus/sync/snap/worker/com/get_block_header.nim @@ -0,0 +1,74 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +import + std/options, + chronos, + eth/[common/eth_types, p2p], + ../../../protocol, + ../../worker_desc, + ./com_error + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getBlockHeader*( + buddy: SnapBuddyRef; + num: BlockNumber; + ): Future[Result[BlockHeader,ComError]] + {.async.} = + ## Get single block header + let + peer = buddy.peer + reqLen = 1u + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: false, + number: num), + maxResults: reqLen, + skip: 0, + reverse: false) + + trace trEthSendSendingGetBlockHeaders, peer, startBlock=num, reqLen + + var hdrResp: Option[blockHeadersObj] + try: + hdrResp = await peer.getBlockHeaders(hdrReq) + except CatchableError as e: + trace trSnapRecvError & "waiting for GetByteCodes reply", peer, + error=e.msg + return err(ComNetworkProblem) + + var hdrRespLen = 0 + if hdrResp.isSome: + hdrRespLen = hdrResp.get.headers.len + if hdrRespLen == 0: + trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + return err(ComNoHeaderAvailable) + + if hdrRespLen == 1: + let + header = hdrResp.get.headers[0] + blockNumber = header.blockNumber + trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber + return ok(header) + + trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen + return err(ComTooManyHeaders) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/com/get_byte_codes.nim b/nimbus/sync/snap/worker/com/get_byte_codes.nim index 6a272f13a..992ff0810 100644 --- a/nimbus/sync/snap/worker/com/get_byte_codes.nim +++ b/nimbus/sync/snap/worker/com/get_byte_codes.nim @@ -16,7 +16,7 @@ import eth/[common/eth_types, p2p], "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_error.nim b/nimbus/sync/snap/worker/com/get_error.nim deleted file mode 100644 index e649d3b9c..000000000 --- a/nimbus/sync/snap/worker/com/get_error.nim +++ /dev/null @@ -1,34 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
- -type - ComError* = enum - ComNothingSerious - ComAccountsMaxTooLarge - ComAccountsMinTooSmall - ComEmptyAccountsArguments - ComEmptyRequestArguments - ComMissingProof - ComNetworkProblem - ComNoAccountsForStateRoot - ComNoByteCodesAvailable - ComNoDataForProof - ComNoStorageForAccounts - ComNoTrieNodesAvailable - ComResponseTimeout - ComTooManyByteCodes - ComTooManyStorageSlots - ComTooManyTrieNodes - - # Other errors not directly related to communication - ComInspectDbFailed - ComImportAccountsFailed - -# End diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index d0b09f922..1bc8dc318 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -16,7 +16,7 @@ import stew/interval_set, "../../.."/[protocol, protocol/trace_config], "../.."/[range_desc, worker_desc], - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim index 7a2aacc48..7f52f59f6 100644 --- a/nimbus/sync/snap/worker/com/get_trie_nodes.nim +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -1,3 +1,6 @@ +# Nimbus +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) # * MIT license ([LICENSE-MIT](LICENSE-MIT) or @@ -11,7 +14,7 @@ import eth/[common/eth_types, p2p], "../../.."/[protocol, protocol/trace_config], ../../worker_desc, - ./get_error + ./com_error {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 8dfd8e44d..4e816f5ff 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -136,6 +136,7 @@ type TrieNodeStat* = object ## Trie inspection report dangling*: seq[Blob] ## Paths from nodes with incomplete refs + leaves*: seq[NodeKey] ## Paths to leave nodes (if any) level*: int ## Maximim nesting depth of dangling nodes stopped*: bool ## Potential loop detected if `true` @@ -281,7 +282,13 @@ proc ppDangling(a: seq[Blob]; maxItems = 30): string = proc ppBlob(w: Blob): string = w.mapIt(it.toHex(2)).join.toLowerAscii let - q = a.toSeq.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)] + q = a.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)] + andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: "" + "{" & q.join(",") & andMore & "}" + +proc ppLeaves(a: openArray[NodeKey]; db: HexaryTreeDbRef; maxItems=30): string = + let + q = a.mapIt(it.ppImpl(db))[0 ..< min(maxItems,a.len)] andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." 
else: "" "{" & q.join(",") & andMore & "}" @@ -328,10 +335,12 @@ proc pp*(db: HexaryTreeDbRef; indent=4): string = db.ppImpl(NodeKey.default).join(indent.toPfx) proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = - result = "(" & $a.level & "," + result = "(" & $a.level if a.stopped: result &= "stopped," - result &= $a.dangling.len & "," & a.dangling.ppDangling(maxItems) & ")" + result &= $a.dangling.len & "," & + a.dangling.ppDangling(maxItems) & "," & + a.leaves.ppLeaves(db, maxItems) & ")" # ------------------------------------------------------------------------------ # Public constructor (or similar) diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index 5418c19b2..790756c9e 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -16,6 +16,7 @@ type AccountSmallerThanBase AccountsNotSrictlyIncreasing AccountRangesOverlap + NodeNotFound RlpEncoding SlotsNotSrictlyIncreasing TrieLoopAlert diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index 7e701a64a..9ca1edd2c 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -24,6 +24,9 @@ logScope: const extraTraceMessages = false # or true +when extraTraceMessages: + import stew/byteutils + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -63,7 +66,7 @@ proc doStepLink(step: XPathStep): Result[NodeKey,bool] = err(true) # fully fail -proc hexaryInspectPath( +proc hexaryInspectPathImpl( db: HexaryTreeDbRef; ## Database rootKey: RepairKey; ## State root path: NibblesSeq; ## Starting path @@ -84,8 +87,8 @@ proc hexaryInspectPath( return ok(rc.value) err() -proc hexaryInspectPath( - getFn: HexaryGetFn; ## Database retrival function +proc hexaryInspectPathImpl( + getFn: HexaryGetFn; ## Database retrieval function root: NodeKey; ## State root path: NibblesSeq; ## Starting path ): Result[NodeKey,void] @@ -170,11 +173,25 @@ proc hexaryInspectPath*( ## Returns the `NodeKey` for a given path if there is any. let (isLeaf,nibbles) = hexPrefixDecode path if not isLeaf: - let rc = db.hexaryInspectPath(root.to(RepairKey), nibbles) + let rc = db.hexaryInspectPathImpl(root.to(RepairKey), nibbles) if rc.isOk and rc.value.isNodeKey: return ok(rc.value.convertTo(NodeKey)) err() +proc hexaryInspectPath*( + getFn: HexaryGetFn; ## Database abstraction + root: NodeKey; ## State root + path: Blob; ## Partial database path + ): Result[NodeKey,void] + {.gcsafe, raises: [Defect,RlpError]} = + ## Variant of `hexaryInspectPath()` for persistent database. + let (isLeaf,nibbles) = hexPrefixDecode path + if not isLeaf: + let rc = getFn.hexaryInspectPathImpl(root, nibbles) + if rc.isOk: + return ok(rc.value) + err() + proc hexaryInspectToKeys*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root @@ -193,25 +210,37 @@ proc hexaryInspectTrie*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,KeyError]} = ## Starting with the argument list `paths`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in - ## the trie database. 
+ ## the trie database. The references for these nodes are collected and + ## returned. + ## * At most `maxLeafPaths` leaf node references are collected along the way. + ## * Search list `paths` argument entries that do not refer to a hexary node + ## are ignored. + ## * For any search list `paths` argument entry, this function stops if + ## the search depth exceeds `stopAtLevel` levels of linked sub-nodes. + ## * Argument `paths` list entries that do not refer to a valid node are + ## silently ignored. + ## let rootKey = root.to(RepairKey) if not db.tab.hasKey(rootKey): return TrieNodeStat() # Initialise TODO list - var reVisit = newTable[RepairKey,NibblesSeq]() + var + leafSlots = maxLeafPaths + reVisit = newTable[RepairKey,NibblesSeq]() if paths.len == 0: reVisit[rootKey] = EmptyNibbleRange else: for w in paths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: - let rc = db.hexaryInspectPath(rootKey, nibbles) + let rc = db.hexaryInspectPathImpl(rootKey, nibbles) if rc.isOk: reVisit[rc.value] = nibbles @@ -240,8 +269,13 @@ proc hexaryInspectTrie*( child = node.bLink[n] db.processLink(stats=result, inspect=again, parent, trail, child) of Leaf: - # Done with this link, forget the key - discard + if 0 < leafSlots: + let trail = parentTrail & node.lPfx + if trail.len == 64: + result.leaves.add trail.getBytes.convertTo(NodeKey) + leafSlots.dec + # Done with this link + # End `for` result.level.inc @@ -250,53 +284,70 @@ proc hexaryInspectTrie*( proc hexaryInspectTrie*( - getFn: HexaryGetFn; - root: NodeKey; ## State root + getFn: HexaryGetFn; ## Database abstraction + rootKey: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,RlpError,KeyError]} = ## Variant of `hexaryInspectTrie()` for persistent database. 
- ## - if root.to(Blob).getFn().len == 0: + when extraTraceMessages: + let nPaths = paths.len + + let root = rootKey.to(Blob) + if root.getFn().len == 0: + when extraTraceMessages: + trace "Hexary inspect: missing root", nPaths, maxLeafPaths, + rootKey=root.toHex return TrieNodeStat() # Initialise TODO list - var reVisit = newTable[NodeKey,NibblesSeq]() + var + leafSlots = maxLeafPaths + reVisit = newTable[NodeKey,NibblesSeq]() if paths.len == 0: - reVisit[root] = EmptyNibbleRange + reVisit[rootKey] = EmptyNibbleRange else: for w in paths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: - let rc = getFn.hexaryInspectPath(root, nibbles) + let rc = getFn.hexaryInspectPathImpl(rootKey, nibbles) if rc.isOk: reVisit[rc.value] = nibbles - when extraTraceMessages: - trace "Hexary inspect start", nPaths=paths.len, reVisit=reVisit.len - while 0 < reVisit.len: + when extraTraceMessages: + trace "Hexary inspect processing", nPaths, maxLeafPaths, + level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len + if stopAtLevel < result.level: result.stopped = true break - when extraTraceMessages: - trace "Hexary inspect processing", level=result.level, - reVisit=reVisit.len, dangling=result.dangling.len - let again = newTable[NodeKey,NibblesSeq]() for parent,parentTrail in reVisit.pairs: - let nodeRlp = rlpFromBytes parent.to(Blob).getFn() + + let parentBlob = parent.to(Blob).getFn() + if parentBlob.len == 0: + # Ooops, forget node and key + continue + + let nodeRlp = rlpFromBytes parentBlob case nodeRlp.listLen: of 2: - let (isLeaf,ePfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes + let (isLeaf,xPfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes if not isleaf: let - trail = parentTrail & ePfx + trail = parentTrail & xPfx child = nodeRlp.listElem(1) getFn.processLink(stats=result, inspect=again, parent, trail, child) + elif 0 < leafSlots: + let trail = parentTrail & xPfx + if trail.len == 64: + result.leaves.add trail.getBytes.convertTo(NodeKey) + leafSlots.dec of 17: for n in 0 ..< 16: let @@ -304,7 +355,7 @@ proc hexaryInspectTrie*( child = nodeRlp.listElem(n) getFn.processLink(stats=result, inspect=again, parent, trail, child) else: - # Done with this link, forget the key + # Ooops, forget node and key discard # End `for` @@ -313,8 +364,9 @@ proc hexaryInspectTrie*( # End while when extraTraceMessages: - trace "Hexary inspect finished", level=result.level, maxLevel=stopAtLevel, - reVisit=reVisit.len, dangling=result.dangling.len, stopped=result.stopped + trace "Hexary inspect finished", nPaths, maxLeafPaths, + level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len, + maxLevel=stopAtLevel, stopped=result.stopped # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/db/hexary_paths.nim b/nimbus/sync/snap/worker/db/hexary_paths.nim index 280aca059..9d5cf66d4 100644 --- a/nimbus/sync/snap/worker/db/hexary_paths.nim +++ b/nimbus/sync/snap/worker/db/hexary_paths.nim @@ -373,6 +373,18 @@ proc leafData*(path: XPath): Blob = of Extension: discard +proc leafData*(path: RPath): Blob = + ## Return the leaf data from a successful `RPath` computation (if any.) 
+ if path.tail.len == 0 and 0 < path.path.len: + let node = path.path[^1].node + case node.kind: + of Branch: + return node.bData + of Leaf: + return node.lData + of Extension: + discard + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/accounts_db.nim b/nimbus/sync/snap/worker/db/snap_db.nim similarity index 76% rename from nimbus/sync/snap/worker/accounts_db.nim rename to nimbus/sync/snap/worker/db/snap_db.nim index 6226bb2fe..79fc24a2f 100644 --- a/nimbus/sync/snap/worker/accounts_db.nim +++ b/nimbus/sync/snap/worker/db/snap_db.nim @@ -16,32 +16,35 @@ import stew/byteutils, stint, rocksdb, - ../../../constants, - ../../../db/[kvstore_rocksdb, select_backend], - "../.."/[protocol, types], - ../range_desc, - ./db/[bulk_storage, hexary_desc, hexary_error, hexary_import, - hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] + ../../../../constants, + ../../../../db/[kvstore_rocksdb, select_backend], + "../../.."/[protocol, types], + ../../range_desc, + "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, + hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] {.push raises: [Defect].} logScope: - topics = "snap-proof" + topics = "snap-db" export - HexaryDbError + HexaryDbError, + TrieNodeStat const extraTraceMessages = false or true type - AccountsDbRef* = ref object + SnapDbRef* = ref object + ## Global, re-usable descriptor db: TrieDatabaseRef ## General database rocky: RocksStoreRef ## Set if rocksdb is available - AccountsDbSessionRef* = ref object + SnapDbSessionRef* = ref object + ## Database session descriptor keyMap: Table[RepairKey,uint] ## For debugging only (will go away) - base: AccountsDbRef ## Back reference to common parameters + base: SnapDbRef ## Back reference to common parameters peer: Peer ## For log messages accRoot: NodeKey ## Current accounts root node accDb: HexaryTreeDbRef ## Accounts database @@ -51,7 +54,7 @@ type # Private helpers # ------------------------------------------------------------------------------ -proc newHexaryTreeDbRef(ps: AccountsDbSessionRef): HexaryTreeDbRef = +proc newHexaryTreeDbRef(ps: SnapDbSessionRef): HexaryTreeDbRef = HexaryTreeDbRef(keyPp: ps.stoDb.keyPp) # for debugging, will go away proc to(h: Hash256; T: type NodeKey): T = @@ -94,27 +97,27 @@ template noPpError(info: static[string]; code: untyped) = except Exception as e: raiseAssert "Ooops (" & info & ") " & $e.name & ": " & e.msg -proc toKey(a: RepairKey; ps: AccountsDbSessionRef): uint = +proc toKey(a: RepairKey; ps: SnapDbSessionRef): uint = if not a.isZero: noPpError("pp(RepairKey)"): if not ps.keyMap.hasKey(a): ps.keyMap[a] = ps.keyMap.len.uint + 1 result = ps.keyMap[a] -proc toKey(a: NodeKey; ps: AccountsDbSessionRef): uint = +proc toKey(a: NodeKey; ps: SnapDbSessionRef): uint = a.to(RepairKey).toKey(ps) -proc toKey(a: NodeTag; ps: AccountsDbSessionRef): uint = +proc toKey(a: NodeTag; ps: SnapDbSessionRef): uint = a.to(NodeKey).toKey(ps) -proc pp(a: NodeKey; ps: AccountsDbSessionRef): string = +proc pp(a: NodeKey; ps: SnapDbSessionRef): string = if a.isZero: "ø" else:"$" & $a.toKey(ps) -proc pp(a: RepairKey; ps: AccountsDbSessionRef): string = +proc pp(a: RepairKey; ps: SnapDbSessionRef): string = if a.isZero: "ø" elif a.isNodeKey: "$" & $a.toKey(ps) else: "@" & $a.toKey(ps) -proc pp(a: NodeTag; ps: AccountsDbSessionRef): string = +proc pp(a: NodeTag; ps: SnapDbSessionRef): 
string = a.to(NodeKey).pp(ps) # ------------------------------------------------------------------------------ @@ -122,11 +125,11 @@ proc pp(a: NodeTag; ps: AccountsDbSessionRef): string = # ------------------------------------------------------------------------------ proc mergeProofs( - peer: Peer, ## For log messages - db: HexaryTreeDbRef; ## Database table - root: NodeKey; ## Root for checking nodes - proof: seq[Blob]; ## Node records - freeStandingOk = false; ## Remove freestanding nodes + peer: Peer, ## For log messages + db: HexaryTreeDbRef; ## Database table + root: NodeKey; ## Root for checking nodes + proof: seq[Blob]; ## Node records + freeStandingOk = false; ## Remove freestanding nodes ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect, RlpError, KeyError].} = ## Import proof records (as received with snap message) into a hexary trie @@ -160,8 +163,8 @@ proc mergeProofs( proc persistentAccounts( - db: HexaryTreeDbRef; ## Current table - pv: AccountsDbRef; ## Persistent database + db: HexaryTreeDbRef; ## Current table + pv: SnapDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse @@ -174,8 +177,8 @@ proc persistentAccounts( ok() proc persistentStorages( - db: HexaryTreeDbRef; ## Current table - pv: AccountsDbRef; ## Persistent database + db: HexaryTreeDbRef; ## Current table + pv: SnapDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse @@ -268,9 +271,9 @@ proc collectStorageSlots( proc importStorageSlots*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor - data: AccountSlots; ## account storage descriptor - proof: SnapStorageProof; ## account storage proof + ps: SnapDbSessionRef; ## Re-usable session descriptor + data: AccountSlots; ## Account storage descriptor + proof: SnapStorageProof; ## Account storage proof ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect, RlpError,KeyError].} = ## Preocess storage slots for a particular storage root @@ -307,14 +310,14 @@ proc importStorageSlots*( # ------------------------------------------------------------------------------ proc init*( - T: type AccountsDbRef; + T: type SnapDbRef; db: TrieDatabaseRef ): T = ## Main object constructor T(db: db) proc init*( - T: type AccountsDbRef; + T: type SnapDbRef; db: ChainDb ): T = ## Variant of `init()` allowing bulk import on rocksdb backend @@ -323,14 +326,14 @@ proc init*( result.rocky = nil proc init*( - T: type AccountsDbSessionRef; - pv: AccountsDbRef; + T: type SnapDbSessionRef; + pv: SnapDbRef; root: Hash256; peer: Peer = nil ): T = ## Start a new session, do some actions an then discard the session ## descriptor (probably after commiting data.) - let desc = AccountsDbSessionRef( + let desc = SnapDbSessionRef( base: pv, peer: peer, accRoot: root.to(NodeKey), @@ -344,13 +347,13 @@ proc init*( return desc proc dup*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; root: Hash256; peer: Peer; - ): AccountsDbSessionRef = + ): SnapDbSessionRef = ## Resume a session with different `root` key and `peer`. This new session ## will access the same memory database as the `ps` argument session. 
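For orientation, the descriptor layering introduced by this rename (one shared `SnapDbRef`, many light-weight `SnapDbSessionRef` instances, with `dup()` re-using the same in-memory tables) can be pictured with a reduced, self-contained sketch. The types below are illustrative stand-ins only, not the repo's definitions:

```nim
# Reduced sketch of the descriptor layering above (toy types; the real
# SnapDbRef/SnapDbSessionRef carry hexary trie tables and a rocksdb handle).
import std/tables

type
  ToySnapDbRef = ref object
    db: Table[string, string]       # stands in for the shared TrieDatabaseRef

  ToySessionRef = ref object
    base: ToySnapDbRef              # back reference to common parameters
    root: string                    # stands in for the accounts root NodeKey
    peer: string                    # for log messages only

proc init(T: type ToySessionRef; pv: ToySnapDbRef; root, peer: string): T =
  ## Start a new session bound to one state root and peer.
  T(base: pv, root: root, peer: peer)

proc dup(ps: ToySessionRef; root, peer: string): ToySessionRef =
  ## Resume with a different root/peer but the same underlying database.
  ToySessionRef(base: ps.base, root: root, peer: peer)

when isMainModule:
  let pv = ToySnapDbRef(db: initTable[string, string]())
  let s1 = ToySessionRef.init(pv, "root-A", "peer-1")
  let s2 = s1.dup("root-B", "peer-2")
  doAssert s1.base == s2.base       # both sessions share the same base db
```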
- AccountsDbSessionRef( + SnapDbSessionRef( base: ps.base, peer: peer, accRoot: root.to(NodeKey), @@ -358,9 +361,9 @@ proc dup*( stoDb: ps.stoDb) proc dup*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; root: Hash256; - ): AccountsDbSessionRef = + ): SnapDbSessionRef = ## Variant of `dup()` without the `peer` argument. ps.dup(root, ps.peer) @@ -368,16 +371,16 @@ proc dup*( # Public functions # ------------------------------------------------------------------------------ -proc dbBackendRocksDb*(pv: AccountsDbRef): bool = +proc dbBackendRocksDb*(pv: SnapDbRef): bool = ## Returns `true` if rocksdb features are available not pv.rocky.isNil -proc dbBackendRocksDb*(ps: AccountsDbSessionRef): bool = +proc dbBackendRocksDb*(ps: SnapDbSessionRef): bool = ## Returns `true` if rocksdb features are available not ps.base.rocky.isNil proc importAccounts*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data persistent = false; ## store data on disk @@ -425,20 +428,20 @@ proc importAccounts*( ok() proc importAccounts*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## for log messages root: Hash256; ## state root base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data ): Result[void,HexaryDbError] = ## Variant of `importAccounts()` - AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, root, peer).importAccounts(base, data, persistent=true) proc importStorages*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol persistent = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = @@ -506,18 +509,18 @@ proc importStorages*( ok() proc importStorages*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only data: AccountStorageRange; ## Account storage reply from `snap/1` protocol ): Result[void,seq[(int,HexaryDbError)]] = ## Variant of `importStorages()` - AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, Hash256(), peer).importStorages(data, persistent=true) proc importRawNodes*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor nodes: openArray[Blob]; ## Node records persistent = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = @@ -559,64 +562,133 @@ proc importRawNodes*( ok() proc importRawNodes*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only nodes: openArray[Blob]; ## Node records ): Result[void,seq[(int,HexaryDbError)]] = ## Variant of `importRawNodes()` for persistent storage. 
- AccountsDbSessionRef.init( + SnapDbSessionRef.init( pv, Hash256(), peer).importRawNodes(nodes, persistent=true) proc inspectAccountsTrie*( - ps: AccountsDbSessionRef; ## Re-usable session descriptor + ps: SnapDbSessionRef; ## Re-usable session descriptor pathList = seq[Blob].default; ## Starting nodes for search + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path persistent = false; ## Read data from disk - ignoreError = false; ## Return partial results if available + ignoreError = false; ## Always return partial results if available ): Result[TrieNodeStat, HexaryDbError] = ## Starting with the argument list `pathSet`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in - ## the trie database. + ## the trie database. Argument `pathSet` list entries that do not refer to a + ## valid node are silently ignored. ## let peer = ps.peer var stats: TrieNodeStat noRlpExceptionOops("inspectAccountsTrie()"): if persistent: let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - stats = getFn.hexaryInspectTrie(ps.accRoot, pathList) + stats = getFn.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) else: - stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList) + stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) block checkForError: let error = block: if stats.stopped: TrieLoopAlert - elif stats.level <= 1: + elif stats.level == 0: TrieIsEmpty else: break checkForError trace "Inspect account trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, stoppedAt=stats.level, error + nDangling=stats.dangling.len, leaves=stats.leaves.len, + maxleaves=maxLeafPaths, stoppedAt=stats.level, error return err(error) when extraTraceMessages: trace "Inspect account trie ok", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, level=stats.level + nDangling=stats.dangling.len, leaves=stats.leaves.len, + maxleaves=maxLeafPaths, level=stats.level return ok(stats) proc inspectAccountsTrie*( - pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only root: Hash256; ## state root pathList = seq[Blob].default; ## Starting paths for search - ignoreError = false; ## Return partial results if available + maxLeafPaths = 0; ## Record leaves with proper 32 bytes path + ignoreError = false; ## Always return partial results when avail. ): Result[TrieNodeStat, HexaryDbError] = ## Variant of `inspectAccountsTrie()` for persistent storage. - AccountsDbSessionRef.init( - pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError) + SnapDbSessionRef.init( + pv, root, peer).inspectAccountsTrie( + pathList, maxLeafPaths, persistent=true, ignoreError) + + +proc getAccountNodeKey*( + ps: SnapDbSessionRef; ## Re-usable session descriptor + path: Blob; ## Partial node path + persistent = false; ## Read data from disk + ): Result[NodeKey,HexaryDbError] = + ## For a partial node path argument `path`, return the raw node key. 
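The `checkForError` block a few lines above maps the trie inspection result onto `HexaryDbError` values: an aborted walk becomes `TrieLoopAlert`, a zero-level result `TrieIsEmpty`, anything else passes through. A self-contained restatement of that mapping with toy stand-ins for `TrieNodeStat`:

```nim
# Schematic version of the error mapping applied to the inspection result
# above (toy types; the real code uses TrieNodeStat and HexaryDbError).
type
  ToyStat = object
    stopped: bool         # walk was aborted, e.g. suspected node loop
    level: int            # maximal depth reached, 0 => nothing found
    dangling: seq[string]

  ToyError = enum
    NothingSerious, TrieLoopAlert, TrieIsEmpty

proc classify(stats: ToyStat): ToyError =
  ## Mirror of the `checkForError` block: a loop alert beats an empty trie.
  if stats.stopped: TrieLoopAlert
  elif stats.level == 0: TrieIsEmpty
  else: NothingSerious

when isMainModule:
  doAssert classify(ToyStat(stopped: true, level: 3)) == TrieLoopAlert
  doAssert classify(ToyStat(level: 0)) == TrieIsEmpty
  doAssert classify(ToyStat(level: 5)) == NothingSerious
```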
+ var rc: Result[NodeKey,void] + noRlpExceptionOops("inspectAccountsPath()"): + if persistent: + let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) + rc = getFn.hexaryInspectPath(ps.accRoot, path) + else: + rc = ps.accDb.hexaryInspectPath(ps.accRoot, path) + if rc.isOk: + return ok(rc.value) + err(NodeNotFound) + +proc getAccountNodeKey*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + root: Hash256; ## state root + path: Blob; ## Partial node path + ): Result[NodeKey,HexaryDbError] = + ## Variant of `inspectAccountsPath()` for persistent storage. + SnapDbSessionRef.init( + pv, root, peer).getAccountNodeKey(path, persistent=true) + + +proc getAccountData*( + ps: SnapDbSessionRef; ## Re-usable session descriptor + path: NodeKey; ## Account to visit + persistent = false; ## Read data from disk + ): Result[Account,HexaryDbError] = + ## Fetch account data. + ## + ## Caveat: There is no unit test yet for the non-persistent version + let peer = ps.peer + var acc: Account + + noRlpExceptionOops("getAccountData()"): + var leaf: Blob + if persistent: + let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) + leaf = path.hexaryPath(ps.accRoot, getFn).leafData + else: + leaf = path.hexaryPath(ps.accRoot.to(RepairKey),ps.accDb).leafData + + if leaf.len == 0: + return err(AccountNotFound) + acc = rlp.decode(leaf,Account) + + return ok(acc) + +proc getAccountData*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + root: Hash256; ## state root + path: NodeKey; ## Account to visit + ): Result[Account,HexaryDbError] = + ## Variant of `getAccount()` for persistent storage. + SnapDbSessionRef.init(pv, root, peer).getAccountData(path, persistent=true) # ------------------------------------------------------------------------------ -# Debugging (and playing with the hexary database) +# Public functions: additional helpers # ------------------------------------------------------------------------------ proc sortMerge*(base: openArray[NodeTag]): NodeTag = @@ -642,22 +714,14 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it]) proc getChainDbAccount*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Account,HexaryDbError] = ## Fetch account via `BaseChainDB` - noRlpExceptionOops("getChainDbAccount()"): - let - getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - leaf = accHash.to(NodeKey).hexaryPath(ps.accRoot, getFn).leafData - if 0 < leaf.len: - let acc = rlp.decode(leaf,Account) - return ok(acc) - - err(AccountNotFound) + ps.getAccountData(accHash.to(NodeKey),persistent=true) proc nextChainDbKey*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one next to the @@ -675,7 +739,7 @@ proc nextChainDbKey*( err(AccountNotFound) proc prevChainDbKey*( - ps: AccountsDbSessionRef; + ps: SnapDbSessionRef; accHash: Hash256 ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one before to the @@ -692,7 +756,11 @@ proc prevChainDbKey*( err(AccountNotFound) -proc assignPrettyKeys*(ps: AccountsDbSessionRef) = +# ------------------------------------------------------------------------------ +# Debugging (and playing with the hexary database) +# ------------------------------------------------------------------------------ + +proc 
assignPrettyKeys*(ps: SnapDbSessionRef) = ## Prepare foe pretty pringing/debugging. Run early enough this function ## sets the root key to `"$"`, for instance. noPpError("validate(1)"): @@ -710,22 +778,22 @@ proc assignPrettyKeys*(ps: AccountsDbSessionRef) = of Extension: discard node.eLink.toKey(ps) of Leaf: discard -proc dumpPath*(ps: AccountsDbSessionRef; key: NodeTag): seq[string] = +proc dumpPath*(ps: SnapDbSessionRef; key: NodeTag): seq[string] = ## Pretty print helper compiling the path into the repair tree for the ## argument `key`. noPpError("dumpPath"): let rPath= key.to(NodeKey).hexaryPath(ps.accRoot.to(RepairKey), ps.accDb) result = rPath.path.mapIt(it.pp(ps.accDb)) & @["(" & rPath.tail.pp & ")"] -proc dumpAccDB*(ps: AccountsDbSessionRef; indent = 4): string = +proc dumpAccDB*(ps: SnapDbSessionRef; indent = 4): string = ## Dump the entries from the a generic accounts trie. ps.accDb.pp(ps.accRoot,indent) -proc getAcc*(ps: AccountsDbSessionRef): HexaryTreeDbRef = +proc getAcc*(ps: SnapDbSessionRef): HexaryTreeDbRef = ## Low level access to accounts DB ps.accDb -proc hexaryPpFn*(ps: AccountsDbSessionRef): HexaryPpFn = +proc hexaryPpFn*(ps: SnapDbSessionRef): HexaryPpFn = ## Key mapping function used in `HexaryTreeDB` ps.accDb.keyPp diff --git a/nimbus/sync/snap/worker/fetch_accounts.nim b/nimbus/sync/snap/worker/fetch_accounts.nim deleted file mode 100644 index e25685c53..000000000 --- a/nimbus/sync/snap/worker/fetch_accounts.nim +++ /dev/null @@ -1,566 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
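The `toKey()`/`pp()`/`assignPrettyKeys()` helpers above exist only for debugging: each key gets a small ordinal the first time it is printed, so traces show compact tags like `$1` or `@2` instead of 32-byte hashes, and pre-walking the trie makes the root come out first. A reduced stand-in with plain string keys:

```nim
# Simplified stand-in for the keyMap/pp debugging helpers above: each distinct
# key gets a small ordinal on first appearance (toy string keys here).
import std/tables

type KeyMap = Table[string, uint]

proc toKey(km: var KeyMap; key: string): uint =
  ## Number keys in order of first appearance, starting at 1.
  if key notin km:
    km[key] = km.len.uint + 1
  km[key]

proc pp(km: var KeyMap; key: string; isNodeKey = true): string =
  ## "$n" for resolved node keys, "@n" for temporary repair keys.
  (if isNodeKey: "$" else: "@") & $km.toKey(key)

when isMainModule:
  var km: KeyMap
  doAssert km.pp("rootHash") == "$1"          # root seen first => "$1"
  doAssert km.pp("tempKey", isNodeKey = false) == "@2"
  doAssert km.pp("rootHash") == "$1"          # stable on repeated use
```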
- -import - std/sequtils, - chronicles, - chronos, - eth/[common/eth_types, p2p], - stew/[interval_set, keyed_queue], - stint, - ../../sync_desc, - ".."/[range_desc, worker_desc], - ./com/[get_account_range, get_error, get_storage_ranges, get_trie_nodes], - ./accounts_db - -when snapAccountsDumpEnable: - import ../../../tests/replay/[undump_accounts, undump_storages] - -{.push raises: [Defect].} - -logScope: - topics = "snap-fetch" - -const - extraTraceMessages = false or true - ## Enabled additional logging noise - - maxTimeoutErrors = 2 - ## maximal number of non-resonses accepted in a row - -# ------------------------------------------------------------------------------ -# Private debugging -# ------------------------------------------------------------------------------ - -proc dumpBegin( - buddy: SnapBuddyRef; - iv: LeafRange; - dd: GetAccountRange; - error = NothingSerious) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - let - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - trace " Snap proofs dump", peer, enabled=ctx.data.proofDumpOk, iv - var - fd = ctx.data.proofDumpFile - try: - if error != NothingSerious: - fd.write " # Error: base=" & $iv.minPt & " msg=" & $error & "\n" - fd.write "# count ", $ctx.data.proofDumpInx & "\n" - fd.write stateRoot.dumpAccounts(iv.minPt, dd.data) & "\n" - except CatchableError: - discard - ctx.data.proofDumpInx.inc - -proc dumpStorage(buddy: SnapBuddyRef; data: AccountStorageRange) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - let - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - var - fd = ctx.data.proofDumpFile - try: - fd.write stateRoot.dumpStorages(data) & "\n" - except CatchableError: - discard - -proc dumpEnd(buddy: SnapBuddyRef) = - # For debuging, will go away - discard - when snapAccountsDumpEnable: - let ctx = buddy.ctx - if ctx.data.proofDumpOk: - var fd = ctx.data.proofDumpFile - fd.flushFile - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc withMaxLen(buddy: SnapBuddyRef; iv: LeafRange): LeafRange = - ## Reduce accounts interval to maximal size - let maxlen = buddy.ctx.data.accountRangeMax - if 0 < iv.len and iv.len <= maxLen: - iv - else: - LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = - ## Fetch an interval from the account range list. Use the `pivotAccount` - ## value as a start entry to fetch data from, wrapping around if necessary. 
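The deleted `getUnprocessed()` whose doc comment ends here picks the next account range relative to the pivot point in three steps: a range starting at or after the pivot, else the tail of a range covering the pivot, else wrap around to the lowest range. A simplified, self-contained version over plain `(lo, hi)` pairs (the real code operates on `LeafRangeSet` interval sets):

```nim
# Simplified wrap-around selection over (lo, hi) ranges; stands in for the
# interval-set based getUnprocessed() logic of the deleted module.
import std/[algorithm, options]

type AccRange = tuple[lo, hi: uint64]

proc getUnprocessed(avail: var seq[AccRange]; pivot: uint64): Option[AccRange] =
  ## Selection order mirrors the deleted proc: 1) a range starting at or after
  ## the pivot, 2) the tail of a range covering the pivot, 3) wrap around.
  avail.sort(proc(a, b: AccRange): int = cmp(a.lo, b.lo))
  for i, r in avail:
    if pivot <= r.lo:
      avail.delete(i)
      return some(r)
  for i, r in avail:
    if r.lo < pivot and pivot <= r.hi:
      avail[i].hi = pivot - 1              # keep the part left of the pivot
      return some((lo: pivot, hi: r.hi))
  if 0 < avail.len:
    let r = avail[0]
    avail.delete(0)
    return some(r)
  none(AccRange)

when isMainModule:
  var avail = @[(lo: 10'u64, hi: 19'u64), (lo: 40'u64, hi: 59'u64)]
  doAssert avail.getUnprocessed(30) == some((lo: 40'u64, hi: 59'u64))
  doAssert avail.getUnprocessed(30) == some((lo: 10'u64, hi: 19'u64)) # wrapped
  doAssert avail.getUnprocessed(30).isNone
```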
- let - ctx = buddy.ctx - env = ctx.data.pivotEnv - pivotPt = env.pivotAccount - - block: - # Take the next interval to the right (aka ge) `pivotPt` - let rc = env.availAccounts.ge(pivotPt) - if rc.isOk: - let iv = buddy.withMaxLen(rc.value) - discard env.availAccounts.reduce(iv) - return ok(iv) - - block: - # Check whether the `pivotPt` is in the middle of an interval - let rc = env.availAccounts.envelope(pivotPt) - if rc.isOk: - let iv = buddy.withMaxLen(LeafRange.new(pivotPt, rc.value.maxPt)) - discard env.availAccounts.reduce(iv) - return ok(iv) - - block: - # Otherwise wrap around - let rc = env.availAccounts.ge() - if rc.isOk: - let iv = buddy.withMaxLen(rc.value) - discard env.availAccounts.reduce(iv) - return ok(iv) - - err() - -proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = - ## Shortcut - discard buddy.ctx.data.pivotEnv.availAccounts.merge(iv) - -proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = - ## Shortcut - discard buddy.ctx.data.pivotEnv.availAccounts.reduce(iv) - - -proc stopAfterError( - buddy: SnapBuddyRef; - error: ComError; - ): Future[bool] - {.async.} = - ## Error handling after data protocol failed. - case error: - of ComResponseTimeout: - if maxTimeoutErrors <= buddy.data.errors.nTimeouts: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - buddy.ctrl.zombie = true - else: - # Otherwise try again some time later. Nevertheless, stop the - # current action. - buddy.data.errors.nTimeouts.inc - await sleepAsync(5.seconds) - return true - - of ComNetworkProblem, - ComMissingProof, - ComAccountsMinTooSmall, - ComAccountsMaxTooLarge: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - # buddy.data.stats.major.networkErrors.inc() - buddy.ctrl.zombie = true - return true - - of ComEmptyAccountsArguments, - ComEmptyRequestArguments, - ComInspectDbFailed, - ComImportAccountsFailed, - ComNoDataForProof, - ComNothingSerious: - discard - - of ComNoAccountsForStateRoot, - ComNoStorageForAccounts, - ComNoByteCodesAvailable, - ComNoTrieNodesAvailable, - ComTooManyByteCodes, - ComTooManyStorageSlots, - ComTooManyTrieNodes: - # Mark this peer dead, i.e. 
avoid fetching from this peer for a while - buddy.ctrl.zombie = true - return true - -# ------------------------------------------------------------------------------ -# Private functions: accounts -# ------------------------------------------------------------------------------ - -proc processAccounts( - buddy: SnapBuddyRef; - iv: LeafRange; ## Accounts range to process - ): Future[Result[void,ComError]] - {.async.} = - ## Process accounts and storage by bulk download on the current envirinment - # Reset error counts for detecting repeated timeouts - buddy.data.errors.nTimeouts = 0 - - # Process accounts - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Fetch data for this range delegated to `fetchAccounts()` - let dd = block: - let rc = await buddy.getAccountRange(stateRoot, iv) - if rc.isErr: - buddy.putUnprocessed(iv) # fail => interval back to pool - return err(rc.error) - rc.value - - let - nAccounts = dd.data.accounts.len - nStorage = dd.withStorage.len - - block: - let rc = ctx.data.accountsDb.importAccounts( - peer, stateRoot, iv.minPt, dd.data) - if rc.isErr: - # Bad data, just try another peer - buddy.putUnprocessed(iv) - buddy.ctrl.zombie = true - trace "Import failed, restoring unprocessed accounts", peer, stateRoot, - range=dd.consumed, nAccounts, nStorage, error=rc.error - - buddy.dumpBegin(iv, dd, rc.error) # FIXME: Debugging (will go away) - buddy.dumpEnd() # FIXME: Debugging (will go away) - return err(ComImportAccountsFailed) - - buddy.dumpBegin(iv, dd) # FIXME: Debugging (will go away) - - # Statistics - env.nAccounts.inc(nAccounts) - env.nStorage.inc(nStorage) - - # Register consumed intervals on the accumulator over all state roots - discard buddy.ctx.data.coveredAccounts.merge(dd.consumed) - - # Register consumed and bulk-imported (well, not yet) accounts range - block registerConsumed: - block: - # Both intervals `min(iv)` and `min(dd.consumed)` are equal - let rc = iv - dd.consumed - if rc.isOk: - # Now, `dd.consumed` < `iv`, return some unused range - buddy.putUnprocessed(rc.value) - break registerConsumed - block: - # The processed interval might be a bit larger - let rc = dd.consumed - iv - if rc.isOk: - # Remove from unprocessed data. If it is not unprocessed, anymore - # then it was doubly processed which is ok. - buddy.delUnprocessed(rc.value) - break registerConsumed - # End registerConsumed - - # Store accounts on the storage TODO list. - discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage) - - return ok() - -# ------------------------------------------------------------------------------ -# Private functions: accounts storage -# ------------------------------------------------------------------------------ - -proc fetchAndImportStorageSlots( - buddy: SnapBuddyRef; - reqSpecs: seq[AccountSlotsHeader]; - ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] - {.async.} = - ## Fetch storage slots data from the network, store it on disk and - ## return yet unprocessed data. 
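`stopAfterError()` above encodes the peer error policy: timeouts are tolerated a couple of times before the peer is marked a zombie, transport and protocol violations demote it immediately, and soft errors let the current cycle continue. A decision-table sketch using a toy subset of the error enum (the real code additionally sleeps before retrying):

```nim
# Decision-table sketch of the peer error policy above (toy enum subset; the
# real code also sleeps asynchronously on the first few timeouts).
type
  ToyComError = enum
    ceNothingSerious, ceResponseTimeout, ceNetworkProblem, ceMissingProof,
    ceNoAccountsForStateRoot, ceTooManyTrieNodes

  PeerAction = enum
    ContinueCycle,   # soft error: keep the peer, go on with the next task
    RetryLater,      # timeout below the limit: stop this cycle, try again
    MarkZombie       # demote the peer, avoid it for a while

const maxTimeoutErrors = 2   # matches the constant in the deleted module

proc decide(error: ToyComError; nTimeouts: int): PeerAction =
  case error
  of ceResponseTimeout:
    if maxTimeoutErrors <= nTimeouts: MarkZombie else: RetryLater
  of ceNetworkProblem, ceMissingProof,
     ceNoAccountsForStateRoot, ceTooManyTrieNodes:
    MarkZombie
  of ceNothingSerious:
    ContinueCycle

when isMainModule:
  doAssert decide(ceResponseTimeout, 0) == RetryLater
  doAssert decide(ceResponseTimeout, 2) == MarkZombie
  doAssert decide(ceMissingProof, 0) == MarkZombie
```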
- let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Get storage slots - var stoRange = block: - let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) - if rc.isErr: - return err(rc.error) - rc.value - - if 0 < stoRange.data.storages.len: - # ------------------------------ - buddy.dumpStorage(stoRange.data) - # ------------------------------ - - # Verify/process data and save to disk - block: - let rc = ctx.data.accountsDb.importStorages(peer, stoRange.data) - - if rc.isErr: - # Push back parts of the error item - var once = false - for w in rc.error: - if 0 <= w[0]: - # Reset any partial requests by not copying the `firstSlot` field. - # So all the storage slots are re-fetched completely for this - # account. - stoRange.addLeftOver( - @[AccountSlotsHeader( - accHash: stoRange.data.storages[w[0]].account.accHash, - storageRoot: stoRange.data.storages[w[0]].account.storageRoot)], - forceNew = not once) - once = true - # Do not ask for the same entries again on this `peer` - if once: - buddy.data.vetoSlots.incl stoRange.leftOver[^1] - - if rc.error[^1][0] < 0: - discard - # TODO: disk storage failed or something else happend, so what? - - # Return the remaining part to be processed later - return ok(stoRange.leftOver) - -proc processStorageSlots( - buddy: SnapBuddyRef; - ): Future[Result[void,ComError]] - {.async.} = - ## Fetch storage data and save it on disk. Storage requests are managed by - ## a request queue for handling partioal replies and re-fetch issues. For - ## all practical puroses, this request queue should mostly be empty. - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - while true: - # Pull out the next request item from the queue - var req: SnapSlotQueueItemRef - block getNextAvailableItem: - for w in env.leftOver.nextKeys: - # Make sure that this item was not fetched for, already - if w notin buddy.data.vetoSlots: - env.leftOver.del(w) - req = w - break getNextAvailableItem - return ok() - - block: - # Fetch and store account storage slots. On some sort of success, - # the `rc` return value contains a list of left-over items to be - # re-processed. - let rc = await buddy.fetchAndImportStorageSlots(req.q) - - if rc.isErr: - # Save accounts/storage list to be processed later, then stop - discard env.leftOver.append req - return err(rc.error) - - for qLo in rc.value: - # Handle queue left-overs for processing in the next cycle - if qLo.q[0].firstSlot == Hash256.default and 0 < env.leftOver.len: - # Appending to last queue item is preferred over adding new item - let item = env.leftOver.first.value - item.q = item.q & qLo.q - else: - # Put back as-is. - discard env.leftOver.append qLo - # End while - - return ok() - -# ------------------------------------------------------------------------------ -# Private functions: healing -# ------------------------------------------------------------------------------ - -proc accountsTrieHealing( - buddy: SnapBuddyRef; - env: SnapPivotRef; - envSource: string; - ): Future[Result[void,ComError]] - {.async.} = - ## ... - # Starting with a given set of potentially dangling nodes, this set is - # updated. 
- let - ctx = buddy.ctx - peer = buddy.peer - stateRoot = env.stateHeader.stateRoot - - while env.repairState != Done and - (env.dangling.len != 0 or env.repairState == Pristine): - - trace "Accounts healing loop", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len - - var needNodes: seq[Blob] - block getDanglingNodes: - let rc = ctx.data.accountsDb.inspectAccountsTrie( - peer, stateRoot, env.dangling) - if rc.isOk: - needNodes = rc.value.dangling - break getDanglingNodes - let error = rc.error - if error == TrieIsEmpty and env.dangling.len == 0: - when extraTraceMessages: - trace "Accounts healing on empty trie", peer, - repairState=env.repairState, envSource - return ok() - trace "Accounts healing failed", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len, error - return err(ComInspectDbFailed) - - # Update dangling nodes register so that other processes would not fetch - # the same list simultaneously. - if maxTrieNodeFetch < needNodes.len: - # No point in processing more at the same time. So leave the rest on - # the `dangling` queue. - env.dangling = needNodes[maxTrieNodeFetch ..< needNodes.len] - needNodes.setLen(maxTrieNodeFetch) - else: - env.dangling.setLen(0) - - # Noting to anymore - if needNodes.len == 0: - if env.repairState != Pristine: - env.repairState = Done - when extraTraceMessages: - trace "Done accounts healing for now", peer, - repairState=env.repairState, envSource, nDangling=env.dangling.len - return ok() - - let lastState = env.repairState - env.repairState = KeepGoing - - when extraTraceMessages: - trace "Need nodes for healing", peer, repairState=env.repairState, - envSource, nDangling=env.dangling.len, nNodes=needNodes.len - - # Fetch nodes - let dd = block: - let rc = await buddy.getTrieNodes(stateRoot, needNodes.mapIt(@[it])) - if rc.isErr: - env.dangling = needNodes - env.repairState = lastState - return err(rc.error) - rc.value - - # Store to disk and register left overs for the next pass - block: - let rc = ctx.data.accountsDb.importRawNodes(peer, dd.nodes) - if rc.isOk: - env.dangling = dd.leftOver.mapIt(it[0]) - elif 0 < rc.error.len and rc.error[^1][0] < 0: - # negative index => storage error - env.dangling = needNodes - else: - let nodeKeys = rc.error.mapIt(dd.nodes[it[0]]) - env.dangling = dd.leftOver.mapIt(it[0]) & nodeKeys - # End while - - return ok() - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc fetchAccounts*(buddy: SnapBuddyRef) {.async.} = - ## Fetch accounts and data and store them in the database. - ## - ## TODO: Healing for storages. Currently, healing in only run for accounts. - let - ctx = buddy.ctx - peer = buddy.peer - env = ctx.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - var - # Complete the previous environment by trie database healing (if any) - healingEnvs = if not ctx.data.prevEnv.isNil: @[ctx.data.prevEnv] else: @[] - - block processAccountsFrame: - # Get a range of accounts to fetch from - let iv = block: - let rc = buddy.getUnprocessed() - if rc.isErr: - # Although there are no accounts left to process, the other peer might - # still work on some accounts. As a general rule, not all from an - # account range gets served so the remaining range will magically - # reappear on the unprocessed ranges database. - trace "No more unprocessed accounts (maybe)", peer, stateRoot - - # Complete healing for sporadic nodes missing. 
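The healing pass above caps each round at `maxTrieNodeFetch` node requests and parks the remainder on the dangling queue for the next round. A standalone version of that batch split:

```nim
# Standalone version of the batch capping used in the healing loop above:
# at most `maxFetch` node paths per pass, the rest stays queued.
proc splitBatch[T](queue: var seq[T]; maxFetch: int): seq[T] =
  ## Returns the slice to fetch now; `queue` keeps the left-over for later.
  if maxFetch < queue.len:
    result = queue[0 ..< maxFetch]
    queue = queue[maxFetch ..< queue.len]
  else:
    result = queue
    queue.setLen(0)

when isMainModule:
  var dangling = @["n1", "n2", "n3", "n4", "n5"]
  doAssert dangling.splitBatch(3) == @["n1", "n2", "n3"]
  doAssert dangling == @["n4", "n5"]
  doAssert dangling.splitBatch(3) == @["n4", "n5"]
  doAssert dangling.len == 0
```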
- healingEnvs.add env - break processAccountsFrame - rc.value - - when extraTraceMessages: - trace "Start fetching accounts", peer, stateRoot, iv, - repairState=env.repairState - - # Process received accounts and stash storage slots to fetch later - block: - let rc = await buddy.processAccounts(iv) - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="accounts", error - return - break processAccountsFrame - - # End `block processAccountsFrame` - - when extraTraceMessages: - trace "Start fetching storages", peer, nAccounts=env.leftOver.len, - nVetoSlots=buddy.data.vetoSlots.len, repairState=env.repairState - - # Process storage slots from environment batch - block: - let rc = await buddy.processStorageSlots() - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="storage", error - return - - # Check whether there is some environment that can be completed by - # Patricia Merkle Tree healing. - for w in healingEnvs: - let envSource = if env == ctx.data.pivotEnv: "pivot" else: "retro" - when extraTraceMessages: - trace "Start accounts healing", peer, repairState=env.repairState, - envSource, dangling=w.dangling.len - - let rc = await buddy.accountsTrieHealing(w, envSource) - if rc.isErr: - let error = rc.error - if await buddy.stopAfterError(error): - buddy.dumpEnd() # FIXME: Debugging (will go away) - when extraTraceMessages: - trace "Stop fetching cycle", peer, repairState=env.repairState, - processing="healing", dangling=w.dangling.len, error - return - - buddy.dumpEnd() # FIXME: Debugging (will go away) - when extraTraceMessages: - trace "Done fetching cycle", peer, repairState=env.repairState - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim new file mode 100644 index 000000000..59acd96c5 --- /dev/null +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -0,0 +1,380 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Heal accounts DB: +## ================= +## +## Flow chart for healing algorithm +## -------------------------------- +## :: +## START with {state-root} +## | +## | +------------------------------------------------+ +## | | | +## v v | +## --> {missing-account-nodes} | +## | | | +## v v | +## {leaf-nodes} | +## | | | +## v v | +## | +## | | | +## v v | +## {storage-roots} {check-account-nodes} ---------+ +## | +## v +## +## +## Legend: +## * `<..>` some action, process, etc. +## * `{..}` some data set, list, or queue etc. +## +## Discussion of flow chart +## ------------------------ +## * Input nodes for `` are checked for dangling child +## node links which in turn are collected as output. 
+## +## * Nodes of the `{missing-account-nodes}` list are fetched from the network +## and merged into the accounts trie database. Successfully processed nodes +## are collected in the `{check-account-nodes}` list which is fed back into +## the `` process. +## +## * If there is a problem with a node travelling from the source list +## `{missing-account-nodes}` towards the target list `{check-account-nodes}`, +## this problem node will simply held back in the source list. +## +## In order to avoid unnecessary stale entries, the `{missing-account-nodes}` +## list is regularly checked for whether nodes are still missing or some +## other process has done the magic work of merging some of then into the +## trie database. +## +## Competing with other trie algorithms +## ------------------------------------ +## * Healing runs (semi-)parallel to processing `GetAccountRange` network +## messages from the `snap/1` protocol. This is more network bandwidth +## efficient in comparison with the healing algorithm. Here, leaf nodes are +## transferred wholesale while with the healing algorithm, only the top node +## of a sub-trie can be transferred at once (but for multiple sub-tries). +## +## * The healing algorithm visits all nodes of a complete trie unless it is +## stopped in between. +## +## * If a trie node is missing, it can be fetched directly by the healing +## algorithm or one can wait for another process to do the job. Waiting for +## other processes to do the job also applies to problem nodes as indicated +## in the last bullet item of the previous chapter. +## +## * Network bandwidth can be saved if nodes are fetched by a more efficient +## process (if that is available.) This suggests that fetching missing trie +## nodes by the healing algorithm should kick in very late when the trie +## database is nearly complete. +## +## * Healing applies to a trie database associated with the currently latest +## *state root*, which may change occasionally. It suggests to start the +## healing algorithm very late altogether (not fetching nodes, only) because +## most trie databases will never be completed by healing. +## + +import + std/sequtils, + chronicles, + chronos, + eth/[common/eth_types, p2p, trie/trie_defs], + stew/[interval_set, keyed_queue], + ../../../utils/prettify, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/get_trie_nodes, + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Helpers +# ------------------------------------------------------------------------------ + +proc coverageInfo(buddy: SnapBuddyRef): string = + ## Logging helper ... + let + ctx = buddy.ctx + env = buddy.data.pivotEnv + env.fetchAccounts.emptyFactor.toPC(0) & + "/" & + ctx.data.coveredAccounts.fullFactor.toPC(0) + +proc getCoveringLeafRangeSet(buddy: SnapBuddyRef; pt: NodeTag): LeafRangeSet = + ## Helper ... + let env = buddy.data.pivotEnv + for ivSet in env.fetchAccounts: + if 0 < ivSet.covered(pt,pt): + return ivSet + +proc commitLeafAccount(buddy: SnapBuddyRef; ivSet: LeafRangeSet; pt: NodeTag) = + ## Helper ... 
+ discard ivSet.reduce(pt,pt) + discard buddy.ctx.data.coveredAccounts.merge(pt,pt) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc updateMissingNodesList(buddy: SnapBuddyRef) = + ## Check whether previously missing nodes from the `missingAccountNodes` list + ## have been magically added to the database since it was checked last time. + ## These nodes will me moved to `checkAccountNodes` for further processing. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + var + nodes: seq[Blob] + + for accKey in env.missingAccountNodes: + let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey) + if rc.isOk: + # Check nodes for dangling links + env.checkAccountNodes.add acckey + else: + # Node is still missing + nodes.add acckey + + env.missingAccountNodes = nodes + + +proc mergeIsolatedAccounts( + buddy: SnapBuddyRef; + paths: openArray[NodeKey]; + ): seq[AccountSlotsHeader] = + ## Process leaves found with nodes inspection, returns a list of + ## storage slots for these nodes. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Remove reported leaf paths from the accounts interval + for accKey in paths: + let + pt = accKey.to(NodeTag) + ivSet = buddy.getCoveringLeafRangeSet(pt) + if not ivSet.isNil: + let + rc = ctx.data.snapDb.getAccountData(peer, stateRoot, accKey) + accountHash = Hash256(data: accKey.ByteArray32) + if rc.isOk: + let storageRoot = rc.value.storageRoot + when extraTraceMessages: + let stRootStr = if storageRoot != emptyRlpHash: $storageRoot + else: "emptyRlpHash" + trace "Registered isolated persistent account", peer, accountHash, + storageRoot=stRootStr + if storageRoot != emptyRlpHash: + result.add AccountSlotsHeader( + accHash: accountHash, + storageRoot: storageRoot) + buddy.commitLeafAccount(ivSet, pt) + env.nAccounts.inc + continue + + when extraTraceMessages: + let error = rc.error + trace "Get persistent account problem", peer, accountHash, error + + +proc fetchDanglingNodesList( + buddy: SnapBuddyRef + ): Result[TrieNodeStat,HexaryDbError] = + ## Starting with a given set of potentially dangling account nodes + ## `checkAccountNodes`, this set is filtered and processed. The outcome + ## is fed back to the vey same list `checkAccountNodes` + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + maxLeaves = if env.checkAccountNodes.len == 0: 0 + else: maxHealingLeafPaths + + rc = ctx.data.snapDb.inspectAccountsTrie( + peer, stateRoot, env.checkAccountNodes, maxLeaves) + + if rc.isErr: + # Attempt to switch peers, there is not much else we can do here + buddy.ctrl.zombie = true + return err(rc.error) + + # Global/env batch list to be replaced by by `rc.value.leaves` return value + env.checkAccountNodes.setLen(0) + + # Store accounts leaves on the storage batch list. 
+ let withStorage = buddy.mergeIsolatedAccounts(rc.value.leaves) + if 0 < withStorage.len: + discard env.fetchStorage.append SnapSlotQueueItemRef(q: withStorage) + when extraTraceMessages: + trace "Accounts healing storage nodes", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nWithStorage=withStorage.len, + nDangling=rc.value.dangling + + return ok(rc.value) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = + ## Fetching and merging missing account trie database nodes. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Only start healing if there is some completion level, already. + # + # We check against the global coverage factor, i.e. a measure for how + # much of the total of all accounts have been processed. Even if the trie + # database for the current pivot state root is sparsely filled, there + # is a good chance that it can inherit some unchanged sub-trie from an + # earlier pivor state root download. The healing process then works like + # sort of glue. + # + if env.nAccounts == 0 or + ctx.data.coveredAccounts.fullFactor < healAccountsTrigger: + when extraTraceMessages: + trace "Accounts healing postponed", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len + return + + when extraTraceMessages: + trace "Start accounts healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len + + # Update for changes since last visit + buddy.updateMissingNodesList() + + # If `checkAccountNodes` is empty, healing is at the very start or + # was postponed in which case `missingAccountNodes` is non-empty. + var + nodesMissing: seq[Blob] # Nodes to process by this instance + nLeaves = 0 # For logging + if 0 < env.checkAccountNodes.len or env.missingAccountNodes.len == 0: + let rc = buddy.fetchDanglingNodesList() + if rc.isErr: + error "Accounts healing failed => stop", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + error=rc.error + return + + nodesMissing = rc.value.dangling + nLeaves = rc.value.leaves.len + + # Check whether the trie is complete. + if nodesMissing.len == 0 and env.missingAccountNodes.len == 0: + when extraTraceMessages: + trace "Accounts healing complete", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=0, + nMissingAccountNodes=0, + nNodesMissing=0, + nLeaves + return # nothing to do + + # Ok, clear global `env.missingAccountNodes` list and process `nodesMissing`. + nodesMissing = nodesMissing & env.missingAccountNodes + env.missingAccountNodes.setlen(0) + + # Fetch nodes, merge it into database and feed back results + while 0 < nodesMissing.len: + var fetchNodes: seq[Blob] + # There is no point in processing too many nodes at the same time. So + # leave the rest on the `nodesMissing` queue for a moment. 
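Earlier in `healAccountsDb()` above, healing is gated on global coverage: it only starts once some accounts exist and the merged coverage factor reaches `healAccountsTrigger` (0.95 in this diff). A small numeric restatement of that guard:

```nim
# Numeric illustration of the healing trigger guard above: healing starts
# only once the merged account coverage reaches healAccountsTrigger.
const healAccountsTrigger = 0.95

proc healingDue(nAccounts: int; coveredFraction: float): bool =
  ## Mirrors the guard at the top of healAccountsDb(): postpone healing until
  ## accounts exist and global coverage reaches the trigger level.
  nAccounts > 0 and healAccountsTrigger <= coveredFraction

when isMainModule:
  doAssert not healingDue(nAccounts = 0, coveredFraction = 0.99)  # nothing yet
  doAssert not healingDue(nAccounts = 1_000, coveredFraction = 0.60)
  doAssert healingDue(nAccounts = 1_000, coveredFraction = 0.97)
```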
+ if maxTrieNodeFetch < nodesMissing.len: + let inxLeft = nodesMissing.len - maxTrieNodeFetch + fetchNodes = nodesMissing[inxLeft ..< nodesMissing.len] + nodesMissing.setLen(inxLeft) + else: + fetchNodes = nodesMissing + nodesMissing.setLen(0) + + when extraTraceMessages: + trace "Accounts healing loop", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nNodesMissing=nodesMissing.len, + nLeaves + + # Fetch nodes from the network + let dd = block: + let rc = await buddy.getTrieNodes(stateRoot, fetchNodes.mapIt(@[it])) + if rc.isErr: + env.missingAccountNodes = env.missingAccountNodes & fetchNodes + when extraTraceMessages: + trace "Error fetching account nodes for healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nNodesMissing=nodesMissing.len, + nLeaves, + error=rc.error + # Just run the next lap + continue + rc.value + + # Store to disk and register left overs for the next pass + block: + let rc = ctx.data.snapDb.importRawNodes(peer, dd.nodes) + if rc.isOk: + env.checkAccountNodes = env.checkAccountNodes & dd.leftOver.mapIt(it[0]) + elif 0 < rc.error.len and rc.error[^1][0] < 0: + # negative index => storage error + env.missingAccountNodes = env.missingAccountNodes & fetchNodes + else: + env.missingAccountNodes = env.missingAccountNodes & + dd.leftOver.mapIt(it[0]) & rc.error.mapIt(dd.nodes[it[0]]) + + # End while + + when extraTraceMessages: + trace "Done accounts healing", peer, + nAccounts=env.nAccounts, + covered=buddy.coverageInfo(), + nCheckAccountNodes=env.checkAccountNodes.len, + nMissingAccountNodes=env.missingAccountNodes.len, + nLeaves + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/store_accounts.nim b/nimbus/sync/snap/worker/store_accounts.nim new file mode 100644 index 000000000..64ae92202 --- /dev/null +++ b/nimbus/sync/snap/worker/store_accounts.nim @@ -0,0 +1,186 @@ + +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +## Fetch accounts stapshot +## ======================= +## +## Worker items state diagram: +## :: +## unprocessed | peer workers + | +## account ranges | account database update | unprocessed storage slots +## ======================================================================== +## +## +---------------------------------------+ +## | | +## v | +## -----+------> ------+-----> OUTPUT +## | | +## +------> ------+ +## | | +## +------> ------+ +## : : +## + +import + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/[interval_set, keyed_queue], + stint, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/[com_error, get_account_range], + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc withMaxLen( + buddy: SnapBuddyRef; + iv: LeafRange; + maxlen: UInt256; + ): LeafRange = + ## Reduce accounts interval to maximal size + if 0 < iv.len and iv.len <= maxLen: + iv + else: + LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = + ## Fetch an interval from one of the account range lists. + let + env = buddy.data.pivotEnv + accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256 + + for ivSet in env.fetchAccounts: + let rc = ivSet.ge() + if rc.isOk: + let iv = buddy.withMaxLen(rc.value, accountRangeMax) + discard ivSet.reduce(iv) + return ok(iv) + + err() + +proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.data.pivotEnv.fetchAccounts[1].merge(iv) + +proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.data.pivotEnv.fetchAccounts[1].reduce(iv) + +proc markGloballyProcessed(buddy: SnapBuddyRef; iv: LeafRange) = + ## Shortcut + discard buddy.ctx.data.coveredAccounts.merge(iv) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = + ## Fetch accounts and store them in the database. 
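In `getUnprocessed()`/`withMaxLen()` above, each worker clamps a fetched interval to `high(UInt256) div buddiesMax`, so concurrently active workers split the account address space roughly evenly per pass. A small-integer sketch of the clamping (toy 16-bit address space instead of `UInt256`):

```nim
# Small-integer sketch of the interval clamping in withMaxLen() above: each
# worker takes at most (address space / number of workers) points per pass.
type Iv = tuple[minPt, maxPt: uint64]

proc len(iv: Iv): uint64 = iv.maxPt - iv.minPt + 1

proc withMaxLen(iv: Iv; maxLen: uint64): Iv =
  ## Clamp the interval to `maxLen` points, keeping its lower end.
  if 0'u64 < iv.len and iv.len <= maxLen:
    iv
  else:
    (minPt: iv.minPt, maxPt: iv.minPt + maxLen - 1)

when isMainModule:
  let
    addressSpace = high(uint16).uint64 + 1   # toy stand-in for the full space
    buddiesMax = 8'u64
    rangeMax = addressSpace div buddiesMax   # per-pass cap per worker
  doAssert withMaxLen((minPt: 0'u64, maxPt: 99'u64), rangeMax) ==
    (minPt: 0'u64, maxPt: 99'u64)            # already small enough
  doAssert withMaxLen((minPt: 0'u64, maxPt: addressSpace - 1), rangeMax) ==
    (minPt: 0'u64, maxPt: rangeMax - 1)      # clamped to one worker share
```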
+ let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Get a range of accounts to fetch from + let iv = block: + let rc = buddy.getUnprocessed() + if rc.isErr: + trace "Currently no unprocessed accounts", peer, stateRoot + return + rc.value + + when extraTraceMessages: + trace "Start fetching accounts", peer, stateRoot, iv + + # Process received accounts and stash storage slots to fetch later + let dd = block: + let rc = await buddy.getAccountRange(stateRoot, iv) + if rc.isErr: + buddy.putUnprocessed(iv) # fail => interval back to pool + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + when extraTraceMessages: + trace "Error fetching accounts => stop", peer, error + return + # Reset error counts for detecting repeated timeouts + buddy.data.errors.nTimeouts = 0 + rc.value + + let + nAccounts = dd.data.accounts.len + nStorage = dd.withStorage.len + + block: + let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data) + if rc.isErr: + # Bad data, just try another peer + buddy.putUnprocessed(iv) + buddy.ctrl.zombie = true + when extraTraceMessages: + let error = ComImportAccountsFailed + trace "Accounts import failed => stop", peer, stateRoot, + range=dd.consumed, nAccounts, nStorage, error + return + + # Statistics + env.nAccounts.inc(nAccounts) + env.nStorage.inc(nStorage) + + # Register consumed intervals on the accumulator over all state roots + buddy.markGloballyProcessed(dd.consumed) + + # Register consumed and bulk-imported (well, not yet) accounts range + block registerConsumed: + block: + # Both intervals `min(iv)` and `min(dd.consumed)` are equal + let rc = iv - dd.consumed + if rc.isOk: + # Now, `dd.consumed` < `iv`, return some unused range + buddy.putUnprocessed(rc.value) + break registerConsumed + block: + # The processed interval might be a bit larger + let rc = dd.consumed - iv + if rc.isOk: + # Remove from unprocessed data. If it is not unprocessed, anymore + # then it was doubly processed which is ok. + buddy.delUnprocessed(rc.value) + break registerConsumed + # End registerConsumed + + # Store accounts on the storage TODO list. + discard env.fetchStorage.append SnapSlotQueueItemRef(q: dd.withStorage) + + when extraTraceMessages: + let withStorage = dd.withStorage.len + trace "Done fetching accounts", peer, stateRoot, nAccounts, withStorage, iv + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/store_storages.nim b/nimbus/sync/snap/worker/store_storages.nim new file mode 100644 index 000000000..0ee607168 --- /dev/null +++ b/nimbus/sync/snap/worker/store_storages.nim @@ -0,0 +1,177 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +## Fetch accounts stapshot +## ======================= +## +## Worker items state diagram: +## :: +## unprocessed slot requests | peer workers + storages database update +## =================================================================== +## +## +-----------------------------------------------+ +## | | +## v | +## ------------+-------> ------+ +## | | +## +-------> ------+ +## | | +## +-------> ------+ +## : : +## + +import + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/keyed_queue, + stint, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/[com_error, get_storage_ranges], + ./db/snap_db + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getNextSlotItem(buddy: SnapBuddyRef): Result[SnapSlotQueueItemRef,void] = + let env = buddy.data.pivotEnv + for w in env.fetchStorage.nextKeys: + # Make sure that this item was not fetched and rejected earlier + if w notin buddy.data.vetoSlots: + env.fetchStorage.del(w) + return ok(w) + err() + +proc fetchAndImportStorageSlots( + buddy: SnapBuddyRef; + reqSpecs: seq[AccountSlotsHeader]; + ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] + {.async.} = + ## Fetch storage slots data from the network, store it on disk and + ## return data to process in the next cycle. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Get storage slots + var stoRange = block: + let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) + if rc.isErr: + return err(rc.error) + rc.value + + if 0 < stoRange.data.storages.len: + # Verify/process data and save to disk + block: + let rc = ctx.data.snapDb.importStorages(peer, stoRange.data) + + if rc.isErr: + # Push back parts of the error item + var once = false + for w in rc.error: + if 0 <= w[0]: + # Reset any partial requests by not copying the `firstSlot` field. + # So all the storage slots are re-fetched completely for this + # account. + stoRange.addLeftOver( + @[AccountSlotsHeader( + accHash: stoRange.data.storages[w[0]].account.accHash, + storageRoot: stoRange.data.storages[w[0]].account.storageRoot)], + forceNew = not once) + once = true + # Do not ask for the same entries again on this `peer` + if once: + buddy.data.vetoSlots.incl stoRange.leftOver[^1] + + if rc.error[^1][0] < 0: + discard + # TODO: disk storage failed or something else happend, so what? + + # Return the remaining part to be processed later + return ok(stoRange.leftOver) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc storeStorages*(buddy: SnapBuddyRef) {.async.} = + ## Fetch account storage slots and store them in the database. + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + var + once = true # for logging + + # Fetch storage data and save it on disk. Storage requests are managed by + # a request queue for handling partioal replies and re-fetch issues. For + # all practical puroses, this request queue should mostly be empty. 
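`getNextSlotItem()` above pops the first queued storage request that is not on the peer's veto set, so a peer never re-asks for slots it already failed on while other peers may still pick them up. A reduced sketch with string items in place of `SnapSlotQueueItemRef`:

```nim
# Reduced sketch of getNextSlotItem(): pop the first queued work item that is
# not on this peer's veto list (toy string items instead of queue refs).
import std/[sets, options]

proc getNextSlotItem(queue: var seq[string];
                     veto: HashSet[string]): Option[string] =
  ## Scan front to back, skip vetoed items, remove and return the first match.
  for i in 0 ..< queue.len:
    if queue[i] notin veto:
      let item = queue[i]
      queue.delete(i)
      return some(item)
  none(string)

when isMainModule:
  var q = @["acct-1", "acct-2", "acct-3"]
  let veto = ["acct-1"].toHashSet
  doAssert q.getNextSlotItem(veto) == some("acct-2")
  doAssert q == @["acct-1", "acct-3"]   # vetoed item stays queued for others
```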
+ while true: + # Pull out the next request item from the queue + let req = block: + let rc = buddy.getNextSlotItem() + if rc.isErr: + return # currently nothing to do + rc.value + + when extraTraceMessages: + if once: + once = false + let nAccounts = 1 + env.fetchStorage.len + trace "Start fetching storage slotss", peer, + nAccounts, nVetoSlots=buddy.data.vetoSlots.len + + block: + # Fetch and store account storage slots. On success, the `rc` value will + # contain a list of left-over items to be re-processed. + let rc = await buddy.fetchAndImportStorageSlots(req.q) + if rc.isErr: + # Save accounts/storage list to be processed later, then stop + discard env.fetchStorage.append req + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + trace "Error fetching storage slots => stop", peer, error + discard + return + + # Reset error counts for detecting repeated timeouts + buddy.data.errors.nTimeouts = 0 + + for qLo in rc.value: + # Handle queue left-overs for processing in the next cycle + if qLo.q[0].firstSlot == Hash256() and 0 < env.fetchStorage.len: + # Appending to last queue item is preferred over adding new item + let item = env.fetchStorage.first.value + item.q = item.q & qLo.q + else: + # Put back as-is. + discard env.fetchStorage.append qLo + # End while + + when extraTraceMessages: + trace "Done fetching storage slots", peer, nAccounts=env.fetchStorage.len + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 615d5af56..4c1f59b4c 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -30,8 +30,7 @@ type nStorage*: (float,float) ## mean and standard deviation accountsFill*: (float,float,float) ## mean, standard deviation, merged total accCoverage*: float ## as factor - activeQueues*: int - flushedQueues*: int64 + nQueues*: int TickerStatsUpdater* = proc: TickerStats {.gcsafe, raises: [Defect].} @@ -119,19 +118,18 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = accCov = data.accountsFill[0].toPC(1) & "(" & data.accountsFill[1].toPC(1) & ")" & "/" & data.accountsFill[2].toPC(0) - flushed = data.flushedQueues buddies = t.nBuddies tick = t.tick.toSI mem = getTotalMem().uint.toSI noFmtError("runLogTicker"): if data.pivotBlock.isSome: - pivot = &"#{data.pivotBlock.get}/{data.activeQueues}" + pivot = &"#{data.pivotBlock.get}/{data.nQueues}" nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})" nStore = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})" info "Snap sync statistics", - tick, buddies, pivot, nAcc, accCov, nStore, flushed, mem + tick, buddies, pivot, nAcc, accCov, nStore, mem t.tick.inc t.setLogTicker(Moment.fromNow(tickerLogInterval)) diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 5053f0b5f..dd702de9a 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -1,5 +1,4 @@ -# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods -# +# Nimbus # Copyright (c) 2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -13,10 +12,9 @@ import std/[hashes, sequtils, strutils], eth/[common/eth_types, p2p], stew/[byteutils, keyed_queue], - ../../db/select_backend, - ../../constants, + 
"../.."/[constants, db/select_backend], ".."/[sync_desc, types], - ./worker/[accounts_db, ticker], + ./worker/[com/com_error, db/snap_db, ticker], ./range_desc {.push raises: [Defect].} @@ -25,14 +23,40 @@ const snapRequestBytesLimit* = 2 * 1024 * 1024 ## Soft bytes limit to request in `snap` protocol calls. - maxPivotBlockWindow* = 128 - ## The maximal depth of two block headers. If the pivot block header - ## (containing the current state root) is more than this many blocks - ## away from a new pivot block header candidate, then the latter one - ## replaces the current block header. + minPivotBlockDistance* = 128 + ## The minimal depth of two block headers needed to activate a new state + ## root pivot. ## - ## This mechanism applies to new worker buddies which start by finding - ## a new pivot. + ## Effects on assembling the state via `snap/1` protocol: + ## + ## * A small value of this constant increases the propensity to update the + ## pivot header more often. This is so because each new peer negoiates a + ## pivot block number at least the current one. + ## + ## * A large value keeps the current pivot more stable but some experiments + ## suggest that the `snap/1` protocol is answered only for later block + ## numbers (aka pivot blocks.) So a large value tends to keep the pivot + ## farther away from the chain head. + ## + ## Note that 128 is the magic distance for snapshots used by *Geth*. + + backPivotBlockDistance* = 64 + ## When a pivot header is found, move pivot back `backPivotBlockDistance` + ## blocks so that the pivot is guaranteed to have some distance from the + ## chain head. + ## + ## Set `backPivotBlockDistance` to zero for disabling this feature. + + backPivotBlockThreshold* = backPivotBlockDistance + minPivotBlockDistance + ## Ignore `backPivotBlockDistance` unless the current block number is + ## larger than this constant (which must be at least + ## `backPivotBlockDistance`.) + + healAccountsTrigger* = 0.95 + ## Apply accounts healing if the global snap download coverage factor + ## exceeds this setting. The global coverage factor is derived by merging + ## all account ranges retrieved for all pivot state roots (see + ## `coveredAccounts` in `CtxData`.) maxTrieNodeFetch* = 1024 ## Informal maximal number of trie nodes to fetch at once. This is nor @@ -41,42 +65,22 @@ const ## Resticting the fetch list length early allows to better paralellise ## healing. - switchPivotAfterCoverage* = 1.0 # * 0.30 - ## Stop fetching from the same pivot state root with this much coverage - ## and try to find a new one. Setting this value to `1.0`, this feature - ## is disabled. Note that settting low coverage levels is primarily for - ## testing/debugging (may produce stress conditions.) - ## - ## If this setting is active, it typically supersedes the pivot update - ## mechainsm implied by the `maxPivotBlockWindow`. This for the simple - ## reason that the pivot state root is typically near the head of the - ## block chain. - ## - ## This mechanism applies to running worker buddies. When triggered, all - ## pivot handlers are reset so they will start from scratch finding a - ## better pivot. + maxHealingLeafPaths* = 1024 + ## Retrieve this many leave nodes with proper 32 bytes path when inspecting + ## for dangling nodes. This allows to run healing paralell to accounts or + ## storage download without requestinng an account/storage slot found by + ## healing again with the download. 
@@ -94,23 +98,28 @@ type
   SnapSlotsSet* = HashSet[SnapSlotQueueItemRef]
     ## Ditto but without order, to be used as veto set

-  SnapRepairState* = enum
-    Pristine                          ## Not initialised yet
-    KeepGoing                         ## Unfinished repair process
-    Done                              ## Stop repairing
+  SnapAccountRanges* = array[2,LeafRangeSet]
+    ## Pair of account hash range lists. The first entry must be processed
+    ## first. This allows to coordinate peers working on different state roots
+    ## to avoid overlapping accounts as long as they fetch from the first entry.

   SnapPivotRef* = ref object
     ## Per-state root cache for particular snap data environment
     stateHeader*: BlockHeader         ## Pivot state, containg state root
-    pivotAccount*: NodeTag            ## Random account
-    availAccounts*: LeafRangeSet      ## Accounts to fetch (as ranges)
+
+    # Accounts download
+    fetchAccounts*: SnapAccountRanges ## Sets of accounts ranges to fetch
+    checkAccountNodes*: seq[Blob]     ## Nodes with prob. dangling child links
+    missingAccountNodes*: seq[Blob]   ## Dangling links to fetch and merge
+    accountsDone*: bool               ## All accounts have been processed
+
+    # Storage slots download
+    fetchStorage*: SnapSlotsQueue     ## Fetch storage for these accounts
+    serialSync*: bool                 ## Done with storage, block sync next
+
+    # Info
     nAccounts*: uint64                ## Number of accounts imported
     nStorage*: uint64                 ## Number of storage spaces imported
-    leftOver*: SnapSlotsQueue         ## Re-fetch storage for these accounts
-    dangling*: seq[Blob]              ## Missing nodes for healing process
-    repairState*: SnapRepairState     ## State of healing process
-    when switchPivotAfterCoverage < 1.0:
-      minCoverageReachedOk*: bool     ## Stop filling this pivot

   SnapPivotTable* = ##\
     ## LRU table, indexed by state root
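The `SnapAccountRanges` pair introduced above encodes a simple priority rule: take work from the first range list while anything is left there, and only then fall back to the second. A self-contained toy of that rule (all names invented; `seq[string]` stands in for `LeafRangeSet`):

```nim
import std/options

type
  ToyRangePair = array[2, seq[string]]    # stand-in for array[2,LeafRangeSet]

proc nextRange(pair: var ToyRangePair): Option[string] =
  ## Drain the first list before touching the second, so peers working on
  ## different state roots keep fetching from the same priority list.
  for inx in 0 .. 1:
    if 0 < pair[inx].len:
      let rng = pair[inx][0]
      pair[inx] = pair[inx][1 .. ^1]
      return some(rng)
  none(string)

when isMainModule:
  var q: ToyRangePair = [@["0x00..0x7f"], @["0x80..0xff"]]
  doAssert q.nextRange() == some("0x00..0x7f")   # first list wins
  doAssert q.nextRange() == some("0x80..0xff")   # second list only when drained
  doAssert q.nextRange() == none(string)
```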
@@ -118,33 +127,23 @@ type

   BuddyData* = object
     ## Per-worker local descriptor data extension
-    errors*: SnapBuddyErrors          ## For error handling
-    workerPivot*: RootRef             ## Opaque object reference for sub-module
+    errors*: ComErrorStatsRef         ## For error handling
+    pivotFinder*: RootRef             ## Opaque object reference for sub-module
+    pivotEnv*: SnapPivotRef           ## Environment containing state root
     vetoSlots*: SnapSlotsSet          ## Do not ask for these slots, again

-  BuddyPoolHookFn* = proc(buddy: BuddyRef[CtxData,BuddyData]) {.gcsafe.}
-    ## All buddies call back (the argument type is defined below with
-    ## pretty name `SnapBuddyRef`.)
-
   CtxData* = object
     ## Globally shared data extension
     seenBlock: WorkerSeenBlocks       ## Temporary, debugging, pretty logs
     rng*: ref HmacDrbgContext         ## Random generator
-    coveredAccounts*: LeafRangeSet    ## Derived from all available accounts
     dbBackend*: ChainDB               ## Low level DB driver access (if any)
-    ticker*: TickerRef                ## Ticker, logger
     pivotTable*: SnapPivotTable       ## Per state root environment
-    pivotCount*: uint64               ## Total of all created tab entries
-    pivotEnv*: SnapPivotRef           ## Environment containing state root
-    prevEnv*: SnapPivotRef            ## Previous state root environment
-    pivotData*: RootRef               ## Opaque object reference for sub-module
-    accountRangeMax*: UInt256         ## Maximal length, high(u256)/#peers
-    accountsDb*: AccountsDbRef        ## Proof processing for accounts
-    runPoolHook*: BuddyPoolHookFn     ## Callback for `runPool()`
-    when snapAccountsDumpEnable:
-      proofDumpOk*: bool
-      proofDumpFile*: File
-      proofDumpInx*: int
+    pivotFinderCtx*: RootRef          ## Opaque object reference for sub-module
+    snapDb*: SnapDbRef                ## Accounts snapshot DB
+    coveredAccounts*: LeafRangeSet    ## Derived from all available accounts
+
+    # Info
+    ticker*: TickerRef                ## Ticker, logger

   SnapBuddyRef* = BuddyRef[CtxData,BuddyData]
     ## Extended worker peer descriptor

@@ -152,16 +151,22 @@ type
   SnapCtxRef* = CtxRef[CtxData]
     ## Extended global descriptor

+static:
+  doAssert healAccountsTrigger < 1.0 # larger values make no sense
+  doAssert backPivotBlockDistance <= backPivotBlockThreshold
+
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------

-proc inc(stat: var BuddyStat) {.borrow.}
-
 proc hash*(a: SnapSlotQueueItemRef): Hash =
   ## Table/KeyedQueue mixin
   cast[pointer](a).hash

+proc hash*(a: Hash256): Hash =
+  ## Table/KeyedQueue mixin
+  a.data.hash
+
 # ------------------------------------------------------------------------------
 # Public functions, debugging helpers (will go away eventually)
 # ------------------------------------------------------------------------------
diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim
index fa3ec7578..4616a276e 100644
--- a/tests/test_sync_snap.nim
+++ b/tests/test_sync_snap.nim
@@ -25,8 +25,8 @@ import
   ../nimbus/p2p/chain,
   ../nimbus/sync/types,
   ../nimbus/sync/snap/range_desc,
-  ../nimbus/sync/snap/worker/[accounts_db, db/hexary_desc,
-    db/hexary_inspect, db/rocky_bulk_load],
+  ../nimbus/sync/snap/worker/db/[hexary_desc, hexary_inspect,
+    rocky_bulk_load, snap_db,],
   ../nimbus/utils/prettify,
   ./replay/[pp, undump_blocks, undump_accounts, undump_storages],
   ./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types]
@@ -117,7 +117,7 @@ proc pp(rc: Result[Hash256,HexaryDbError]): string =

 proc pp(
     rc: Result[TrieNodeStat,HexaryDbError];
-    db: AccountsDbSessionRef
+    db: SnapDbSessionRef
      ): string =
   if rc.isErr: $rc.error
   else: rc.value.pp(db.getAcc)
@@ -284,21 +284,21 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =

   suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":
     var
-      desc: AccountsDbSessionRef
+      desc: SnapDbSessionRef
       accKeys: seq[Hash256]

     test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
       let
-        dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
-                 else: AccountsDbRef.init(newMemoryDB())
-        dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
+        dbBase = if persistent: SnapDbRef.init(db.cdb[0])
+                 else: SnapDbRef.init(newMemoryDB())
+        dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
       for n,w in accountsList:
         check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb

     test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
-      let dbBase = if persistent: AccountsDbRef.init(db.cdb[1])
-                   else: AccountsDbRef.init(newMemoryDB())
-      desc = AccountsDbSessionRef.init(dbBase, root, peer)
+      let dbBase = if persistent: SnapDbRef.init(db.cdb[1])
+                   else: SnapDbRef.init(newMemoryDB())
+      desc = SnapDbSessionRef.init(dbBase, root, peer)

       # Load/accumulate data from several samples (needs some particular sort)
       let
@@ -417,19 +417,19 @@ proc storagesRunner(

   suite &"SyncSnap: {fileInfo} accounts storage for {info}":
     let
-      dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
-               else: AccountsDbRef.init(newMemoryDB())
+      dbBase = if persistent: SnapDbRef.init(db.cdb[0])
+               else: SnapDbRef.init(newMemoryDB())
     var
-      desc = AccountsDbSessionRef.init(dbBase, root, peer)
+      desc = SnapDbSessionRef.init(dbBase, root, peer)

     test &"Merging {accountsList.len} accounts for state root ..{root.pp}":
       for w in accountsList:
-        let desc = AccountsDbSessionRef.init(dbBase, root, peer)
+        let desc = SnapDbSessionRef.init(dbBase, root, peer)
         check desc.importAccounts(w.base, w.data, persistent) == OkHexDb

     test &"Merging {storagesList.len} storages lists":
       let
-        dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
+        dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
         ignore = knownFailures.toTable
       for n,w in storagesList:
         let
@@ -466,18 +466,18 @@ proc inspectionRunner(

   suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
     let
-      memBase = AccountsDbRef.init(newMemoryDB())
-      memDesc = AccountsDbSessionRef.init(memBase, Hash256(), peer)
+      memBase = SnapDbRef.init(newMemoryDB())
+      memDesc = SnapDbSessionRef.init(memBase, Hash256(), peer)
     var
       singleStats: seq[(int,TrieNodeStat)]
       accuStats: seq[(int,TrieNodeStat)]
-      perBase,altBase: AccountsDbRef
-      perDesc,altDesc: AccountsDbSessionRef
+      perBase,altBase: SnapDbRef
+      perDesc,altDesc: SnapDbSessionRef
     if persistent:
-      perBase = AccountsDbRef.init(db.cdb[0])
-      perDesc = AccountsDbSessionRef.init(perBase, Hash256(), peer)
-      altBase = AccountsDbRef.init(db.cdb[1])
-      altDesc = AccountsDbSessionRef.init(altBase, Hash256(), peer)
+      perBase = SnapDbRef.init(db.cdb[0])
+      perDesc = SnapDbSessionRef.init(perBase, Hash256(), peer)
+      altBase = SnapDbRef.init(db.cdb[1])
+      altDesc = SnapDbSessionRef.init(altBase, Hash256(), peer)

     test &"Fingerprinting {inspectList.len} single accounts lists " &
         "for in-memory-db":
@@ -486,7 +486,7 @@ proc inspectionRunner(
         let
           root = accList[0].root
          rootKey = root.to(NodeKey)
-          desc = AccountsDbSessionRef.init(memBase, root, peer)
+          desc = SnapDbSessionRef.init(memBase, root, peer)
        for w in accList:
          check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
        let rc = desc.inspectAccountsTrie(persistent=false)
@@ -510,8 +510,8 @@ proc inspectionRunner(
        let
          root = accList[0].root
          rootKey = root.to(NodeKey)
-          dbBase = AccountsDbRef.init(db.cdb[2+n])
-          desc = AccountsDbSessionRef.init(dbBase, root, peer)
+          dbBase = SnapDbRef.init(db.cdb[2+n])
+          desc = SnapDbSessionRef.init(dbBase, root, peer)
        for w in accList:
          check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
        let rc = desc.inspectAccountsTrie(persistent=false)
@@ -572,8 +572,8 @@ proc inspectionRunner(
          skip()
        else:
          let
-            cscBase = AccountsDbRef.init(newMemoryDB())
-            cscDesc = AccountsDbSessionRef.init(cscBase, Hash256(), peer)
+            cscBase = SnapDbRef.init(newMemoryDB())
+            cscDesc = SnapDbSessionRef.init(cscBase, Hash256(), peer)
          var
            cscStep: Table[NodeKey,(int,seq[Blob])]
          for n,accList in inspectList:
@@ -588,7 +588,7 @@ proc inspectionRunner(
            cscStep[rootKey][0].inc
          let
            r0 = desc.inspectAccountsTrie(persistent=false)
-            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],false)
+            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=false)
          check rc.isOk
          let
            accumulated = r0.value.dangling.toHashSet
@@ -620,7 +620,7 @@ proc inspectionRunner(
            cscStep[rootKey][0].inc
          let
            r0 = desc.inspectAccountsTrie(persistent=true)
-            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],true)
+            rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=true)
          check rc.isOk
          let
            accumulated = r0.value.dangling.toHashSet
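Taken together, the renamed test hunks boil down to one session-setup pattern. The fragment below only collects those calls in one place as a reading aid — it is not runnable on its own and assumes the fixtures of `tests/test_sync_snap.nim` (`db`, `root`, `peer`, `persistent`, `accountsList`, `OkHexDb`, unittest2's `check`):

```nim
# Assumes the surroundings of tests/test_sync_snap.nim; see the note above.
let
  dbBase = if persistent: SnapDbRef.init(db.cdb[0])
           else: SnapDbRef.init(newMemoryDB())
  desc = SnapDbSessionRef.init(dbBase, root, peer)

for w in accountsList:
  check desc.importAccounts(w.base, w.data, persistent) == OkHexDb

# The named `persistent=` argument keeps the boolean flag readable at the
# call site, which is what the last two hunks change.
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
```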