Prep for full sync after snap (#1253)

* Split fetch accounts into sub-modules

details:
  There will be separate modules for the accounts snapshot, the storage
  snapshot, and healing for either.

* Allow rebasing the pivot to before the negotiated header

why:
  Peers do not seem to keep many snapshots available. By setting the pivot
  block header back slightly, the chances of finding more peers able to serve
  this pivot might be higher. A mainnet experiment showed that setting it back
  too far (tested with 1024 blocks) appeared to decrease the chances of
  finding matching snapshot peers.
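
  A minimal sketch of the back-off idea, assuming a hypothetical distance
  constant (the constant name and value below are illustrative only, not the
  project's actual settings):

```nim
# Minimal sketch: use a pivot a few blocks behind the negotiated header so
# that more peers are likely to still hold a snapshot for that state root.
# `backDistance` is an assumed illustrative value, not the project constant.
const backDistance = 16'u64

proc rebasedPivot(negotiated: uint64): uint64 =
  ## Block number actually used as snapshot pivot.
  if backDistance < negotiated:
    negotiated - backDistance
  else:
    negotiated

when isMainModule:
  doAssert rebasedPivot(15_000_000) == 15_000_000 - backDistance
  doAssert rebasedPivot(3) == 3   # never step back below genesis
```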

* Add accounts healing

* Update variable/field naming in `worker_desc` for readability

* Handle leaf nodes in accounts healing

why:
  There is no need to fetch accounts that have already been added by the
  healing process. On the flip side, these accounts must be checked for
  storage data and the batch queue updated accordingly.

* Reorganise the accounts hash ranges batch queue

why:
  The aim is to formally cover as many accounts as possible across different
  pivot state root environments. Formerly, this was attempted by starting the
  accounts batch queue at a random value for each pivot (and wrapping
  around).

  Now, each pivot environment starts with an interval set that is mutually
  disjoint from any interval set retrieved with other pivot state roots
  (see the sketch below, after the note).

also:
  Stop fishing for more pivots in `worker` if 100% download is reached
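
  A simplified sketch of the disjoint-split idea, using a toy closed-interval
  representation instead of the real `stew/interval_set` API (all names below
  are illustrative placeholders):

```nim
# Sketch: split the full hash range into two mutually disjoint batch sets,
# based on the ranges that earlier pivot environments have already covered.
type Iv = tuple[lo, hi: uint64]     # toy stand-in for a NodeTag interval

proc splitBatch(covered: seq[Iv]; top: uint64): tuple[fresh, seen: seq[Iv]] =
  ## `covered` is assumed sorted and non-overlapping; `top` is the largest
  ## possible hash value. Returns uncovered ranges and already covered ones.
  var nextLo = 0'u64
  for iv in covered:
    if nextLo < iv.lo:
      result.fresh.add((lo: nextLo, hi: iv.lo - 1))
    nextLo = iv.hi + 1
  if nextLo <= top:
    result.fresh.add((lo: nextLo, hi: top))
  result.seen = covered

when isMainModule:
  let (fresh, seen) = splitBatch(@[(lo: 10'u64, hi: 19'u64)], top = 99)
  doAssert fresh == @[(lo: 0'u64, hi: 9'u64), (lo: 20'u64, hi: 99'u64)]
  doAssert seen == @[(lo: 10'u64, hi: 19'u64)]
```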

* Reorganise/update accounts healing

why:
  Error handling was wrong, and the (mathematical complexity of the) whole
  process could be better managed.

details:
  Much of the algorithm is now documented at the top of the file
  `heal_accounts.nim`
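
  Roughly (as suggested by the trie inspection and raw-node import changes
  further down in this commit), healing boils down to the cycle sketched
  below. All names are hypothetical placeholders, not the real
  `heal_accounts.nim` API:

```nim
# Hypothetical outline of one healing cycle: inspect the partially downloaded
# account trie for missing (dangling) node references, fetch and import those
# nodes, and repeat until the inspection reports nothing missing.
type NodeRef = seq[byte]            # stand-in for a trie node reference/path

proc inspectDangling(): seq[NodeRef] =
  ## Placeholder for trie inspection (cf. `hexaryInspectTrie` below); returns
  ## references to nodes missing from the local database.
  @[]

proc fetchAndImport(missing: seq[NodeRef]) =
  ## Placeholder for a `snap/1` trie-node request plus database import.
  discard

proc healSketch() =
  while true:
    let missing = inspectDangling()
    if missing.len == 0:
      break                         # trie complete for this pivot
    fetchAndImport(missing)
```
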
Jordan Hrycaj 2022-10-08 18:20:50 +01:00 committed by GitHub
parent 5b4643b8ff
commit d53eacb854
27 changed files with 1588 additions and 1116 deletions

View File

@ -1,12 +1,5 @@
newBlockchainTests
===
## bcArrowGlacierToMerge
```diff
+ difficultyFormula.json OK
+ powToPosBlockRejection.json OK
+ powToPosTest.json OK
```
OK: 3/3 Fail: 0/3 Skip: 0/3
## bcBerlinToLondon
```diff
+ BerlinToLondonTransition.json OK
@ -36,7 +29,6 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
+ baseFee.json OK
+ besuBaseFeeBug.json OK
+ burnVerify.json OK
+ burnVerifyLondon.json OK
+ checkGasLimit.json OK
+ feeCap.json OK
+ gasLimit20m.json OK
@ -48,29 +40,21 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
+ lowDemand.json OK
+ medDemand.json OK
+ tips.json OK
+ tipsLondon.json OK
+ transFail.json OK
+ transType.json OK
+ valCausesOOF.json OK
```
OK: 21/21 Fail: 0/21 Skip: 0/21
OK: 19/19 Fail: 0/19 Skip: 0/19
## bcEIP158ToByzantium
```diff
+ ByzantiumTransition.json OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## bcEIP3675
```diff
+ timestampPerBlock.json OK
+ tipInsideBlock.json OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## bcExample
```diff
+ basefeeExample.json OK
+ mergeExample.json OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 1/1 Fail: 0/1 Skip: 0/1
## bcExploitTest
```diff
DelegateCallSpam.json Skip
@ -282,7 +266,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96
+ RefundOverflow.json OK
+ RefundOverflow2.json OK
+ SuicidesMixingCoinbase.json OK
+ SuicidesMixingCoinbase2.json OK
+ TransactionFromCoinbaseHittingBlockGasLimit1.json OK
+ TransactionFromCoinbaseNotEnoughFounds.json OK
+ TransactionNonceCheck.json OK
@ -316,7 +299,6 @@ OK: 96/96 Fail: 0/96 Skip: 0/96
+ extcodehashEmptySuicide.json OK
+ logRevert.json OK
+ multimpleBalanceInstruction.json OK
+ random.json OK
+ randomStatetest123.json OK
+ randomStatetest136.json OK
+ randomStatetest160.json OK
@ -362,7 +344,7 @@ OK: 96/96 Fail: 0/96 Skip: 0/96
+ transactionFromSelfDestructedContract.json OK
+ txCost-sec73.json OK
```
OK: 89/90 Fail: 0/90 Skip: 1/90
OK: 87/88 Fail: 0/88 Skip: 1/88
## bcTotalDifficultyTest
```diff
+ lotsOfBranchesOverrideAtTheEnd.json OK
@ -423,7 +405,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
## bcUncleTest
```diff
+ EqualUncleInTwoDifferentBlocks.json OK
+ EqualUncleInTwoDifferentBlocks2.json OK
+ InChainUncle.json OK
+ InChainUncleFather.json OK
+ InChainUncleGrandPa.json OK
@ -446,7 +427,7 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ uncleHeaderWithGeneration0.json OK
+ uncleWithSameBlockNumber.json OK
```
OK: 23/23 Fail: 0/23 Skip: 0/23
OK: 22/22 Fail: 0/22 Skip: 0/22
## bcValidBlockTest
```diff
+ ExtraData32.json OK
@ -749,9 +730,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5
+ callcodecallcodecallcode_111_SuicideEnd.json OK
+ callcodecallcodecallcode_111_SuicideMiddle.json OK
callcodecallcodecallcode_ABCB_RECURSIVE.json Skip
+ touchAndGo.json OK
```
OK: 73/80 Fail: 0/80 Skip: 7/80
OK: 72/79 Fail: 0/79 Skip: 7/79
## stCallCreateCallCodeTest
```diff
Call1024BalanceTooLow.json Skip
@ -1026,7 +1006,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52
+ CREATE_HighNonceMinus1.json OK
+ CREATE_empty000CreateinInitCode_Transaction.json OK
+ CodeInConstructor.json OK
+ CreateAddressWarmAfterFail.json OK
+ CreateCollisionResults.json OK
+ CreateCollisionToEmpty.json OK
+ CreateOOGFromCallRefunds.json OK
@ -1040,14 +1019,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52
+ CreateOOGafterInitCodeRevert2.json OK
+ CreateOOGafterMaxCodesize.json OK
+ CreateResults.json OK
+ CreateTransactionHighNonce.json OK
+ TransactionCollisionToEmpty.json OK
+ TransactionCollisionToEmptyButCode.json OK
+ TransactionCollisionToEmptyButNonce.json OK
+ createFailResult.json OK
+ createLargeResult.json OK
```
OK: 44/44 Fail: 0/44 Skip: 0/44
OK: 41/41 Fail: 0/41 Skip: 0/41
## stDelegatecallTestHomestead
```diff
Call1024BalanceTooLow.json Skip
@ -1151,13 +1128,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40
+ lowGasPriceOldTypes.json OK
+ outOfFunds.json OK
+ outOfFundsOldTypes.json OK
+ senderBalance.json OK
+ tipTooHigh.json OK
+ transactionIntinsicBug.json OK
+ typeTwoBerlin.json OK
+ valCausesOOF.json OK
```
OK: 13/13 Fail: 0/13 Skip: 0/13
OK: 12/12 Fail: 0/12 Skip: 0/12
## stEIP158Specific
```diff
+ CALL_OneVCallSuicide.json OK
@ -1199,12 +1175,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5
+ indexesOmitExample.json OK
+ invalidTr.json OK
+ labelsExample.json OK
+ mergeTest.json OK
+ rangesExample.json OK
+ solidityExample.json OK
+ yulExample.json OK
```
OK: 12/12 Fail: 0/12 Skip: 0/12
OK: 11/11 Fail: 0/11 Skip: 0/11
## stExtCodeHash
```diff
+ callToNonExistent.json OK
@ -1482,7 +1457,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
## stPreCompiledContracts
```diff
+ blake2B.json OK
+ delegatecall09Undefined.json OK
+ idPrecomps.json OK
+ identity_to_bigger.json OK
+ identity_to_smaller.json OK
@ -1491,7 +1465,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
+ precompsEIP2929.json OK
+ sec80.json OK
```
OK: 9/9 Fail: 0/9 Skip: 0/9
OK: 8/8 Fail: 0/8 Skip: 0/8
## stPreCompiledContracts2
```diff
+ CALLBlake2f.json OK
@ -1559,7 +1533,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ CallEcrecoverS_prefixed0.json OK
+ CallEcrecoverUnrecoverableKey.json OK
+ CallEcrecoverV_prefixed0.json OK
+ CallEcrecover_Overflow.json OK
+ CallIdentitiy_0.json OK
+ CallIdentitiy_1.json OK
+ CallIdentity_1_nonzeroValue.json OK
@ -1589,15 +1562,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ CallSha256_4.json OK
+ CallSha256_4_gas99.json OK
+ CallSha256_5.json OK
+ ecrecoverShortBuff.json OK
+ ecrecoverWeirdV.json OK
+ modexpRandomInput.json OK
+ modexp_0_0_0_20500.json OK
+ modexp_0_0_0_22000.json OK
+ modexp_0_0_0_25000.json OK
+ modexp_0_0_0_35000.json OK
```
OK: 102/102 Fail: 0/102 Skip: 0/102
OK: 99/99 Fail: 0/99 Skip: 0/99
## stQuadraticComplexityTest
```diff
Call1MB1024Calldepth.json Skip
@ -2801,7 +2772,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13
+ currentAccountBalance.json OK
+ doubleSelfdestructTest.json OK
+ doubleSelfdestructTest2.json OK
+ doubleSelfdestructTouch.json OK
+ extcodecopy.json OK
+ return0.json OK
+ return1.json OK
@ -2816,7 +2786,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13
+ suicideSendEtherToMe.json OK
+ testRandomTest.json OK
```
OK: 57/67 Fail: 0/67 Skip: 10/67
OK: 56/66 Fail: 0/66 Skip: 10/66
## stTimeConsuming
```diff
CALLBlake2f_MaxRounds.json Skip
@ -3336,4 +3306,4 @@ OK: 0/3 Fail: 0/3 Skip: 3/3
OK: 11/11 Fail: 0/11 Skip: 0/11
---TOTAL---
OK: 2881/2986 Fail: 0/2986 Skip: 105/2986
OK: 2859/2964 Fail: 0/2964 Skip: 105/2964

View File

@ -271,9 +271,8 @@ OK: 5/5 Fail: 0/5 Skip: 0/5
+ callcodecallcodecallcode_111_SuicideEnd.json OK
+ callcodecallcodecallcode_111_SuicideMiddle.json OK
callcodecallcodecallcode_ABCB_RECURSIVE.json Skip
+ touchAndGo.json OK
```
OK: 73/80 Fail: 0/80 Skip: 7/80
OK: 72/79 Fail: 0/79 Skip: 7/79
## stCallCreateCallCodeTest
```diff
Call1024BalanceTooLow.json Skip
@ -548,7 +547,6 @@ OK: 51/52 Fail: 0/52 Skip: 1/52
+ CREATE_HighNonceMinus1.json OK
+ CREATE_empty000CreateinInitCode_Transaction.json OK
+ CodeInConstructor.json OK
+ CreateAddressWarmAfterFail.json OK
+ CreateCollisionResults.json OK
+ CreateCollisionToEmpty.json OK
+ CreateOOGFromCallRefunds.json OK
@ -562,14 +560,12 @@ OK: 51/52 Fail: 0/52 Skip: 1/52
+ CreateOOGafterInitCodeRevert2.json OK
+ CreateOOGafterMaxCodesize.json OK
+ CreateResults.json OK
+ CreateTransactionHighNonce.json OK
+ TransactionCollisionToEmpty.json OK
+ TransactionCollisionToEmptyButCode.json OK
+ TransactionCollisionToEmptyButNonce.json OK
+ createFailResult.json OK
+ createLargeResult.json OK
```
OK: 44/44 Fail: 0/44 Skip: 0/44
OK: 41/41 Fail: 0/41 Skip: 0/41
## stDelegatecallTestHomestead
```diff
Call1024BalanceTooLow.json Skip
@ -673,13 +669,12 @@ OK: 40/40 Fail: 0/40 Skip: 0/40
+ lowGasPriceOldTypes.json OK
+ outOfFunds.json OK
+ outOfFundsOldTypes.json OK
+ senderBalance.json OK
+ tipTooHigh.json OK
+ transactionIntinsicBug.json OK
+ typeTwoBerlin.json OK
+ valCausesOOF.json OK
```
OK: 13/13 Fail: 0/13 Skip: 0/13
OK: 12/12 Fail: 0/12 Skip: 0/12
## stEIP158Specific
```diff
+ CALL_OneVCallSuicide.json OK
@ -721,12 +716,11 @@ OK: 5/5 Fail: 0/5 Skip: 0/5
+ indexesOmitExample.json OK
+ invalidTr.json OK
+ labelsExample.json OK
+ mergeTest.json OK
+ rangesExample.json OK
+ solidityExample.json OK
+ yulExample.json OK
```
OK: 12/12 Fail: 0/12 Skip: 0/12
OK: 11/11 Fail: 0/11 Skip: 0/11
## stExtCodeHash
```diff
+ callToNonExistent.json OK
@ -1004,7 +998,6 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
## stPreCompiledContracts
```diff
+ blake2B.json OK
+ delegatecall09Undefined.json OK
+ idPrecomps.json OK
+ identity_to_bigger.json OK
+ identity_to_smaller.json OK
@ -1013,7 +1006,7 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
+ precompsEIP2929.json OK
+ sec80.json OK
```
OK: 9/9 Fail: 0/9 Skip: 0/9
OK: 8/8 Fail: 0/8 Skip: 0/8
## stPreCompiledContracts2
```diff
+ CALLBlake2f.json OK
@ -1081,7 +1074,6 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ CallEcrecoverS_prefixed0.json OK
+ CallEcrecoverUnrecoverableKey.json OK
+ CallEcrecoverV_prefixed0.json OK
+ CallEcrecover_Overflow.json OK
+ CallIdentitiy_0.json OK
+ CallIdentitiy_1.json OK
+ CallIdentity_1_nonzeroValue.json OK
@ -1111,15 +1103,13 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ CallSha256_4.json OK
+ CallSha256_4_gas99.json OK
+ CallSha256_5.json OK
+ ecrecoverShortBuff.json OK
+ ecrecoverWeirdV.json OK
+ modexpRandomInput.json OK
+ modexp_0_0_0_20500.json OK
+ modexp_0_0_0_22000.json OK
+ modexp_0_0_0_25000.json OK
+ modexp_0_0_0_35000.json OK
```
OK: 102/102 Fail: 0/102 Skip: 0/102
OK: 99/99 Fail: 0/99 Skip: 0/99
## stQuadraticComplexityTest
```diff
Call1MB1024Calldepth.json Skip
@ -2323,7 +2313,6 @@ OK: 13/13 Fail: 0/13 Skip: 0/13
+ currentAccountBalance.json OK
+ doubleSelfdestructTest.json OK
+ doubleSelfdestructTest2.json OK
+ doubleSelfdestructTouch.json OK
+ extcodecopy.json OK
+ return0.json OK
+ return1.json OK
@ -2338,7 +2327,7 @@ OK: 13/13 Fail: 0/13 Skip: 0/13
+ suicideSendEtherToMe.json OK
+ testRandomTest.json OK
```
OK: 57/67 Fail: 0/67 Skip: 10/67
OK: 56/66 Fail: 0/66 Skip: 10/66
## stTimeConsuming
```diff
CALLBlake2f_MaxRounds.json Skip
@ -2858,4 +2847,4 @@ OK: 1/3 Fail: 0/3 Skip: 2/3
OK: 11/11 Fail: 0/11 Skip: 0/11
---TOTAL---
OK: 2506/2608 Fail: 0/2608 Skip: 102/2608
OK: 2495/2597 Fail: 0/2597 Skip: 102/2597

View File

@ -240,11 +240,11 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
buddy.ctrl.zombie = true
# Initialise/re-initialise this worker
elif await buddy.data.pivot.bestPivotNegotiate(buddy.data.bestNumber):
elif await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber):
buddy.ctrl.multiOk = true
# Update/activate `bestNumber` for local use
buddy.data.bestNumber =
some(buddy.data.pivot.bestPivotHeader.value.blockNumber)
some(buddy.data.pivot.pivotHeader.value.blockNumber)
elif not buddy.ctrl.stopped:
await sleepAsync(2.seconds)

View File

@ -242,7 +242,7 @@ proc clear*(bp: BestPivotWorkerRef) =
# Public functions
# ------------------------------------------------------------------------------
proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
## Returns cached block header if available and the buddy `peer` is trusted.
if bp.header.isSome and
bp.peer notin bp.global.untrusted and
@ -251,10 +251,11 @@ proc bestPivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
return ok(bp.header.unsafeGet)
err()
proc bestPivotNegotiate*(
bp: BestPivotWorkerRef; ## Worker peer
proc pivotNegotiate*(
bp: BestPivotWorkerRef; ## Worker peer
minBlockNumber: Option[BlockNumber]; ## Minimum block number to expect
): Future[bool] {.async.} =
): Future[bool]
{.async.} =
## Negotiate best header pivot. This function must be run in *single mode* at
## the beginning of a running worker peer. If the function returns `true`,
## the current `buddy` can be used for syncing and the function

View File

@ -13,21 +13,21 @@
##
## Worker items state diagram and sketch of sync algorithm:
## ::
## set of unprocessed | peer workers | list of work items ready
## block ranges | | for persistent database
## ==================================================================
## unprocessed | | ready for | store into
## block ranges | peer workers | persistent database | database
## =======================================================================
##
## +---------------------------------------------+
## | |
## | +---------------------------------+ |
## | | | |
## V v | |
## <unprocessed> ---+-----> <worker-0> -----+---> <staged> ---> OUTPUT
## | |
## +-----> <worker-1> -----+
## | |
## +-----> <worker-2> -----+
## : :
## +------------------------------------------+
## | |
## | +----------------------------+ |
## | | | |
## V v | |
## <unprocessed> ---+---> <worker-0> ---+-----> <staged> -------> OUTPUT
## | |
## +---> <worker-1> ---+
## | |
## +---> <worker-2> ---+
## : :
##
## A work item is created from a range of block numbers extracted from the
## `<unprocessed>` set of block ranges.

View File

@ -122,16 +122,16 @@ type
## Statistics counters for events associated with this peer.
## These may be used to recognise errors and select good peers.
ok: tuple[
reorgDetected: BuddyStat,
getBlockHeaders: BuddyStat,
getNodeData: BuddyStat]
reorgDetected: uint,
getBlockHeaders: uint,
getNodeData: uint]
minor: tuple[
timeoutBlockHeaders: BuddyStat,
unexpectedBlockHash: BuddyStat]
timeoutBlockHeaders: uint,
unexpectedBlockHash: uint]
major: tuple[
networkErrors: BuddyStat,
excessBlockHeaders: BuddyStat,
wrongBlockHeader: BuddyStat]
networkErrors: uint,
excessBlockHeaders: uint,
wrongBlockHeader: uint]
SnapPivotCtxRef* = ref object of RootRef
stats*: SnapWorkerStats ## Statistics counters
@ -550,14 +550,19 @@ proc init*(
# Public functions
# ------------------------------------------------------------------------------
proc snapPivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] =
proc pivotHeader*(sp: SnapPivotWorkerRef): Result[BlockHeader,void] =
## Returns cached block header if available
if sp.header.isSome:
ok(sp.header.unsafeGet)
else:
err()
let header = sp.header.unsafeGet
if header.blockNumber != 0:
return ok(header)
err()
proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} =
proc pivotNegotiate*(
sp: SnapPivotWorkerRef;
ign: Option[BlockNumber]; ## Minimum block number to expect,ignored for now
): Future[bool]
{.async.} =
## Query a peer to update our knowledge of its canonical chain and its best
## block, which is its canonical chain head. This can be called at any time
## after a peer has negotiated the connection.
@ -588,13 +593,13 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} =
inc sp.global.stats.major.networkErrors
# Just try another peer
sp.ctrl.zombie = true
return
return false
if reply.isNone:
trace trEthRecvTimeoutWaiting & "for GetBlockHeaders reply", peer
# TODO: Should disconnect?
inc sp.global.stats.minor.timeoutBlockHeaders
return
return false
let nHeaders = reply.get.headers.len
if nHeaders == 0:
@ -611,7 +616,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} =
peer, got=nHeaders, requested=request.maxResults
# TODO: Should disconnect.
inc sp.global.stats.major.excessBlockHeaders
return
return false
if 0 < nHeaders:
# TODO: Check this is not copying the `headers`.
@ -620,6 +625,7 @@ proc snapPivotNegotiate*(sp: SnapPivotWorkerRef) {.async.} =
sp.peerSyncChainEmptyReply(request)
trace "Done pivotExec()", peer
return sp.header.isSome
# ------------------------------------------------------------------------------
# Debugging

View File

@ -1,5 +1,4 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@ -162,25 +161,46 @@ proc digestTo*(data: Blob; T: type NodeTag): T =
## Hash the `data` argument
keccakHash(data).to(T)
proc freeFactor*(lrs: LeafRangeSet): float =
## Free factor, ie. `#items-free / 2^256` to be used in statistics
proc emptyFactor*(lrs: LeafRangeSet): float =
## Relative uncovered total, i.e. `#points-not-covered / 2^256` to be used
## in statistics or triggers.
if 0 < lrs.total:
((high(NodeTag) - lrs.total).u256 + 1).to(float) / (2.0^256)
elif lrs.chunks == 0:
1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)`
else:
0.0
0.0 # number of points in `lrs` is `2^256 + 1`
proc emptyFactor*(lrs: openArray[LeafRangeSet]): float =
## Variant of `emptyFactor()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: Nodetag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
return 0.0
accu = accu + ivSet.total
elif ivSet.chunks == 0:
discard
else: # number of points in `ivSet` is `2^256 + 1`
return 0.0
((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256)
proc fullFactor*(lrs: LeafRangeSet): float =
## Free factor, ie. `#items-contained / 2^256` to be used in statistics
## Relative covered total, i.e. `#points-covered / 2^256` to be used
## in statistics or triggers
if 0 < lrs.total:
lrs.total.u256.to(float) / (2.0^256)
elif lrs.chunks == 0:
0.0
0.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)`
else:
1.0 # `total` represents the residue class `mod 2^256` from `0`..`(2^256-1)`
1.0 # number of points in `lrs` is `2^256 + 1`
# Printing & pretty printing
proc `$`*(nt: NodeTag): string =
if nt == high(NodeTag):
"high(NodeTag)"

View File

@ -1,5 +1,4 @@
# Nimbus
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@ -17,30 +16,41 @@ import
stew/[interval_set, keyed_queue],
../../db/select_backend,
".."/[protocol, sync_desc],
./worker/[accounts_db, fetch_accounts, ticker],
./worker/[heal_accounts, store_accounts, store_storages, ticker],
./worker/com/[com_error, get_block_header],
./worker/db/snap_db,
"."/[range_desc, worker_desc]
const
usePivot2ok = false or true
when usePivot2ok:
import ../misc/best_pivot
import
../misc/best_pivot
type
PivotCtxRef = BestPivotCtxRef
PivotWorkerRef = BestPivotWorkerRef
else:
import ../misc/snap_pivot, ../../p2p/chain/chain_desc
import
../../p2p/chain/chain_desc,
../misc/snap_pivot
type
PivotCtxRef = SnapPivotCtxRef
PivotWorkerRef = SnapPivotWorkerRef
{.push raises: [Defect].}
logScope:
topics = "snap-sync"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc hash(h: Hash256): Hash =
## Mixin for `Table` or `keyedQueue`
h.data.hash
proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
if 0 < length:
result[0] = sum / length.float
@ -57,29 +67,24 @@ template noExceptionOops(info: static[string]; code: untyped) =
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private helpers: integration of pivot negotiator
# Private helpers: integration of pivot finder
# ------------------------------------------------------------------------------
when usePivot2ok:
type
PivotCtxRef = BestPivotCtxRef
PivotWorkerRef = BestPivotWorkerRef
else:
type
PivotCtxRef = SnapPivotCtxRef
PivotWorkerRef = SnapPivotWorkerRef
proc pivot(ctx: SnapCtxRef): PivotCtxRef =
ctx.data.pivotData.PivotCtxRef
# Getter
ctx.data.pivotFinderCtx.PivotCtxRef
proc `pivot=`(ctx: SnapCtxRef; val: PivotCtxRef) =
ctx.data.pivotData = val
# Setter
ctx.data.pivotFinderCtx = val
proc pivot(buddy: SnapBuddyRef): PivotWorkerRef =
buddy.data.workerPivot.PivotWorkerRef
# Getter
buddy.data.pivotFinder.PivotWorkerRef
proc `pivot=`(buddy: SnapBuddyRef; val: PivotWorkerRef) =
buddy.data.workerPivot = val
# Setter
buddy.data.pivotFinder = val
# --------------------
@ -92,104 +97,139 @@ proc pivotSetup(ctx: SnapCtxRef) =
proc pivotRelease(ctx: SnapCtxRef) =
ctx.pivot = nil
proc pivotStart(buddy: SnapBuddyRef) =
buddy.pivot = PivotWorkerRef.init(buddy.ctx.pivot, buddy.ctrl, buddy.peer)
proc pivotStop(buddy: SnapBuddyRef) =
buddy.pivot.clear()
# --------------------
proc pivotHeader(buddy: SnapBuddyRef): Result[BlockHeader,void] =
when usePivot2ok:
buddy.pivot.bestPivotHeader()
else:
buddy.pivot.snapPivotHeader()
when usePivot2ok:
proc envPivotNumber(buddy: SnapBuddyRef): Option[BlockNumber] =
let env = buddy.ctx.data.pivotEnv
if env.isNil:
none(BlockNumber)
else:
some(env.stateHeader.blockNumber)
template pivotNegotiate(buddy: SnapBuddyRef; n: Option[BlockNumber]): auto =
buddy.pivot.bestPivotNegotiate(n)
else:
template pivotNegotiate(buddy: SnapBuddyRef): auto =
buddy.pivot.snapPivotNegotiate()
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc rndNodeTag(buddy: SnapBuddyRef): NodeTag =
## Create random node tag
var data: array[32,byte]
buddy.ctx.data.rng[].generate(data)
UInt256.fromBytesBE(data).NodeTag
proc init(T: type SnapAccountRanges; ctx: SnapCtxRef): T =
## Return a pair of account hash range lists with the whole range of
## smartly spread `[low(NodeTag),high(NodeTag)]` across the mutually
## disjunct interval sets.
result = [LeafRangeSet.init(),LeafRangeSet.init()]
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
# range sets.
if ctx.data.coveredAccounts.total == 0 and
ctx.data.coveredAccounts.chunks == 1:
# 100% of accounts covered by range fetch batches for the total
# of pivot environments. Do a random split distributing the range
# `[low(NodeTag),high(NodeTag)]` across the pair of range sats.
var nodeKey: NodeKey
ctx.data.rng[].generate(nodeKey.ByteArray32)
let partition = nodeKey.to(NodeTag)
discard result[0].merge(partition, high(NodeTag))
if low(NodeTag) < partition:
discard result[1].merge(low(NodeTag), partition - 1.u256)
else:
# Not all account hashes are covered, yet. So keep the uncovered
# account hashes in the first range set, and the other account hashes
# in the second range set.
# Pre-filled with the first range set with largest possible interval
discard result[0].merge(low(NodeTag),high(NodeTag))
# Move covered account ranges (aka intervals) to the second set.
for iv in ctx.data.coveredAccounts.increasing:
discard result[0].reduce(iv)
discard result[1].merge(iv)
proc setPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
## Activate environment for state root implied by `header` argument
proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
## Activate environment for state root implied by `header` argument. This
## function appends a new environment unless there was any not far enough
## apart.
##
## Note that this function relies on a queue sorted by the block numbers of
## the pivot header. To maintain the sort order, the function `lruFetch()`
## must not be called and only records appended with increasing block
## numbers.
let
ctx = buddy.ctx
key = header.stateRoot
rc = ctx.data.pivotTable.lruFetch(key)
if rc.isOk:
ctx.data.pivotEnv = rc.value
return
minNumber = block:
let rc = ctx.data.pivotTable.lastValue
if rc.isOk: rc.value.stateHeader.blockNumber + minPivotBlockDistance
else: 1.toBlockNumber
let env = SnapPivotRef(
stateHeader: header,
pivotAccount: buddy.rndNodeTag,
availAccounts: LeafRangeSet.init())
# Pre-filled with the largest possible interval
discard env.availAccounts.merge(low(NodeTag),high(NodeTag))
# Check whether the new header follows minimum depth requirement. This is
# where the queue is assumed to have increasing block numbers.
if minNumber <= header.blockNumber:
# Ok, append a new environment
let env = SnapPivotRef(
stateHeader: header,
fetchAccounts: SnapAccountRanges.init(ctx))
# Statistics
ctx.data.pivotCount.inc
# Append per-state root environment to LRU queue
discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax)
# Activate per-state root environment (and hold previous one)
ctx.data.prevEnv = ctx.data.pivotEnv
ctx.data.pivotEnv = ctx.data.pivotTable.lruAppend(key, env, ctx.buddiesMax)
# Debugging, will go away
block:
let ivSet = env.fetchAccounts[0].clone
for iv in env.fetchAccounts[1].increasing:
doAssert ivSet.merge(iv) == iv.len
doAssert ivSet.chunks == 1
doAssert ivSet.total == 0
proc updatePivotEnv(buddy: SnapBuddyRef): bool =
## Update global state root environment from local `pivotHeader`. Choose the
## latest block number. Returns `true` if the environment was changed
let rc = buddy.pivotHeader
if rc.isOk:
let
peer = buddy.peer
ctx = buddy.ctx
env = ctx.data.pivotEnv
pivotHeader = rc.value
newStateNumber = pivotHeader.blockNumber
stateNumber = if env.isNil: 0.toBlockNumber
else: env.stateHeader.blockNumber
stateWindow = stateNumber + maxPivotBlockWindow
block keepCurrent:
if env.isNil:
break keepCurrent # => new pivot
if stateNumber < newStateNumber:
when switchPivotAfterCoverage < 1.0:
if env.minCoverageReachedOk:
break keepCurrent # => new pivot
if stateWindow < newStateNumber:
break keepCurrent # => new pivot
if newStateNumber <= maxPivotBlockWindow:
break keepCurrent # => new pivot
# keep current
return false
# set new block
buddy.setPivotEnv(pivotHeader)
proc updatePivotImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Helper, negotiate pivot unless present
if buddy.pivot.pivotHeader.isOk:
return true
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotTable.lastValue.get(otherwise = nil)
nMin = if env.isNil: none(BlockNumber)
else: some(env.stateHeader.blockNumber)
if await buddy.pivot.pivotNegotiate(nMin):
var header = buddy.pivot.pivotHeader.value
# Check whether there is no environment change needed
when noPivotEnvChangeIfComplete:
let rc = ctx.data.pivotTable.lastValue
if rc.isOk and rc.value.serialSync:
# No need to change
if extraTraceMessages:
trace "No need to change snap pivot", peer,
pivot=("#" & $rc.value.stateHeader.blockNumber),
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
return true
when 0 < backPivotBlockDistance:
# Backtrack, do not use the very latest pivot header
if backPivotBlockThreshold.toBlockNumber < header.blockNumber:
let
backNum = header.blockNumber - backPivotBlockDistance.toBlockNumber
rc = await buddy.getBlockHeader(backNum)
if rc.isErr:
if rc.error in {ComNoHeaderAvailable, ComTooManyHeaders}:
buddy.ctrl.zombie = true
return false
header = rc.value
buddy.appendPivotEnv(header)
trace "Snap pivot initialised", peer, pivot=("#" & $header.blockNumber),
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
return true
# Syntactic sugar
when usePivot2ok:
template updateSinglePivot(buddy: SnapBuddyRef): auto =
buddy.updatePivotImpl()
else:
template updateMultiPivot(buddy: SnapBuddyRef): auto =
buddy.updatePivotImpl()
proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
result = proc: TickerStats =
@ -206,7 +246,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
aSqSum += aLen * aLen
# Fill utilisation mean & variance
let fill = kvp.data.availAccounts.freeFactor
let fill = kvp.data.fetchAccounts.emptyFactor
uSum += fill
uSqSum += fill * fill
@ -215,77 +255,35 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
sSqSum += sLen * sLen
let
tabLen = ctx.data.pivotTable.len
pivotBlock = if ctx.data.pivotEnv.isNil: none(BlockNumber)
else: some(ctx.data.pivotEnv.stateHeader.blockNumber)
env = ctx.data.pivotTable.lastValue.get(otherwise = nil)
pivotBlock = if env.isNil: none(BlockNumber)
else: some(env.stateHeader.blockNumber)
accCoverage = ctx.data.coveredAccounts.fullFactor
accFill = meanStdDev(uSum, uSqSum, count)
when snapAccountsDumpEnable:
if snapAccountsDumpCoverageStop < accCoverage:
trace " Snap proofs dump stop",
threshold=snapAccountsDumpCoverageStop, coverage=accCoverage.toPC
ctx.data.proofDumpOk = false
TickerStats(
pivotBlock: pivotBlock,
activeQueues: tabLen,
flushedQueues: ctx.data.pivotCount.int64 - tabLen,
nQueues: ctx.data.pivotTable.len,
nAccounts: meanStdDev(aSum, aSqSum, count),
nStorage: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage))
proc havePivot(buddy: SnapBuddyRef): bool =
## ...
let rc = buddy.pivotHeader
if rc.isOk and rc.value.blockNumber != 0:
# So there is a `ctx.data.pivotEnv`
when 1.0 <= switchPivotAfterCoverage:
return true
else:
let
ctx = buddy.ctx
env = ctx.data.pivotEnv
# Force fetching new pivot if coverage reached by returning `false`
if not env.minCoverageReachedOk:
# Not sure yet, so check whether coverage has been reached at all
let cov = env.availAccounts.freeFactor
if switchPivotAfterCoverage <= cov:
trace " Snap accounts coverage reached", peer,
threshold=switchPivotAfterCoverage, coverage=cov.toPC
# Need to reset pivot handlers
buddy.ctx.poolMode = true
buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) =
b.ctx.data.pivotEnv.minCoverageReachedOk = true
b.pivotRestart
return true
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
ctx.data.accountRangeMax = high(UInt256) div ctx.buddiesMax.u256
ctx.data.coveredAccounts = LeafRangeSet.init()
ctx.data.accountsDb =
if ctx.data.dbBackend.isNil: AccountsDbRef.init(ctx.chain.getTrieDB)
else: AccountsDbRef.init(ctx.data.dbBackend)
ctx.data.snapDb =
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB)
else: SnapDbRef.init(ctx.data.dbBackend)
ctx.pivotSetup()
if tickerOK:
ctx.data.ticker = TickerRef.init(ctx.tickerUpdate)
else:
trace "Ticker is disabled"
result = true
# -----------------------
when snapAccountsDumpEnable:
doAssert ctx.data.proofDumpFile.open("./dump-stream.out", fmWrite)
ctx.data.proofDumpOk = true
proc release*(ctx: SnapCtxRef) =
## Global clean up
@ -303,6 +301,7 @@ proc start*(buddy: SnapBuddyRef): bool =
peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
buddy.pivotStart()
buddy.data.errors = ComErrorStatsRef()
if not ctx.data.ticker.isNil:
ctx.data.ticker.startBuddy()
return true
@ -334,28 +333,17 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## Note that this function runs in `async` mode.
##
when usePivot2ok:
#
# Run alternative pivot finder. This one harmonises difficulties of at
# least two peers. The can only be one instance active/unfinished of the
# `pivot2Exec()` functions.
#
let peer = buddy.peer
if buddy.pivotHeader.isErr:
if await buddy.pivotNegotiate(buddy.envPivotNumber):
discard buddy.updatePivotEnv()
else:
# Wait if needed, then return => repeat
if not buddy.ctrl.stopped:
await sleepAsync(2.seconds)
return
if not await buddy.updateSinglePivot():
# Wait if needed, then return => repeat
if not buddy.ctrl.stopped:
await sleepAsync(2.seconds)
return
buddy.ctrl.multiOk = true
trace "Snap pivot initialised", peer,
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
else:
# Default pivot finder runs in multi mode => nothing to do here.
buddy.ctrl.multiOk = true
buddy.ctrl.multiOk = true
proc runPool*(buddy: SnapBuddyRef, last: bool) =
@ -375,11 +363,21 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) =
let ctx = buddy.ctx
if ctx.poolMode:
ctx.poolMode = false
if not ctx.data.runPoolHook.isNil:
noExceptionOops("runPool"):
ctx.data.runPoolHook(buddy)
if last:
ctx.data.runPoolHook = nil
let rc = ctx.data.pivotTable.lastValue
if rc.isOk:
# Check whether accounts and storage might be complete.
let env = rc.value
if not env.serialSync:
# Check whether accounts download is complete
block checkAccountsComplete:
for ivSet in env.fetchAccounts:
if ivSet.chunks != 0:
break checkAccountsComplete
env.accountsDone = true
# Check whether storage slots are complete
if env.fetchStorage.len == 0:
env.serialSync = true
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
@ -392,23 +390,44 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
peer = buddy.peer
when not usePivot2ok:
if not buddy.havePivot:
await buddy.pivotNegotiate()
if not buddy.updatePivotEnv():
return
discard await buddy.updateMultiPivot()
# Ignore rest if the pivot is still acceptably covered
when switchPivotAfterCoverage < 1.0:
if ctx.data.pivotEnv.minCoverageReachedOk:
await sleepAsync(50.milliseconds)
return
# Set up current state root environment for accounts snapshot
let env = block:
let rc = ctx.data.pivotTable.lastValue
if rc.isErr:
return # nothing to do
rc.value
await buddy.fetchAccounts()
buddy.data.pivotEnv = env
if ctx.data.pivotEnv.repairState == Done:
buddy.ctrl.multiOk = false
buddy.pivotStop()
buddy.pivotStart()
if env.serialSync:
trace "Snap serial sync -- not implemented yet", peer
await sleepAsync(5.seconds)
else:
# Snapshot sync processing. Note that *serialSync => accountsDone*.
await buddy.storeStorages() # always pre-clean the queue
await buddy.storeAccounts()
await buddy.storeStorages()
# If the current database is not complete yet
if 0 < env.fetchAccounts[0].chunks or
0 < env.fetchAccounts[1].chunks:
# Healing applies to the latest pivot only. The pivot might have changed
# in the background (while networking) due to a new peer worker that has
# negotiated another, newer pivot.
if env == ctx.data.pivotTable.lastValue.value:
await buddy.healAccountsDb()
# TODO: use/apply storage healer
# Check whether accounts might be complete.
if env.fetchStorage.len == 0:
# Possibly done but some buddies might wait for an account range to be
# received from the network. So we need to sync.
buddy.ctx.poolMode = true
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,102 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
chronos,
../../../sync_desc
const
comErrorsTimeoutMax* = 2
## Maximal number of non-responses accepted in a row. If there are more than
## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded
## as zombie.
type
ComErrorStatsRef* = ref object
## particular error counters so connections will not be cut immediately
## after a particular error.
nTimeouts*: uint
nNetwork*: uint
ComError* = enum
ComNothingSerious
ComAccountsMaxTooLarge
ComAccountsMinTooSmall
ComEmptyAccountsArguments
ComEmptyRequestArguments
ComMissingProof
ComNetworkProblem
ComNoAccountsForStateRoot
ComNoByteCodesAvailable
ComNoDataForProof
ComNoHeaderAvailable
ComNoStorageForAccounts
ComNoTrieNodesAvailable
ComResponseTimeout
ComTooManyByteCodes
ComTooManyHeaders
ComTooManyStorageSlots
ComTooManyTrieNodes
# Other errors not directly related to communication
ComInspectDbFailed
ComImportAccountsFailed
proc stopAfterSeriousComError*(
ctrl: BuddyCtrlRef;
error: ComError;
stats: ComErrorStatsRef;
): Future[bool]
{.async.} =
## Error handling after data protocol failed.
case error:
of ComResponseTimeout:
stats.nTimeouts.inc
if comErrorsTimeoutMax < stats.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
else:
# Otherwise try again some time later. Nevertheless, stop the
# current action.
await sleepAsync(5.seconds)
return true
of ComNetworkProblem,
ComMissingProof,
ComAccountsMinTooSmall,
ComAccountsMaxTooLarge:
stats.nNetwork.inc
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
return true
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComInspectDbFailed,
ComImportAccountsFailed,
ComNoDataForProof,
ComNothingSerious:
discard
of ComNoAccountsForStateRoot,
ComNoStorageForAccounts,
ComNoByteCodesAvailable,
ComNoHeaderAvailable,
ComNoTrieNodesAvailable,
ComTooManyByteCodes,
ComTooManyHeaders,
ComTooManyStorageSlots,
ComTooManyTrieNodes:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
return true
# End

View File

@ -19,7 +19,7 @@ import
stew/interval_set,
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
./com_error
{.push raises: [Defect].}

View File

@ -0,0 +1,74 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/options,
chronos,
eth/[common/eth_types, p2p],
../../../protocol,
../../worker_desc,
./com_error
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getBlockHeader*(
buddy: SnapBuddyRef;
num: BlockNumber;
): Future[Result[BlockHeader,ComError]]
{.async.} =
## Get single block header
let
peer = buddy.peer
reqLen = 1u
hdrReq = BlocksRequest(
startBlock: HashOrNum(
isHash: false,
number: num),
maxResults: reqLen,
skip: 0,
reverse: false)
trace trEthSendSendingGetBlockHeaders, peer, startBlock=num, reqLen
var hdrResp: Option[blockHeadersObj]
try:
hdrResp = await peer.getBlockHeaders(hdrReq)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
error=e.msg
return err(ComNetworkProblem)
var hdrRespLen = 0
if hdrResp.isSome:
hdrRespLen = hdrResp.get.headers.len
if hdrRespLen == 0:
trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
return err(ComNoHeaderAvailable)
if hdrRespLen == 1:
let
header = hdrResp.get.headers[0]
blockNumber = header.blockNumber
trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
return ok(header)
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
return err(ComTooManyHeaders)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -16,7 +16,7 @@ import
eth/[common/eth_types, p2p],
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
./com_error
{.push raises: [Defect].}

View File

@ -1,34 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
type
ComError* = enum
ComNothingSerious
ComAccountsMaxTooLarge
ComAccountsMinTooSmall
ComEmptyAccountsArguments
ComEmptyRequestArguments
ComMissingProof
ComNetworkProblem
ComNoAccountsForStateRoot
ComNoByteCodesAvailable
ComNoDataForProof
ComNoStorageForAccounts
ComNoTrieNodesAvailable
ComResponseTimeout
ComTooManyByteCodes
ComTooManyStorageSlots
ComTooManyTrieNodes
# Other errors not directly related to communication
ComInspectDbFailed
ComImportAccountsFailed
# End

View File

@ -16,7 +16,7 @@ import
stew/interval_set,
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
./com_error
{.push raises: [Defect].}

View File

@ -1,3 +1,6 @@
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
@ -11,7 +14,7 @@ import
eth/[common/eth_types, p2p],
"../../.."/[protocol, protocol/trace_config],
../../worker_desc,
./get_error
./com_error
{.push raises: [Defect].}

View File

@ -136,6 +136,7 @@ type
TrieNodeStat* = object
## Trie inspection report
dangling*: seq[Blob] ## Paths from nodes with incomplete refs
leaves*: seq[NodeKey] ## Paths to leaf nodes (if any)
level*: int ## Maximum nesting depth of dangling nodes
stopped*: bool ## Potential loop detected if `true`
@ -281,7 +282,13 @@ proc ppDangling(a: seq[Blob]; maxItems = 30): string =
proc ppBlob(w: Blob): string =
w.mapIt(it.toHex(2)).join.toLowerAscii
let
q = a.toSeq.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)]
q = a.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)]
andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: ""
"{" & q.join(",") & andMore & "}"
proc ppLeaves(a: openArray[NodeKey]; db: HexaryTreeDbRef; maxItems=30): string =
let
q = a.mapIt(it.ppImpl(db))[0 ..< min(maxItems,a.len)]
andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: ""
"{" & q.join(",") & andMore & "}"
@ -328,10 +335,12 @@ proc pp*(db: HexaryTreeDbRef; indent=4): string =
db.ppImpl(NodeKey.default).join(indent.toPfx)
proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string =
result = "(" & $a.level & ","
result = "(" & $a.level
if a.stopped:
result &= "stopped,"
result &= $a.dangling.len & "," & a.dangling.ppDangling(maxItems) & ")"
result &= $a.dangling.len & "," &
a.dangling.ppDangling(maxItems) & "," &
a.leaves.ppLeaves(db, maxItems) & ")"
# ------------------------------------------------------------------------------
# Public constructor (or similar)

View File

@ -16,6 +16,7 @@ type
AccountSmallerThanBase
AccountsNotSrictlyIncreasing
AccountRangesOverlap
NodeNotFound
RlpEncoding
SlotsNotSrictlyIncreasing
TrieLoopAlert

View File

@ -24,6 +24,9 @@ logScope:
const
extraTraceMessages = false # or true
when extraTraceMessages:
import stew/byteutils
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -63,7 +66,7 @@ proc doStepLink(step: XPathStep): Result[NodeKey,bool] =
err(true) # fully fail
proc hexaryInspectPath(
proc hexaryInspectPathImpl(
db: HexaryTreeDbRef; ## Database
rootKey: RepairKey; ## State root
path: NibblesSeq; ## Starting path
@ -84,8 +87,8 @@ proc hexaryInspectPath(
return ok(rc.value)
err()
proc hexaryInspectPath(
getFn: HexaryGetFn; ## Database retrival function
proc hexaryInspectPathImpl(
getFn: HexaryGetFn; ## Database retrieval function
root: NodeKey; ## State root
path: NibblesSeq; ## Starting path
): Result[NodeKey,void]
@ -170,11 +173,25 @@ proc hexaryInspectPath*(
## Returns the `NodeKey` for a given path if there is any.
let (isLeaf,nibbles) = hexPrefixDecode path
if not isLeaf:
let rc = db.hexaryInspectPath(root.to(RepairKey), nibbles)
let rc = db.hexaryInspectPathImpl(root.to(RepairKey), nibbles)
if rc.isOk and rc.value.isNodeKey:
return ok(rc.value.convertTo(NodeKey))
err()
proc hexaryInspectPath*(
getFn: HexaryGetFn; ## Database abstraction
root: NodeKey; ## State root
path: Blob; ## Partial database path
): Result[NodeKey,void]
{.gcsafe, raises: [Defect,RlpError]} =
## Variant of `hexaryInspectPath()` for persistent database.
let (isLeaf,nibbles) = hexPrefixDecode path
if not isLeaf:
let rc = getFn.hexaryInspectPathImpl(root, nibbles)
if rc.isOk:
return ok(rc.value)
err()
proc hexaryInspectToKeys*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
@ -193,25 +210,37 @@ proc hexaryInspectTrie*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
paths: seq[Blob]; ## Starting paths for search
maxLeafPaths = 0; ## Record leaves with proper 32 bytes path
stopAtLevel = 32; ## Instead of loop detector
): TrieNodeStat
{.gcsafe, raises: [Defect,KeyError]} =
## Starting with the argument list `paths`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database.
## the trie database. The references for these nodes are collected and
## returned.
## * At most `maxLeafPaths` leaf node references are collected along the way.
## * Search list `paths` argument entries that do not refer to a hexary node
## are ignored.
## * For any search list `paths` argument entry, this function stops if
## the search depth exceeds `stopAtLevel` levels of linked sub-nodes.
## * Argument `paths` list entries that do not refer to a valid node are
## silently ignored.
##
let rootKey = root.to(RepairKey)
if not db.tab.hasKey(rootKey):
return TrieNodeStat()
# Initialise TODO list
var reVisit = newTable[RepairKey,NibblesSeq]()
var
leafSlots = maxLeafPaths
reVisit = newTable[RepairKey,NibblesSeq]()
if paths.len == 0:
reVisit[rootKey] = EmptyNibbleRange
else:
for w in paths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = db.hexaryInspectPath(rootKey, nibbles)
let rc = db.hexaryInspectPathImpl(rootKey, nibbles)
if rc.isOk:
reVisit[rc.value] = nibbles
@ -240,8 +269,13 @@ proc hexaryInspectTrie*(
child = node.bLink[n]
db.processLink(stats=result, inspect=again, parent, trail, child)
of Leaf:
# Done with this link, forget the key
discard
if 0 < leafSlots:
let trail = parentTrail & node.lPfx
if trail.len == 64:
result.leaves.add trail.getBytes.convertTo(NodeKey)
leafSlots.dec
# Done with this link
# End `for`
result.level.inc
@ -250,53 +284,70 @@ proc hexaryInspectTrie*(
proc hexaryInspectTrie*(
getFn: HexaryGetFn;
root: NodeKey; ## State root
getFn: HexaryGetFn; ## Database abstraction
rootKey: NodeKey; ## State root
paths: seq[Blob]; ## Starting paths for search
maxLeafPaths = 0; ## Record leaves with proper 32 bytes path
stopAtLevel = 32; ## Instead of loop detector
): TrieNodeStat
{.gcsafe, raises: [Defect,RlpError,KeyError]} =
## Variant of `hexaryInspectTrie()` for persistent database.
##
if root.to(Blob).getFn().len == 0:
when extraTraceMessages:
let nPaths = paths.len
let root = rootKey.to(Blob)
if root.getFn().len == 0:
when extraTraceMessages:
trace "Hexary inspect: missing root", nPaths, maxLeafPaths,
rootKey=root.toHex
return TrieNodeStat()
# Initialise TODO list
var reVisit = newTable[NodeKey,NibblesSeq]()
var
leafSlots = maxLeafPaths
reVisit = newTable[NodeKey,NibblesSeq]()
if paths.len == 0:
reVisit[root] = EmptyNibbleRange
reVisit[rootKey] = EmptyNibbleRange
else:
for w in paths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = getFn.hexaryInspectPath(root, nibbles)
let rc = getFn.hexaryInspectPathImpl(rootKey, nibbles)
if rc.isOk:
reVisit[rc.value] = nibbles
when extraTraceMessages:
trace "Hexary inspect start", nPaths=paths.len, reVisit=reVisit.len
while 0 < reVisit.len:
when extraTraceMessages:
trace "Hexary inspect processing", nPaths, maxLeafPaths,
level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len
if stopAtLevel < result.level:
result.stopped = true
break
when extraTraceMessages:
trace "Hexary inspect processing", level=result.level,
reVisit=reVisit.len, dangling=result.dangling.len
let again = newTable[NodeKey,NibblesSeq]()
for parent,parentTrail in reVisit.pairs:
let nodeRlp = rlpFromBytes parent.to(Blob).getFn()
let parentBlob = parent.to(Blob).getFn()
if parentBlob.len == 0:
# Ooops, forget node and key
continue
let nodeRlp = rlpFromBytes parentBlob
case nodeRlp.listLen:
of 2:
let (isLeaf,ePfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes
let (isLeaf,xPfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes
if not isleaf:
let
trail = parentTrail & ePfx
trail = parentTrail & xPfx
child = nodeRlp.listElem(1)
getFn.processLink(stats=result, inspect=again, parent, trail, child)
elif 0 < leafSlots:
let trail = parentTrail & xPfx
if trail.len == 64:
result.leaves.add trail.getBytes.convertTo(NodeKey)
leafSlots.dec
of 17:
for n in 0 ..< 16:
let
@ -304,7 +355,7 @@ proc hexaryInspectTrie*(
child = nodeRlp.listElem(n)
getFn.processLink(stats=result, inspect=again, parent, trail, child)
else:
# Done with this link, forget the key
# Ooops, forget node and key
discard
# End `for`
@ -313,8 +364,9 @@ proc hexaryInspectTrie*(
# End while
when extraTraceMessages:
trace "Hexary inspect finished", level=result.level, maxLevel=stopAtLevel,
reVisit=reVisit.len, dangling=result.dangling.len, stopped=result.stopped
trace "Hexary inspect finished", nPaths, maxLeafPaths,
level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len,
maxLevel=stopAtLevel, stopped=result.stopped
# ------------------------------------------------------------------------------
# End

View File

@ -373,6 +373,18 @@ proc leafData*(path: XPath): Blob =
of Extension:
discard
proc leafData*(path: RPath): Blob =
## Return the leaf data from a successful `RPath` computation (if any.)
if path.tail.len == 0 and 0 < path.path.len:
let node = path.path[^1].node
case node.kind:
of Branch:
return node.bData
of Leaf:
return node.lData
of Extension:
discard
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

View File

@ -16,32 +16,35 @@ import
stew/byteutils,
stint,
rocksdb,
../../../constants,
../../../db/[kvstore_rocksdb, select_backend],
"../.."/[protocol, types],
../range_desc,
./db/[bulk_storage, hexary_desc, hexary_error, hexary_import,
hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load]
../../../../constants,
../../../../db/[kvstore_rocksdb, select_backend],
"../../.."/[protocol, types],
../../range_desc,
"."/[bulk_storage, hexary_desc, hexary_error, hexary_import,
hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load]
{.push raises: [Defect].}
logScope:
topics = "snap-proof"
topics = "snap-db"
export
HexaryDbError
HexaryDbError,
TrieNodeStat
const
extraTraceMessages = false or true
type
AccountsDbRef* = ref object
SnapDbRef* = ref object
## Global, re-usable descriptor
db: TrieDatabaseRef ## General database
rocky: RocksStoreRef ## Set if rocksdb is available
AccountsDbSessionRef* = ref object
SnapDbSessionRef* = ref object
## Database session descriptor
keyMap: Table[RepairKey,uint] ## For debugging only (will go away)
base: AccountsDbRef ## Back reference to common parameters
base: SnapDbRef ## Back reference to common parameters
peer: Peer ## For log messages
accRoot: NodeKey ## Current accounts root node
accDb: HexaryTreeDbRef ## Accounts database
@ -51,7 +54,7 @@ type
# Private helpers
# ------------------------------------------------------------------------------
proc newHexaryTreeDbRef(ps: AccountsDbSessionRef): HexaryTreeDbRef =
proc newHexaryTreeDbRef(ps: SnapDbSessionRef): HexaryTreeDbRef =
HexaryTreeDbRef(keyPp: ps.stoDb.keyPp) # for debugging, will go away
proc to(h: Hash256; T: type NodeKey): T =
@ -94,27 +97,27 @@ template noPpError(info: static[string]; code: untyped) =
except Exception as e:
raiseAssert "Ooops (" & info & ") " & $e.name & ": " & e.msg
proc toKey(a: RepairKey; ps: AccountsDbSessionRef): uint =
proc toKey(a: RepairKey; ps: SnapDbSessionRef): uint =
if not a.isZero:
noPpError("pp(RepairKey)"):
if not ps.keyMap.hasKey(a):
ps.keyMap[a] = ps.keyMap.len.uint + 1
result = ps.keyMap[a]
proc toKey(a: NodeKey; ps: AccountsDbSessionRef): uint =
proc toKey(a: NodeKey; ps: SnapDbSessionRef): uint =
a.to(RepairKey).toKey(ps)
proc toKey(a: NodeTag; ps: AccountsDbSessionRef): uint =
proc toKey(a: NodeTag; ps: SnapDbSessionRef): uint =
a.to(NodeKey).toKey(ps)
proc pp(a: NodeKey; ps: AccountsDbSessionRef): string =
proc pp(a: NodeKey; ps: SnapDbSessionRef): string =
if a.isZero: "ø" else:"$" & $a.toKey(ps)
proc pp(a: RepairKey; ps: AccountsDbSessionRef): string =
proc pp(a: RepairKey; ps: SnapDbSessionRef): string =
if a.isZero: "ø" elif a.isNodeKey: "$" & $a.toKey(ps) else: "@" & $a.toKey(ps)
proc pp(a: NodeTag; ps: AccountsDbSessionRef): string =
proc pp(a: NodeTag; ps: SnapDbSessionRef): string =
a.to(NodeKey).pp(ps)
# ------------------------------------------------------------------------------
@ -122,11 +125,11 @@ proc pp(a: NodeTag; ps: AccountsDbSessionRef): string =
# ------------------------------------------------------------------------------
proc mergeProofs(
peer: Peer, ## For log messages
db: HexaryTreeDbRef; ## Database table
root: NodeKey; ## Root for checking nodes
proof: seq[Blob]; ## Node records
freeStandingOk = false; ## Remove freestanding nodes
peer: Peer, ## For log messages
db: HexaryTreeDbRef; ## Database table
root: NodeKey; ## Root for checking nodes
proof: seq[Blob]; ## Node records
freeStandingOk = false; ## Remove freestanding nodes
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect, RlpError, KeyError].} =
## Import proof records (as received with snap message) into a hexary trie
@ -160,8 +163,8 @@ proc mergeProofs(
proc persistentAccounts(
db: HexaryTreeDbRef; ## Current table
pv: AccountsDbRef; ## Persistent database
db: HexaryTreeDbRef; ## Current table
pv: SnapDbRef; ## Persistent database
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =
## Store accounts trie table on database
@ -174,8 +177,8 @@ proc persistentAccounts(
ok()
proc persistentStorages(
db: HexaryTreeDbRef; ## Current table
pv: AccountsDbRef; ## Persistent database
db: HexaryTreeDbRef; ## Current table
pv: SnapDbRef; ## Persistent database
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =
## Store accounts trie table on database
@ -268,9 +271,9 @@ proc collectStorageSlots(
proc importStorageSlots*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
data: AccountSlots; ## account storage descriptor
proof: SnapStorageProof; ## account storage proof
ps: SnapDbSessionRef; ## Re-usable session descriptor
data: AccountSlots; ## Account storage descriptor
proof: SnapStorageProof; ## Account storage proof
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect, RlpError,KeyError].} =
## Process storage slots for a particular storage root
@ -307,14 +310,14 @@ proc importStorageSlots*(
# ------------------------------------------------------------------------------
proc init*(
T: type AccountsDbRef;
T: type SnapDbRef;
db: TrieDatabaseRef
): T =
## Main object constructor
T(db: db)
proc init*(
T: type AccountsDbRef;
T: type SnapDbRef;
db: ChainDb
): T =
## Variant of `init()` allowing bulk import on rocksdb backend
@ -323,14 +326,14 @@ proc init*(
result.rocky = nil
proc init*(
T: type AccountsDbSessionRef;
pv: AccountsDbRef;
T: type SnapDbSessionRef;
pv: SnapDbRef;
root: Hash256;
peer: Peer = nil
): T =
## Start a new session, do some actions and then discard the session
## descriptor (probably after commiting data.)
let desc = AccountsDbSessionRef(
let desc = SnapDbSessionRef(
base: pv,
peer: peer,
accRoot: root.to(NodeKey),
@ -344,13 +347,13 @@ proc init*(
return desc
proc dup*(
ps: AccountsDbSessionRef;
ps: SnapDbSessionRef;
root: Hash256;
peer: Peer;
): AccountsDbSessionRef =
): SnapDbSessionRef =
## Resume a session with different `root` key and `peer`. This new session
## will access the same memory database as the `ps` argument session.
AccountsDbSessionRef(
SnapDbSessionRef(
base: ps.base,
peer: peer,
accRoot: root.to(NodeKey),
@ -358,9 +361,9 @@ proc dup*(
stoDb: ps.stoDb)
proc dup*(
ps: AccountsDbSessionRef;
ps: SnapDbSessionRef;
root: Hash256;
): AccountsDbSessionRef =
): SnapDbSessionRef =
## Variant of `dup()` without the `peer` argument.
ps.dup(root, ps.peer)
@ -368,16 +371,16 @@ proc dup*(
# Public functions
# ------------------------------------------------------------------------------
proc dbBackendRocksDb*(pv: AccountsDbRef): bool =
proc dbBackendRocksDb*(pv: SnapDbRef): bool =
## Returns `true` if rocksdb features are available
not pv.rocky.isNil
proc dbBackendRocksDb*(ps: AccountsDbSessionRef): bool =
proc dbBackendRocksDb*(ps: SnapDbSessionRef): bool =
## Returns `true` if rocksdb features are available
not ps.base.rocky.isNil
proc importAccounts*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
ps: SnapDbSessionRef; ## Re-usable session descriptor
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
persistent = false; ## store data on disk
@ -425,20 +428,20 @@ proc importAccounts*(
ok()
proc importAccounts*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## for log messages
root: Hash256; ## state root
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
): Result[void,HexaryDbError] =
## Variant of `importAccounts()`
AccountsDbSessionRef.init(
SnapDbSessionRef.init(
pv, root, peer).importAccounts(base, data, persistent=true)
proc importStorages*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
ps: SnapDbSessionRef; ## Re-usable session descriptor
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
persistent = false; ## store data on disk
): Result[void,seq[(int,HexaryDbError)]] =
@ -506,18 +509,18 @@ proc importStorages*(
ok()
proc importStorages*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
): Result[void,seq[(int,HexaryDbError)]] =
## Variant of `importStorages()`
AccountsDbSessionRef.init(
SnapDbSessionRef.init(
pv, Hash256(), peer).importStorages(data, persistent=true)
proc importRawNodes*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
ps: SnapDbSessionRef; ## Re-usable session descriptor
nodes: openArray[Blob]; ## Node records
persistent = false; ## store data on disk
): Result[void,seq[(int,HexaryDbError)]] =
@ -559,64 +562,133 @@ proc importRawNodes*(
ok()
proc importRawNodes*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
nodes: openArray[Blob]; ## Node records
): Result[void,seq[(int,HexaryDbError)]] =
## Variant of `importRawNodes()` for persistent storage.
AccountsDbSessionRef.init(
SnapDbSessionRef.init(
pv, Hash256(), peer).importRawNodes(nodes, persistent=true)
proc inspectAccountsTrie*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
ps: SnapDbSessionRef; ## Re-usable session descriptor
pathList = seq[Blob].default; ## Starting nodes for search
maxLeafPaths = 0; ## Record leaves with proper 32 bytes path
persistent = false; ## Read data from disk
ignoreError = false; ## Return partial results if available
ignoreError = false; ## Always return partial results if available
): Result[TrieNodeStat, HexaryDbError] =
## Starting with the argument list `pathList`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database.
## the trie database. Argument `pathList` entries that do not refer to a
## valid node are silently ignored.
##
let peer = ps.peer
var stats: TrieNodeStat
noRlpExceptionOops("inspectAccountsTrie()"):
if persistent:
let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
stats = getFn.hexaryInspectTrie(ps.accRoot, pathList)
stats = getFn.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths)
else:
stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList)
stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths)
block checkForError:
let error = block:
if stats.stopped:
TrieLoopAlert
elif stats.level <= 1:
elif stats.level == 0:
TrieIsEmpty
else:
break checkForError
trace "Inspect account trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
nDangling=stats.dangling.len, leaves=stats.leaves.len,
maxleaves=maxLeafPaths, stoppedAt=stats.level, error
return err(error)
when extraTraceMessages:
trace "Inspect account trie ok", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, level=stats.level
nDangling=stats.dangling.len, leaves=stats.leaves.len,
maxleaves=maxLeafPaths, level=stats.level
return ok(stats)
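As an aside, the dangling-reference search that `hexaryInspectTrie` is asked to perform here can be pictured as a toy walk over a plain key/children table. The sketch below is a simplification only (strings instead of hexary node blobs, no `maxLeafPaths` handling, all names made up):

```nim
import std/[sets, tables]

# Toy sketch: collect keys that are referenced as children but have no entry
# in the node table, i.e. the "dangling" references the healing process wants.
proc danglingRefs(db: Table[string, seq[string]]; root: string): seq[string] =
  var
    seen = initHashSet[string]()
    stack = @[root]
  while 0 < stack.len:
    let key = stack.pop
    if key in seen:
      continue
    seen.incl key
    if key in db:
      stack.add db[key]        # descend into stored children
    else:
      result.add key           # referenced but not stored => dangling

when isMainModule:
  let db = {"root": @["a", "b"], "a": @["c"]}.toTable
  doAssert danglingRefs(db, "root").toHashSet == ["b", "c"].toHashSet
```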
proc inspectAccountsTrie*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
root: Hash256; ## state root
pathList = seq[Blob].default; ## Starting paths for search
ignoreError = false; ## Return partial results if available
maxLeafPaths = 0; ## Record leaves with proper 32 bytes path
ignoreError = false; ## Always return partial results when avail.
): Result[TrieNodeStat, HexaryDbError] =
## Variant of `inspectAccountsTrie()` for persistent storage.
AccountsDbSessionRef.init(
pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError)
SnapDbSessionRef.init(
pv, root, peer).inspectAccountsTrie(
pathList, maxLeafPaths, persistent=true, ignoreError)
proc getAccountNodeKey*(
ps: SnapDbSessionRef; ## Re-usable session descriptor
path: Blob; ## Partial node path
persistent = false; ## Read data from disk
): Result[NodeKey,HexaryDbError] =
## For a partial node path argument `path`, return the raw node key.
var rc: Result[NodeKey,void]
noRlpExceptionOops("inspectAccountsPath()"):
if persistent:
let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
rc = getFn.hexaryInspectPath(ps.accRoot, path)
else:
rc = ps.accDb.hexaryInspectPath(ps.accRoot, path)
if rc.isOk:
return ok(rc.value)
err(NodeNotFound)
proc getAccountNodeKey*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
root: Hash256; ## state root
path: Blob; ## Partial node path
): Result[NodeKey,HexaryDbError] =
## Variant of `getAccountNodeKey()` for persistent storage.
SnapDbSessionRef.init(
pv, root, peer).getAccountNodeKey(path, persistent=true)
proc getAccountData*(
ps: SnapDbSessionRef; ## Re-usable session descriptor
path: NodeKey; ## Account to visit
persistent = false; ## Read data from disk
): Result[Account,HexaryDbError] =
## Fetch account data.
##
## Caveat: There is no unit test yet for the non-persistent version
let peer = ps.peer
var acc: Account
noRlpExceptionOops("getAccountData()"):
var leaf: Blob
if persistent:
let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
leaf = path.hexaryPath(ps.accRoot, getFn).leafData
else:
leaf = path.hexaryPath(ps.accRoot.to(RepairKey),ps.accDb).leafData
if leaf.len == 0:
return err(AccountNotFound)
acc = rlp.decode(leaf,Account)
return ok(acc)
proc getAccountData*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
root: Hash256; ## state root
path: NodeKey; ## Account to visit
): Result[Account,HexaryDbError] =
## Variant of `getAccountData()` for persistent storage.
SnapDbSessionRef.init(pv, root, peer).getAccountData(path, persistent=true)
# ------------------------------------------------------------------------------
# Debugging (and playing with the hexary database)
# Public functions: additional helpers
# ------------------------------------------------------------------------------
proc sortMerge*(base: openArray[NodeTag]): NodeTag =
@ -642,22 +714,14 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] =
result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it])
proc getChainDbAccount*(
ps: AccountsDbSessionRef;
ps: SnapDbSessionRef;
accHash: Hash256
): Result[Account,HexaryDbError] =
## Fetch account via `BaseChainDB`
noRlpExceptionOops("getChainDbAccount()"):
let
getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
leaf = accHash.to(NodeKey).hexaryPath(ps.accRoot, getFn).leafData
if 0 < leaf.len:
let acc = rlp.decode(leaf,Account)
return ok(acc)
err(AccountNotFound)
ps.getAccountData(accHash.to(NodeKey),persistent=true)
proc nextChainDbKey*(
ps: AccountsDbSessionRef;
ps: SnapDbSessionRef;
accHash: Hash256
): Result[Hash256,HexaryDbError] =
## Fetch the account path on the `BaseChainDB`, the one next to the
@ -675,7 +739,7 @@ proc nextChainDbKey*(
err(AccountNotFound)
proc prevChainDbKey*(
ps: AccountsDbSessionRef;
ps: SnapDbSessionRef;
accHash: Hash256
): Result[Hash256,HexaryDbError] =
## Fetch the account path on the `BaseChainDB`, the one before to the
@ -692,7 +756,11 @@ proc prevChainDbKey*(
err(AccountNotFound)
proc assignPrettyKeys*(ps: AccountsDbSessionRef) =
# ------------------------------------------------------------------------------
# Debugging (and playing with the hexary database)
# ------------------------------------------------------------------------------
proc assignPrettyKeys*(ps: SnapDbSessionRef) =
## Prepare for pretty printing/debugging. Run early enough, this function
## sets the root key to `"$"`, for instance.
noPpError("validate(1)"):
@ -710,22 +778,22 @@ proc assignPrettyKeys*(ps: AccountsDbSessionRef) =
of Extension: discard node.eLink.toKey(ps)
of Leaf: discard
proc dumpPath*(ps: AccountsDbSessionRef; key: NodeTag): seq[string] =
proc dumpPath*(ps: SnapDbSessionRef; key: NodeTag): seq[string] =
## Pretty print helper compiling the path into the repair tree for the
## argument `key`.
noPpError("dumpPath"):
let rPath= key.to(NodeKey).hexaryPath(ps.accRoot.to(RepairKey), ps.accDb)
result = rPath.path.mapIt(it.pp(ps.accDb)) & @["(" & rPath.tail.pp & ")"]
proc dumpAccDB*(ps: AccountsDbSessionRef; indent = 4): string =
proc dumpAccDB*(ps: SnapDbSessionRef; indent = 4): string =
## Dump the entries from a generic accounts trie.
ps.accDb.pp(ps.accRoot,indent)
proc getAcc*(ps: AccountsDbSessionRef): HexaryTreeDbRef =
proc getAcc*(ps: SnapDbSessionRef): HexaryTreeDbRef =
## Low level access to accounts DB
ps.accDb
proc hexaryPpFn*(ps: AccountsDbSessionRef): HexaryPpFn =
proc hexaryPpFn*(ps: SnapDbSessionRef): HexaryPpFn =
## Key mapping function used in `HexaryTreeDB`
ps.accDb.keyPp


@ -1,566 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/sequtils,
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/[interval_set, keyed_queue],
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[get_account_range, get_error, get_storage_ranges, get_trie_nodes],
./accounts_db
when snapAccountsDumpEnable:
import ../../../tests/replay/[undump_accounts, undump_storages]
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
const
extraTraceMessages = false or true
## Enabled additional logging noise
maxTimeoutErrors = 2
## maximal number of non-responses accepted in a row
# ------------------------------------------------------------------------------
# Private debugging
# ------------------------------------------------------------------------------
proc dumpBegin(
buddy: SnapBuddyRef;
iv: LeafRange;
dd: GetAccountRange;
error = NothingSerious) =
# For debugging, will go away
discard
when snapAccountsDumpEnable:
let ctx = buddy.ctx
if ctx.data.proofDumpOk:
let
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
trace " Snap proofs dump", peer, enabled=ctx.data.proofDumpOk, iv
var
fd = ctx.data.proofDumpFile
try:
if error != NothingSerious:
fd.write " # Error: base=" & $iv.minPt & " msg=" & $error & "\n"
fd.write "# count ", $ctx.data.proofDumpInx & "\n"
fd.write stateRoot.dumpAccounts(iv.minPt, dd.data) & "\n"
except CatchableError:
discard
ctx.data.proofDumpInx.inc
proc dumpStorage(buddy: SnapBuddyRef; data: AccountStorageRange) =
# For debugging, will go away
discard
when snapAccountsDumpEnable:
let ctx = buddy.ctx
if ctx.data.proofDumpOk:
let
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
fd = ctx.data.proofDumpFile
try:
fd.write stateRoot.dumpStorages(data) & "\n"
except CatchableError:
discard
proc dumpEnd(buddy: SnapBuddyRef) =
# For debugging, will go away
discard
when snapAccountsDumpEnable:
let ctx = buddy.ctx
if ctx.data.proofDumpOk:
var fd = ctx.data.proofDumpFile
fd.flushFile
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc withMaxLen(buddy: SnapBuddyRef; iv: LeafRange): LeafRange =
## Reduce accounts interval to maximal size
let maxlen = buddy.ctx.data.accountRangeMax
if 0 < iv.len and iv.len <= maxLen:
iv
else:
LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256))
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] =
## Fetch an interval from the account range list. Use the `pivotAccount`
## value as a start entry to fetch data from, wrapping around if necessary.
let
ctx = buddy.ctx
env = ctx.data.pivotEnv
pivotPt = env.pivotAccount
block:
# Take the next interval to the right (aka ge) `pivotPt`
let rc = env.availAccounts.ge(pivotPt)
if rc.isOk:
let iv = buddy.withMaxLen(rc.value)
discard env.availAccounts.reduce(iv)
return ok(iv)
block:
# Check whether the `pivotPt` is in the middle of an interval
let rc = env.availAccounts.envelope(pivotPt)
if rc.isOk:
let iv = buddy.withMaxLen(LeafRange.new(pivotPt, rc.value.maxPt))
discard env.availAccounts.reduce(iv)
return ok(iv)
block:
# Otherwise wrap around
let rc = env.availAccounts.ge()
if rc.isOk:
let iv = buddy.withMaxLen(rc.value)
discard env.availAccounts.reduce(iv)
return ok(iv)
err()
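For reference, the wrap-around selection described in the doc comment above boils down to the following sketch (plain integers instead of `NodeTag`, a sorted list instead of the interval set; the helper name is made up):

```nim
# Sketch of the wrap-around pick: prefer an interval at or right of the pivot
# point, else the interval containing it, else wrap around to the lowest one.
proc pickWrapAround(pool: seq[(int, int)]; pivot: int): (bool, (int, int)) =
  for iv in pool:                       # assumed sorted by lower bound
    if pivot <= iv[0]:
      return (true, iv)                 # at or right of the pivot
  for iv in pool:
    if iv[0] <= pivot and pivot <= iv[1]:
      return (true, (pivot, iv[1]))     # pivot sits inside this interval
  if 0 < pool.len:
    return (true, pool[0])              # wrap around
  (false, (0, 0))

when isMainModule:
  doAssert pickWrapAround(@[(10, 20), (50, 60)], 30) == (true, (50, 60))
```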
proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.ctx.data.pivotEnv.availAccounts.merge(iv)
proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.ctx.data.pivotEnv.availAccounts.reduce(iv)
proc stopAfterError(
buddy: SnapBuddyRef;
error: ComError;
): Future[bool]
{.async.} =
## Error handling after data protocol failed.
case error:
of ComResponseTimeout:
if maxTimeoutErrors <= buddy.data.errors.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.ctrl.zombie = true
else:
# Otherwise try again some time later. Nevertheless, stop the
# current action.
buddy.data.errors.nTimeouts.inc
await sleepAsync(5.seconds)
return true
of ComNetworkProblem,
ComMissingProof,
ComAccountsMinTooSmall,
ComAccountsMaxTooLarge:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
# buddy.data.stats.major.networkErrors.inc()
buddy.ctrl.zombie = true
return true
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComInspectDbFailed,
ComImportAccountsFailed,
ComNoDataForProof,
ComNothingSerious:
discard
of ComNoAccountsForStateRoot,
ComNoStorageForAccounts,
ComNoByteCodesAvailable,
ComNoTrieNodesAvailable,
ComTooManyByteCodes,
ComTooManyStorageSlots,
ComTooManyTrieNodes:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.ctrl.zombie = true
return true
# ------------------------------------------------------------------------------
# Private functions: accounts
# ------------------------------------------------------------------------------
proc processAccounts(
buddy: SnapBuddyRef;
iv: LeafRange; ## Accounts range to process
): Future[Result[void,ComError]]
{.async.} =
## Process accounts and storage by bulk download on the current environment
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
# Process accounts
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Fetch data for this range delegated to `fetchAccounts()`
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
return err(rc.error)
rc.value
let
nAccounts = dd.data.accounts.len
nStorage = dd.withStorage.len
block:
let rc = ctx.data.accountsDb.importAccounts(
peer, stateRoot, iv.minPt, dd.data)
if rc.isErr:
# Bad data, just try another peer
buddy.putUnprocessed(iv)
buddy.ctrl.zombie = true
trace "Import failed, restoring unprocessed accounts", peer, stateRoot,
range=dd.consumed, nAccounts, nStorage, error=rc.error
buddy.dumpBegin(iv, dd, rc.error) # FIXME: Debugging (will go away)
buddy.dumpEnd() # FIXME: Debugging (will go away)
return err(ComImportAccountsFailed)
buddy.dumpBegin(iv, dd) # FIXME: Debugging (will go away)
# Statistics
env.nAccounts.inc(nAccounts)
env.nStorage.inc(nStorage)
# Register consumed intervals on the accumulator over all state roots
discard buddy.ctx.data.coveredAccounts.merge(dd.consumed)
# Register consumed and bulk-imported (well, not yet) accounts range
block registerConsumed:
block:
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
let rc = iv - dd.consumed
if rc.isOk:
# Now, `dd.consumed` < `iv`, return some unused range
buddy.putUnprocessed(rc.value)
break registerConsumed
block:
# The processed interval might be a bit larger
let rc = dd.consumed - iv
if rc.isOk:
# Remove from unprocessed data. If it is not unprocessed anymore,
# then it was doubly processed, which is ok.
buddy.delUnprocessed(rc.value)
break registerConsumed
# End registerConsumed
# Store accounts on the storage TODO list.
discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage)
return ok()
# ------------------------------------------------------------------------------
# Private functions: accounts storage
# ------------------------------------------------------------------------------
proc fetchAndImportStorageSlots(
buddy: SnapBuddyRef;
reqSpecs: seq[AccountSlotsHeader];
): Future[Result[seq[SnapSlotQueueItemRef],ComError]]
{.async.} =
## Fetch storage slots data from the network, store it on disk and
## return yet unprocessed data.
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Get storage slots
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, reqSpecs)
if rc.isErr:
return err(rc.error)
rc.value
if 0 < stoRange.data.storages.len:
# ------------------------------
buddy.dumpStorage(stoRange.data)
# ------------------------------
# Verify/process data and save to disk
block:
let rc = ctx.data.accountsDb.importStorages(peer, stoRange.data)
if rc.isErr:
# Push back parts of the error item
var once = false
for w in rc.error:
if 0 <= w[0]:
# Reset any partial requests by not copying the `firstSlot` field.
# So all the storage slots are re-fetched completely for this
# account.
stoRange.addLeftOver(
@[AccountSlotsHeader(
accHash: stoRange.data.storages[w[0]].account.accHash,
storageRoot: stoRange.data.storages[w[0]].account.storageRoot)],
forceNew = not once)
once = true
# Do not ask for the same entries again on this `peer`
if once:
buddy.data.vetoSlots.incl stoRange.leftOver[^1]
if rc.error[^1][0] < 0:
discard
# TODO: disk storage failed or something else happened, so what?
# Return the remaining part to be processed later
return ok(stoRange.leftOver)
proc processStorageSlots(
buddy: SnapBuddyRef;
): Future[Result[void,ComError]]
{.async.} =
## Fetch storage data and save it on disk. Storage requests are managed by
## a request queue for handling partial replies and re-fetch issues. For
## all practical purposes, this request queue should mostly be empty.
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
while true:
# Pull out the next request item from the queue
var req: SnapSlotQueueItemRef
block getNextAvailableItem:
for w in env.leftOver.nextKeys:
# Make sure that this item was not fetched already
if w notin buddy.data.vetoSlots:
env.leftOver.del(w)
req = w
break getNextAvailableItem
return ok()
block:
# Fetch and store account storage slots. On some sort of success,
# the `rc` return value contains a list of left-over items to be
# re-processed.
let rc = await buddy.fetchAndImportStorageSlots(req.q)
if rc.isErr:
# Save accounts/storage list to be processed later, then stop
discard env.leftOver.append req
return err(rc.error)
for qLo in rc.value:
# Handle queue left-overs for processing in the next cycle
if qLo.q[0].firstSlot == Hash256.default and 0 < env.leftOver.len:
# Appending to last queue item is preferred over adding new item
let item = env.leftOver.first.value
item.q = item.q & qLo.q
else:
# Put back as-is.
discard env.leftOver.append qLo
# End while
return ok()
# ------------------------------------------------------------------------------
# Private functions: healing
# ------------------------------------------------------------------------------
proc accountsTrieHealing(
buddy: SnapBuddyRef;
env: SnapPivotRef;
envSource: string;
): Future[Result[void,ComError]]
{.async.} =
## ...
# Starting with a given set of potentially dangling nodes, this set is
# updated.
let
ctx = buddy.ctx
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
while env.repairState != Done and
(env.dangling.len != 0 or env.repairState == Pristine):
trace "Accounts healing loop", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len
var needNodes: seq[Blob]
block getDanglingNodes:
let rc = ctx.data.accountsDb.inspectAccountsTrie(
peer, stateRoot, env.dangling)
if rc.isOk:
needNodes = rc.value.dangling
break getDanglingNodes
let error = rc.error
if error == TrieIsEmpty and env.dangling.len == 0:
when extraTraceMessages:
trace "Accounts healing on empty trie", peer,
repairState=env.repairState, envSource
return ok()
trace "Accounts healing failed", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len, error
return err(ComInspectDbFailed)
# Update dangling nodes register so that other processes would not fetch
# the same list simultaneously.
if maxTrieNodeFetch < needNodes.len:
# No point in processing more at the same time. So leave the rest on
# the `dangling` queue.
env.dangling = needNodes[maxTrieNodeFetch ..< needNodes.len]
needNodes.setLen(maxTrieNodeFetch)
else:
env.dangling.setLen(0)
# Nothing more to do
if needNodes.len == 0:
if env.repairState != Pristine:
env.repairState = Done
when extraTraceMessages:
trace "Done accounts healing for now", peer,
repairState=env.repairState, envSource, nDangling=env.dangling.len
return ok()
let lastState = env.repairState
env.repairState = KeepGoing
when extraTraceMessages:
trace "Need nodes for healing", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len, nNodes=needNodes.len
# Fetch nodes
let dd = block:
let rc = await buddy.getTrieNodes(stateRoot, needNodes.mapIt(@[it]))
if rc.isErr:
env.dangling = needNodes
env.repairState = lastState
return err(rc.error)
rc.value
# Store to disk and register left overs for the next pass
block:
let rc = ctx.data.accountsDb.importRawNodes(peer, dd.nodes)
if rc.isOk:
env.dangling = dd.leftOver.mapIt(it[0])
elif 0 < rc.error.len and rc.error[^1][0] < 0:
# negative index => storage error
env.dangling = needNodes
else:
let nodeKeys = rc.error.mapIt(dd.nodes[it[0]])
env.dangling = dd.leftOver.mapIt(it[0]) & nodeKeys
# End while
return ok()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc fetchAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and data and store them in the database.
##
## TODO: Healing for storages. Currently, healing in only run for accounts.
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
# Complete the previous environment by trie database healing (if any)
healingEnvs = if not ctx.data.prevEnv.isNil: @[ctx.data.prevEnv] else: @[]
block processAccountsFrame:
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
if rc.isErr:
# Although there are no accounts left to process, other peers might
# still be working on some accounts. As a general rule, not all of an
# account range gets served, so the remaining range will magically
# reappear in the unprocessed ranges database.
trace "No more unprocessed accounts (maybe)", peer, stateRoot
# Complete healing for sporadic nodes missing.
healingEnvs.add env
break processAccountsFrame
rc.value
when extraTraceMessages:
trace "Start fetching accounts", peer, stateRoot, iv,
repairState=env.repairState
# Process received accounts and stash storage slots to fetch later
block:
let rc = await buddy.processAccounts(iv)
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="accounts", error
return
break processAccountsFrame
# End `block processAccountsFrame`
when extraTraceMessages:
trace "Start fetching storages", peer, nAccounts=env.leftOver.len,
nVetoSlots=buddy.data.vetoSlots.len, repairState=env.repairState
# Process storage slots from environment batch
block:
let rc = await buddy.processStorageSlots()
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="storage", error
return
# Check whether there is some environment that can be completed by
# Patricia Merkle Tree healing.
for w in healingEnvs:
let envSource = if env == ctx.data.pivotEnv: "pivot" else: "retro"
when extraTraceMessages:
trace "Start accounts healing", peer, repairState=env.repairState,
envSource, dangling=w.dangling.len
let rc = await buddy.accountsTrieHealing(w, envSource)
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
when extraTraceMessages:
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="healing", dangling=w.dangling.len, error
return
buddy.dumpEnd() # FIXME: Debugging (will go away)
when extraTraceMessages:
trace "Done fetching cycle", peer, repairState=env.repairState
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,380 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal accounts DB:
## =================
##
## Flow chart for healing algorithm
## --------------------------------
## ::
## START with {state-root}
## |
## | +------------------------------------------------+
## | | |
## v v |
## <inspect-accounts-trie> --> {missing-account-nodes} |
## | | |
## v v |
## {leaf-nodes} <get-trie-nodes-via-snap/1> |
## | | |
## v v |
## <update-accounts-batch> <merge-nodes-into-database> |
## | | |
## v v |
## {storage-roots} {check-account-nodes} ---------+
## |
## v
## <update-storage-processor-batch>
##
## Legend:
## * `<..>` some action, process, etc.
## * `{..}` some data set, list, or queue etc.
##
## Discussion of flow chart
## ------------------------
## * Input nodes for `<inspect-accounts-trie>` are checked for dangling child
## node links which in turn are collected as output.
##
## * Nodes of the `{missing-account-nodes}` list are fetched from the network
## and merged into the accounts trie database. Successfully processed nodes
## are collected in the `{check-account-nodes}` list which is fed back into
## the `<inspect-accounts-trie>` process.
##
## * If there is a problem with a node travelling from the source list
## `{missing-account-nodes}` towards the target list `{check-account-nodes}`,
## this problem node is simply held back in the source list.
##
## In order to avoid unnecessary stale entries, the `{missing-account-nodes}`
## list is regularly checked for whether nodes are still missing or some
## other process has done the magic work of merging some of them into the
## trie database.
##
## Competing with other trie algorithms
## ------------------------------------
## * Healing runs (semi-)parallel to processing `GetAccountRange` network
## messages from the `snap/1` protocol. This is more network bandwidth
## efficient in comparison with the healing algorithm. Here, leaf nodes are
## transferred wholesale while with the healing algorithm, only the top node
## of a sub-trie can be transferred at once (but for multiple sub-tries).
##
## * The healing algorithm visits all nodes of a complete trie unless it is
## stopped in between.
##
## * If a trie node is missing, it can be fetched directly by the healing
## algorithm or one can wait for another process to do the job. Waiting for
## other processes to do the job also applies to problem nodes as indicated
## in the last bullet item of the previous chapter.
##
## * Network bandwidth can be saved if nodes are fetched by a more efficient
## process (if that is available.) This suggests that fetching missing trie
## nodes by the healing algorithm should kick in very late when the trie
## database is nearly complete.
##
## * Healing applies to a trie database associated with the currently latest
## *state root*, which may change occasionally. It suggests to start the
## healing algorithm very late altogether (not fetching nodes, only) because
## most trie databases will never be completed by healing.
##
import
std/sequtils,
chronicles,
chronos,
eth/[common/eth_types, p2p, trie/trie_defs],
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/get_trie_nodes,
./db/snap_db
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
proc coverageInfo(buddy: SnapBuddyRef): string =
## Logging helper ...
let
ctx = buddy.ctx
env = buddy.data.pivotEnv
env.fetchAccounts.emptyFactor.toPC(0) &
"/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)
proc getCoveringLeafRangeSet(buddy: SnapBuddyRef; pt: NodeTag): LeafRangeSet =
## Helper ...
let env = buddy.data.pivotEnv
for ivSet in env.fetchAccounts:
if 0 < ivSet.covered(pt,pt):
return ivSet
proc commitLeafAccount(buddy: SnapBuddyRef; ivSet: LeafRangeSet; pt: NodeTag) =
## Helper ...
discard ivSet.reduce(pt,pt)
discard buddy.ctx.data.coveredAccounts.merge(pt,pt)
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc updateMissingNodesList(buddy: SnapBuddyRef) =
## Check whether previously missing nodes from the `missingAccountNodes` list
## have been magically added to the database since it was checked last time.
## These nodes will be moved to `checkAccountNodes` for further processing.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
nodes: seq[Blob]
for accKey in env.missingAccountNodes:
let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey)
if rc.isOk:
# Check nodes for dangling links
env.checkAccountNodes.add acckey
else:
# Node is still missing
nodes.add acckey
env.missingAccountNodes = nodes
proc mergeIsolatedAccounts(
buddy: SnapBuddyRef;
paths: openArray[NodeKey];
): seq[AccountSlotsHeader] =
## Process leaves found with nodes inspection, returns a list of
## storage slots for these nodes.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Remove reported leaf paths from the accounts interval
for accKey in paths:
let
pt = accKey.to(NodeTag)
ivSet = buddy.getCoveringLeafRangeSet(pt)
if not ivSet.isNil:
let
rc = ctx.data.snapDb.getAccountData(peer, stateRoot, accKey)
accountHash = Hash256(data: accKey.ByteArray32)
if rc.isOk:
let storageRoot = rc.value.storageRoot
when extraTraceMessages:
let stRootStr = if storageRoot != emptyRlpHash: $storageRoot
else: "emptyRlpHash"
trace "Registered isolated persistent account", peer, accountHash,
storageRoot=stRootStr
if storageRoot != emptyRlpHash:
result.add AccountSlotsHeader(
accHash: accountHash,
storageRoot: storageRoot)
buddy.commitLeafAccount(ivSet, pt)
env.nAccounts.inc
continue
when extraTraceMessages:
let error = rc.error
trace "Get persistent account problem", peer, accountHash, error
proc fetchDanglingNodesList(
buddy: SnapBuddyRef
): Result[TrieNodeStat,HexaryDbError] =
## Starting with a given set of potentially dangling account nodes
## `checkAccountNodes`, this set is filtered and processed. The outcome
## is fed back to the very same list `checkAccountNodes`
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
maxLeaves = if env.checkAccountNodes.len == 0: 0
else: maxHealingLeafPaths
rc = ctx.data.snapDb.inspectAccountsTrie(
peer, stateRoot, env.checkAccountNodes, maxLeaves)
if rc.isErr:
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return err(rc.error)
# Global/env batch list to be replaced by the `rc.value.leaves` return value
env.checkAccountNodes.setLen(0)
# Store accounts leaves on the storage batch list.
let withStorage = buddy.mergeIsolatedAccounts(rc.value.leaves)
if 0 < withStorage.len:
discard env.fetchStorage.append SnapSlotQueueItemRef(q: withStorage)
when extraTraceMessages:
trace "Accounts healing storage nodes", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nWithStorage=withStorage.len,
nDangling=rc.value.dangling
return ok(rc.value)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} =
## Fetching and merging missing account trie database nodes.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Only start healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how
# much of the total of all accounts have been processed. Even if the trie
# database for the current pivot state root is sparsely filled, there
# is a good chance that it can inherit some unchanged sub-trie from an
# earlier pivot state root download. The healing process then works as a
# sort of glue.
#
if env.nAccounts == 0 or
ctx.data.coveredAccounts.fullFactor < healAccountsTrigger:
when extraTraceMessages:
trace "Accounts healing postponed", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len
return
when extraTraceMessages:
trace "Start accounts healing", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len
# Update for changes since last visit
buddy.updateMissingNodesList()
# If `checkAccountNodes` is empty, healing is at the very start or
# was postponed in which case `missingAccountNodes` is non-empty.
var
nodesMissing: seq[Blob] # Nodes to process by this instance
nLeaves = 0 # For logging
if 0 < env.checkAccountNodes.len or env.missingAccountNodes.len == 0:
let rc = buddy.fetchDanglingNodesList()
if rc.isErr:
error "Accounts healing failed => stop", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len,
error=rc.error
return
nodesMissing = rc.value.dangling
nLeaves = rc.value.leaves.len
# Check whether the trie is complete.
if nodesMissing.len == 0 and env.missingAccountNodes.len == 0:
when extraTraceMessages:
trace "Accounts healing complete", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=0,
nMissingAccountNodes=0,
nNodesMissing=0,
nLeaves
return # nothing to do
# Ok, clear global `env.missingAccountNodes` list and process `nodesMissing`.
nodesMissing = nodesMissing & env.missingAccountNodes
env.missingAccountNodes.setlen(0)
# Fetch nodes, merge it into database and feed back results
while 0 < nodesMissing.len:
var fetchNodes: seq[Blob]
# There is no point in processing too many nodes at the same time. So
# leave the rest on the `nodesMissing` queue for a moment.
if maxTrieNodeFetch < nodesMissing.len:
let inxLeft = nodesMissing.len - maxTrieNodeFetch
fetchNodes = nodesMissing[inxLeft ..< nodesMissing.len]
nodesMissing.setLen(inxLeft)
else:
fetchNodes = nodesMissing
nodesMissing.setLen(0)
when extraTraceMessages:
trace "Accounts healing loop", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len,
nNodesMissing=nodesMissing.len,
nLeaves
# Fetch nodes from the network
let dd = block:
let rc = await buddy.getTrieNodes(stateRoot, fetchNodes.mapIt(@[it]))
if rc.isErr:
env.missingAccountNodes = env.missingAccountNodes & fetchNodes
when extraTraceMessages:
trace "Error fetching account nodes for healing", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len,
nNodesMissing=nodesMissing.len,
nLeaves,
error=rc.error
# Just run the next lap
continue
rc.value
# Store to disk and register left overs for the next pass
block:
let rc = ctx.data.snapDb.importRawNodes(peer, dd.nodes)
if rc.isOk:
env.checkAccountNodes = env.checkAccountNodes & dd.leftOver.mapIt(it[0])
elif 0 < rc.error.len and rc.error[^1][0] < 0:
# negative index => storage error
env.missingAccountNodes = env.missingAccountNodes & fetchNodes
else:
env.missingAccountNodes = env.missingAccountNodes &
dd.leftOver.mapIt(it[0]) & rc.error.mapIt(dd.nodes[it[0]])
# End while
when extraTraceMessages:
trace "Done accounts healing", peer,
nAccounts=env.nAccounts,
covered=buddy.coverageInfo(),
nCheckAccountNodes=env.checkAccountNodes.len,
nMissingAccountNodes=env.missingAccountNodes.len,
nLeaves
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,186 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch accounts snapshot
## =======================
##
## Worker items state diagram:
## ::
## unprocessed | peer workers + |
## account ranges | account database update | unprocessed storage slots
## ========================================================================
##
## +---------------------------------------+
## | |
## v |
## <unprocessed> -----+------> <worker-0> ------+-----> OUTPUT
## | |
## +------> <worker-1> ------+
## | |
## +------> <worker-2> ------+
## : :
##
import
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/[interval_set, keyed_queue],
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_account_range],
./db/snap_db
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc withMaxLen(
buddy: SnapBuddyRef;
iv: LeafRange;
maxlen: UInt256;
): LeafRange =
## Reduce accounts interval to maximal size
if 0 < iv.len and iv.len <= maxLen:
iv
else:
LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256))
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] =
## Fetch an interval from one of the account range lists.
let
env = buddy.data.pivotEnv
accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256
for ivSet in env.fetchAccounts:
let rc = ivSet.ge()
if rc.isOk:
let iv = buddy.withMaxLen(rc.value, accountRangeMax)
discard ivSet.reduce(iv)
return ok(iv)
err()
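As a back-of-the-envelope illustration of the clamping used above, with `uint64` standing in for `UInt256` and a plain list standing in for the interval set (helper names are made up):

```nim
type Iv = tuple[minPt, maxPt: uint64]

# Sketch: take the first unprocessed interval, clamped to the per-worker cap
# high(uint64) div buddiesMax, and keep the unserved tail in the pool.
proc takeClamped(pool: var seq[Iv]; buddiesMax: uint64): (bool, Iv) =
  let cap = high(uint64) div buddiesMax
  if pool.len == 0:
    return (false, (minPt: 0'u64, maxPt: 0'u64))
  var iv = pool[0]
  if cap <= iv.maxPt - iv.minPt:           # interval longer than the cap
    pool[0].minPt = iv.minPt + cap         # leave the tail for later
    iv.maxPt = iv.minPt + cap - 1
  else:
    pool = pool[1 .. ^1]                   # consumed completely
  (true, iv)

when isMainModule:
  var pool = @[(minPt: 0'u64, maxPt: high(uint64))]
  let (taken, iv) = pool.takeClamped(8)
  doAssert taken and iv.minPt == 0'u64 and pool[0].minPt == iv.maxPt + 1
```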
proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.data.pivotEnv.fetchAccounts[1].merge(iv)
proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.data.pivotEnv.fetchAccounts[1].reduce(iv)
proc markGloballyProcessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.ctx.data.coveredAccounts.merge(iv)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc storeAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and store them in the database.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
if rc.isErr:
trace "Currently no unprocessed accounts", peer, stateRoot
return
rc.value
when extraTraceMessages:
trace "Start fetching accounts", peer, stateRoot, iv
# Process received accounts and stash storage slots to fetch later
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
when extraTraceMessages:
trace "Error fetching accounts => stop", peer, error
return
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
rc.value
let
nAccounts = dd.data.accounts.len
nStorage = dd.withStorage.len
block:
let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
if rc.isErr:
# Bad data, just try another peer
buddy.putUnprocessed(iv)
buddy.ctrl.zombie = true
when extraTraceMessages:
let error = ComImportAccountsFailed
trace "Accounts import failed => stop", peer, stateRoot,
range=dd.consumed, nAccounts, nStorage, error
return
# Statistics
env.nAccounts.inc(nAccounts)
env.nStorage.inc(nStorage)
# Register consumed intervals on the accumulator over all state roots
buddy.markGloballyProcessed(dd.consumed)
# Register consumed and bulk-imported (well, not yet) accounts range
block registerConsumed:
block:
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
let rc = iv - dd.consumed
if rc.isOk:
# Now, `dd.consumed` < `iv`, return some unused range
buddy.putUnprocessed(rc.value)
break registerConsumed
block:
# The processed interval might be a bit larger
let rc = dd.consumed - iv
if rc.isOk:
# Remove from unprocessed data. If it is not unprocessed anymore,
# then it was doubly processed, which is ok.
buddy.delUnprocessed(rc.value)
break registerConsumed
# End registerConsumed
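The interval bookkeeping in the `registerConsumed` block amounts to a case distinction on two intervals that share the same lower bound. A worked toy example (closed `uint64` intervals, helper name made up):

```nim
type Iv = tuple[minPt, maxPt: uint64]

# Worked example of the bookkeeping above, with closed uint64 intervals that
# share the same lower bound (as `iv` and `dd.consumed` do in the code).
proc unserved(iv, consumed: Iv): (bool, Iv) =
  ## `(true, tail)`  => the peer served less than asked for, put `tail` back.
  ## `(false, over)` => the peer served more, delete `over` from the pool
  ##                    (the interval is empty when both upper ends match).
  if consumed.maxPt < iv.maxPt:
    (true, (minPt: consumed.maxPt + 1, maxPt: iv.maxPt))
  else:
    (false, (minPt: iv.maxPt + 1, maxPt: consumed.maxPt))

when isMainModule:
  let (putBack, tail) = unserved(
    (minPt: 0'u64, maxPt: 999'u64), (minPt: 0'u64, maxPt: 499'u64))
  doAssert putBack and tail.minPt == 500'u64 and tail.maxPt == 999'u64
```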
# Store accounts on the storage TODO list.
discard env.fetchStorage.append SnapSlotQueueItemRef(q: dd.withStorage)
when extraTraceMessages:
let withStorage = dd.withStorage.len
trace "Done fetching accounts", peer, stateRoot, nAccounts, withStorage, iv
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,177 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch storage slots snapshot
## ============================
##
## Worker items state diagram:
## ::
## unprocessed slot requests | peer workers + storages database update
## ===================================================================
##
## +-----------------------------------------------+
## | |
## v |
## <unprocessed> ------------+-------> <worker-0> ------+
## | |
## +-------> <worker-1> ------+
## | |
## +-------> <worker-2> ------+
## : :
##
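A stripped-down model of the queue discipline described above, using a plain `Deque` of account names instead of `SnapSlotQueueItemRef` (everything here is illustrative only):

```nim
import std/deques

# Toy model: a worker pops one request, and whatever it could not complete is
# appended again so a later cycle (or another worker) picks it up.
type SlotReq = object
  accounts: seq[string]

proc oneCycle(queue: var Deque[SlotReq];
              serve: proc(accs: seq[string]): seq[string]): bool =
  ## Returns false when there is nothing to do.
  if queue.len == 0:
    return false
  let req = queue.popFirst
  let leftOver = serve(req.accounts)          # accounts still incomplete
  if 0 < leftOver.len:
    queue.addLast SlotReq(accounts: leftOver) # re-queue for a later cycle
  return true

when isMainModule:
  var q = [SlotReq(accounts: @["acc-1", "acc-2", "acc-3"])].toDeque
  discard q.oneCycle(proc(accs: seq[string]): seq[string] = accs[1 .. ^1])
  doAssert q.peekFirst.accounts == @["acc-2", "acc-3"]
```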
import
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/keyed_queue,
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_storage_ranges],
./db/snap_db
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getNextSlotItem(buddy: SnapBuddyRef): Result[SnapSlotQueueItemRef,void] =
let env = buddy.data.pivotEnv
for w in env.fetchStorage.nextKeys:
# Make sure that this item was not fetched and rejected earlier
if w notin buddy.data.vetoSlots:
env.fetchStorage.del(w)
return ok(w)
err()
proc fetchAndImportStorageSlots(
buddy: SnapBuddyRef;
reqSpecs: seq[AccountSlotsHeader];
): Future[Result[seq[SnapSlotQueueItemRef],ComError]]
{.async.} =
## Fetch storage slots data from the network, store it on disk and
## return data to process in the next cycle.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Get storage slots
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, reqSpecs)
if rc.isErr:
return err(rc.error)
rc.value
if 0 < stoRange.data.storages.len:
# Verify/process data and save to disk
block:
let rc = ctx.data.snapDb.importStorages(peer, stoRange.data)
if rc.isErr:
# Push back parts of the error item
var once = false
for w in rc.error:
if 0 <= w[0]:
# Reset any partial requests by not copying the `firstSlot` field.
# So all the storage slots are re-fetched completely for this
# account.
stoRange.addLeftOver(
@[AccountSlotsHeader(
accHash: stoRange.data.storages[w[0]].account.accHash,
storageRoot: stoRange.data.storages[w[0]].account.storageRoot)],
forceNew = not once)
once = true
# Do not ask for the same entries again on this `peer`
if once:
buddy.data.vetoSlots.incl stoRange.leftOver[^1]
if rc.error[^1][0] < 0:
discard
# TODO: disk storage failed or something else happened, so what?
# Return the remaining part to be processed later
return ok(stoRange.leftOver)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
## Fetch account storage slots and store them in the database.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
once = true # for logging
# Fetch storage data and save it on disk. Storage requests are managed by
# a request queue for handling partial replies and re-fetch issues. For
# all practical purposes, this request queue should mostly be empty.
while true:
# Pull out the next request item from the queue
let req = block:
let rc = buddy.getNextSlotItem()
if rc.isErr:
return # currently nothing to do
rc.value
when extraTraceMessages:
if once:
once = false
let nAccounts = 1 + env.fetchStorage.len
trace "Start fetching storage slotss", peer,
nAccounts, nVetoSlots=buddy.data.vetoSlots.len
block:
# Fetch and store account storage slots. On success, the `rc` value will
# contain a list of left-over items to be re-processed.
let rc = await buddy.fetchAndImportStorageSlots(req.q)
if rc.isErr:
# Save accounts/storage list to be processed later, then stop
discard env.fetchStorage.append req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
trace "Error fetching storage slots => stop", peer, error
discard
return
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
for qLo in rc.value:
# Handle queue left-overs for processing in the next cycle
if qLo.q[0].firstSlot == Hash256() and 0 < env.fetchStorage.len:
# Appending to last queue item is preferred over adding new item
let item = env.fetchStorage.first.value
item.q = item.q & qLo.q
else:
# Put back as-is.
discard env.fetchStorage.append qLo
# End while
when extraTraceMessages:
trace "Done fetching storage slots", peer, nAccounts=env.fetchStorage.len
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -30,8 +30,7 @@ type
nStorage*: (float,float) ## mean and standard deviation
accountsFill*: (float,float,float) ## mean, standard deviation, merged total
accCoverage*: float ## as factor
activeQueues*: int
flushedQueues*: int64
nQueues*: int
TickerStatsUpdater* =
proc: TickerStats {.gcsafe, raises: [Defect].}
@ -119,19 +118,18 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
accCov = data.accountsFill[0].toPC(1) &
"(" & data.accountsFill[1].toPC(1) & ")" &
"/" & data.accountsFill[2].toPC(0)
flushed = data.flushedQueues
buddies = t.nBuddies
tick = t.tick.toSI
mem = getTotalMem().uint.toSI
noFmtError("runLogTicker"):
if data.pivotBlock.isSome:
pivot = &"#{data.pivotBlock.get}/{data.activeQueues}"
pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})"
nStore = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})"
info "Snap sync statistics",
tick, buddies, pivot, nAcc, accCov, nStore, flushed, mem
tick, buddies, pivot, nAcc, accCov, nStore, mem
t.tick.inc
t.setLogTicker(Moment.fromNow(tickerLogInterval))


@ -1,5 +1,4 @@
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
#
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@ -13,10 +12,9 @@ import
std/[hashes, sequtils, strutils],
eth/[common/eth_types, p2p],
stew/[byteutils, keyed_queue],
../../db/select_backend,
../../constants,
"../.."/[constants, db/select_backend],
".."/[sync_desc, types],
./worker/[accounts_db, ticker],
./worker/[com/com_error, db/snap_db, ticker],
./range_desc
{.push raises: [Defect].}
@ -25,14 +23,40 @@ const
snapRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
maxPivotBlockWindow* = 128
## The maximal depth of two block headers. If the pivot block header
## (containing the current state root) is more than this many blocks
## away from a new pivot block header candidate, then the latter one
## replaces the current block header.
minPivotBlockDistance* = 128
## The minimal depth of two block headers needed to activate a new state
## root pivot.
##
## This mechanism applies to new worker buddies which start by finding
## a new pivot.
## Effects on assembling the state via `snap/1` protocol:
##
## * A small value of this constant increases the propensity to update the
## pivot header more often. This is so because each new peer negotiates a
## pivot block number at least as high as the current one.
##
## * A large value keeps the current pivot more stable but some experiments
## suggest that the `snap/1` protocol is answered only for later block
## numbers (aka pivot blocks.) So a large value tends to keep the pivot
## farther away from the chain head.
##
## Note that 128 is the magic distance for snapshots used by *Geth*.
backPivotBlockDistance* = 64
## When a pivot header is found, move pivot back `backPivotBlockDistance`
## blocks so that the pivot is guaranteed to have some distance from the
## chain head.
##
## Set `backPivotBlockDistance` to zero for disabling this feature.
backPivotBlockThreshold* = backPivotBlockDistance + minPivotBlockDistance
## Ignore `backPivotBlockDistance` unless the current block number is
## larger than this constant (which must be at least
## `backPivotBlockDistance`.)
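Under these definitions the pivot setback is plain arithmetic; the helper below is hypothetical and only restates the rule from the comments:

```nim
const
  minPivotBlockDistance = 128'u64
  backPivotBlockDistance = 64'u64
  backPivotBlockThreshold = backPivotBlockDistance + minPivotBlockDistance

proc effectivePivot(negotiated: uint64): uint64 =
  ## Set the pivot back a bit so more peers are likely to still hold a
  ## snapshot for it, unless the chain is too short for the setback.
  if backPivotBlockThreshold < negotiated:
    negotiated - backPivotBlockDistance
  else:
    negotiated

when isMainModule:
  doAssert effectivePivot(1_000_000) == 999_936
  doAssert effectivePivot(100) == 100
```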
healAccountsTrigger* = 0.95
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see
## `coveredAccounts` in `CtxData`.)
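In other words, healing is due once both conditions of the following (illustrative) check hold, where `coveredFraction` stands for the merged coverage value in `[0,1]`:

```nim
const healAccountsTrigger = 0.95

proc healingDue(coveredFraction: float; nAccounts: int): bool =
  ## Healing only kicks in once some accounts were downloaded and the global
  ## coverage over all pivot state roots passed the trigger.
  0 < nAccounts and healAccountsTrigger <= coveredFraction

when isMainModule:
  doAssert not healingDue(0.50, 10_000)
  doAssert healingDue(0.97, 10_000)
  doAssert not healingDue(0.97, 0)
```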
maxTrieNodeFetch* = 1024
## Informal maximal number of trie nodes to fetch at once. This is nor
@ -41,42 +65,22 @@ const
## Restricting the fetch list length early allows healing to be better
## parallelised.
switchPivotAfterCoverage* = 1.0 # * 0.30
## Stop fetching from the same pivot state root with this much coverage
## and try to find a new one. Setting this value to `1.0` disables this
## feature. Note that setting low coverage levels is primarily for
## testing/debugging (may produce stress conditions.)
##
## If this setting is active, it typically supersedes the pivot update
## mechanism implied by the `maxPivotBlockWindow`. This is for the simple
## reason that the pivot state root is typically near the head of the
## block chain.
##
## This mechanism applies to running worker buddies. When triggered, all
## pivot handlers are reset so they will start from scratch finding a
## better pivot.
maxHealingLeafPaths* = 1024
## Retrieve this many leaf nodes with a proper 32 byte path when inspecting
## for dangling nodes. This allows healing to run in parallel with accounts or
## storage download without requesting an account/storage slot found by
## healing again with the download.
# ---
noPivotEnvChangeIfComplete* = true
## If set `true`, new peers will not change the pivot even if the
## negotiated pivot would be newer. This should be the default.
snapAccountsDumpEnable* = false # or true
## Enable data dump
snapAccountsDumpCoverageStop* = 0.99999
## Stop dumping if most accounts are covered
# -------
seenBlocksMax = 500
## Internal size of LRU cache (for debugging)
type
BuddyStat* = distinct uint
SnapBuddyErrors* = tuple
## particular error counters so connections will not be cut immediately
## after a particular error.
nTimeouts: uint
# -------
WorkerSeenBlocks = KeyedQueue[array[32,byte],BlockNumber]
## Temporary for pretty debugging, `BlockHash` keyed lru cache
@ -94,23 +98,28 @@ type
SnapSlotsSet* = HashSet[SnapSlotQueueItemRef]
## Ditto but without order, to be used as veto set
SnapRepairState* = enum
Pristine ## Not initialised yet
KeepGoing ## Unfinished repair process
Done ## Stop repairing
SnapAccountRanges* = array[2,LeafRangeSet]
## Pair of account hash range lists. The first entry must be processed
## first. This allows coordinating peers working on different state roots
## to avoid overlapping accounts as long as they fetch from the first entry.
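One plausible way to build such a pair for a fresh pivot environment, sketched with small integer intervals (this is an assumption about the intended semantics, not a copy of the implementation): list 0 receives the complement of what other pivots already covered, list 1 the rest, and workers drain list 0 first.

```nim
type Iv = tuple[lo, hi: int]

# Sketch: list 0 = ranges no other pivot has fetched yet (drained first),
# list 1 = ranges already covered for some other state root.
proc newFetchAccounts(covered: seq[Iv]; domainHi: int): array[2, seq[Iv]] =
  var pos = 0
  for iv in covered:                        # assumed sorted and disjoint
    if pos < iv.lo:
      result[0].add((lo: pos, hi: iv.lo - 1))
    result[1].add(iv)
    pos = iv.hi + 1
  if pos <= domainHi:
    result[0].add((lo: pos, hi: domainHi))

when isMainModule:
  let r = newFetchAccounts(@[(lo: 10, hi: 19), (lo: 40, hi: 49)], 99)
  doAssert r[0] == @[(lo: 0, hi: 9), (lo: 20, hi: 39), (lo: 50, hi: 99)]
  doAssert r[1] == @[(lo: 10, hi: 19), (lo: 40, hi: 49)]
```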
SnapPivotRef* = ref object
## Per-state root cache for particular snap data environment
stateHeader*: BlockHeader ## Pivot state, containing state root
pivotAccount*: NodeTag ## Random account
availAccounts*: LeafRangeSet ## Accounts to fetch (as ranges)
# Accounts download
fetchAccounts*: SnapAccountRanges ## Sets of accounts ranges to fetch
checkAccountNodes*: seq[Blob] ## Nodes with prob. dangling child links
missingAccountNodes*: seq[Blob] ## Dangling links to fetch and merge
accountsDone*: bool ## All accounts have been processed
# Storage slots download
fetchStorage*: SnapSlotsQueue ## Fetch storage for these accounts
serialSync*: bool ## Done with storage, block sync next
# Info
nAccounts*: uint64 ## Number of accounts imported
nStorage*: uint64 ## Number of storage spaces imported
leftOver*: SnapSlotsQueue ## Re-fetch storage for these accounts
dangling*: seq[Blob] ## Missing nodes for healing process
repairState*: SnapRepairState ## State of healing process
when switchPivotAfterCoverage < 1.0:
minCoverageReachedOk*: bool ## Stop filling this pivot
SnapPivotTable* = ##\
## LRU table, indexed by state root
@ -118,33 +127,23 @@ type
BuddyData* = object
## Per-worker local descriptor data extension
errors*: SnapBuddyErrors ## For error handling
workerPivot*: RootRef ## Opaque object reference for sub-module
errors*: ComErrorStatsRef ## For error handling
pivotFinder*: RootRef ## Opaque object reference for sub-module
pivotEnv*: SnapPivotRef ## Environment containing state root
vetoSlots*: SnapSlotsSet ## Do not ask for these slots, again
BuddyPoolHookFn* = proc(buddy: BuddyRef[CtxData,BuddyData]) {.gcsafe.}
## Callback invoked for all worker buddies (the argument type is defined
## below with the pretty name `SnapBuddyRef`.)
CtxData* = object
## Globally shared data extension
seenBlock: WorkerSeenBlocks ## Temporary, debugging, pretty logs
rng*: ref HmacDrbgContext ## Random generator
coveredAccounts*: LeafRangeSet ## Derived from all available accounts
dbBackend*: ChainDB ## Low level DB driver access (if any)
ticker*: TickerRef ## Ticker, logger
pivotTable*: SnapPivotTable ## Per state root environment
pivotCount*: uint64 ## Total of all created tab entries
pivotEnv*: SnapPivotRef ## Environment containing state root
prevEnv*: SnapPivotRef ## Previous state root environment
pivotData*: RootRef ## Opaque object reference for sub-module
accountRangeMax*: UInt256 ## Maximal length, high(u256)/#peers
accountsDb*: AccountsDbRef ## Proof processing for accounts
runPoolHook*: BuddyPoolHookFn ## Callback for `runPool()`
when snapAccountsDumpEnable:
proofDumpOk*: bool
proofDumpFile*: File
proofDumpInx*: int
pivotFinderCtx*: RootRef ## Opaque object reference for sub-module
snapDb*: SnapDbRef ## Accounts snapshot DB
coveredAccounts*: LeafRangeSet ## Derived from all available accounts
# Info
ticker*: TickerRef ## Ticker, logger
SnapBuddyRef* = BuddyRef[CtxData,BuddyData]
## Extended worker peer descriptor
@ -152,16 +151,22 @@ type
SnapCtxRef* = CtxRef[CtxData]
## Extended global descriptor
static:
doAssert healAccountsTrigger < 1.0 # larger values make no sense
doAssert backPivotBlockDistance <= backPivotBlockThreshold
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc inc(stat: var BuddyStat) {.borrow.}
proc hash*(a: SnapSlotQueueItemRef): Hash =
## Table/KeyedQueue mixin
cast[pointer](a).hash
proc hash*(a: Hash256): Hash =
## Table/KeyedQueue mixin
a.data.hash
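# --- Illustrative sketch (editor's example, not part of the diffed module) ---
# Why the `hash()` mixins above are needed: storing a `ref object` in a
# `HashSet` (as the `vetoSlots` veto set does) requires a `hash()` overload.
# Hashing the reference itself gives identity semantics, i.e. two items are
# the same only if they are the very same object. `ItemRef` is hypothetical.
import std/[hashes, sets]

type ItemRef = ref object
  payload: int

proc hash(a: ItemRef): Hash =
  cast[pointer](a).hash               # same trick as the mixin above

when isMainModule:
  let x = ItemRef(payload: 1)
  var veto: HashSet[ItemRef]
  veto.incl x
  doAssert x in veto                           # the same reference is found
  doAssert ItemRef(payload: 1) notin veto      # equal payload, different object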
# ------------------------------------------------------------------------------
# Public functions, debugging helpers (will go away eventually)
# ------------------------------------------------------------------------------

View File

@ -25,8 +25,8 @@ import
../nimbus/p2p/chain,
../nimbus/sync/types,
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/[accounts_db, db/hexary_desc,
db/hexary_inspect, db/rocky_bulk_load],
../nimbus/sync/snap/worker/db/[hexary_desc, hexary_inspect,
rocky_bulk_load, snap_db,],
../nimbus/utils/prettify,
./replay/[pp, undump_blocks, undump_accounts, undump_storages],
./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types]
@ -117,7 +117,7 @@ proc pp(rc: Result[Hash256,HexaryDbError]): string =
proc pp(
rc: Result[TrieNodeStat,HexaryDbError];
db: AccountsDbSessionRef
db: SnapDbSessionRef
): string =
if rc.isErr: $rc.error else: rc.value.pp(db.getAcc)
@ -284,21 +284,21 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":
var
desc: AccountsDbSessionRef
desc: SnapDbSessionRef
accKeys: seq[Hash256]
test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
let
dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
else: AccountsDbRef.init(newMemoryDB())
dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
dbBase = if persistent: SnapDbRef.init(db.cdb[0])
else: SnapDbRef.init(newMemoryDB())
dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
for n,w in accountsList:
check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb
test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
let dbBase = if persistent: AccountsDbRef.init(db.cdb[1])
else: AccountsDbRef.init(newMemoryDB())
desc = AccountsDbSessionRef.init(dbBase, root, peer)
let dbBase = if persistent: SnapDbRef.init(db.cdb[1])
else: SnapDbRef.init(newMemoryDB())
desc = SnapDbSessionRef.init(dbBase, root, peer)
# Load/accumulate data from several samples (needs some particular sort)
let
@ -417,19 +417,19 @@ proc storagesRunner(
suite &"SyncSnap: {fileInfo} accounts storage for {info}":
let
dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
else: AccountsDbRef.init(newMemoryDB())
dbBase = if persistent: SnapDbRef.init(db.cdb[0])
else: SnapDbRef.init(newMemoryDB())
var
desc = AccountsDbSessionRef.init(dbBase, root, peer)
desc = SnapDbSessionRef.init(dbBase, root, peer)
test &"Merging {accountsList.len} accounts for state root ..{root.pp}":
for w in accountsList:
let desc = AccountsDbSessionRef.init(dbBase, root, peer)
let desc = SnapDbSessionRef.init(dbBase, root, peer)
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
test &"Merging {storagesList.len} storages lists":
let
dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
dbDesc = SnapDbSessionRef.init(dbBase, root, peer)
ignore = knownFailures.toTable
for n,w in storagesList:
let
@ -466,18 +466,18 @@ proc inspectionRunner(
suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
let
memBase = AccountsDbRef.init(newMemoryDB())
memDesc = AccountsDbSessionRef.init(memBase, Hash256(), peer)
memBase = SnapDbRef.init(newMemoryDB())
memDesc = SnapDbSessionRef.init(memBase, Hash256(), peer)
var
singleStats: seq[(int,TrieNodeStat)]
accuStats: seq[(int,TrieNodeStat)]
perBase,altBase: AccountsDbRef
perDesc,altDesc: AccountsDbSessionRef
perBase,altBase: SnapDbRef
perDesc,altDesc: SnapDbSessionRef
if persistent:
perBase = AccountsDbRef.init(db.cdb[0])
perDesc = AccountsDbSessionRef.init(perBase, Hash256(), peer)
altBase = AccountsDbRef.init(db.cdb[1])
altDesc = AccountsDbSessionRef.init(altBase, Hash256(), peer)
perBase = SnapDbRef.init(db.cdb[0])
perDesc = SnapDbSessionRef.init(perBase, Hash256(), peer)
altBase = SnapDbRef.init(db.cdb[1])
altDesc = SnapDbSessionRef.init(altBase, Hash256(), peer)
test &"Fingerprinting {inspectList.len} single accounts lists " &
"for in-memory-db":
@ -486,7 +486,7 @@ proc inspectionRunner(
let
root = accList[0].root
rootKey = root.to(NodeKey)
desc = AccountsDbSessionRef.init(memBase, root, peer)
desc = SnapDbSessionRef.init(memBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
@ -510,8 +510,8 @@ proc inspectionRunner(
let
root = accList[0].root
rootKey = root.to(NodeKey)
dbBase = AccountsDbRef.init(db.cdb[2+n])
desc = AccountsDbSessionRef.init(dbBase, root, peer)
dbBase = SnapDbRef.init(db.cdb[2+n])
desc = SnapDbSessionRef.init(dbBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
@ -572,8 +572,8 @@ proc inspectionRunner(
skip()
else:
let
cscBase = AccountsDbRef.init(newMemoryDB())
cscDesc = AccountsDbSessionRef.init(cscBase, Hash256(), peer)
cscBase = SnapDbRef.init(newMemoryDB())
cscDesc = SnapDbSessionRef.init(cscBase, Hash256(), peer)
var
cscStep: Table[NodeKey,(int,seq[Blob])]
for n,accList in inspectList:
@ -588,7 +588,7 @@ proc inspectionRunner(
cscStep[rootKey][0].inc
let
r0 = desc.inspectAccountsTrie(persistent=false)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],false)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=false)
check rc.isOk
let
accumulated = r0.value.dangling.toHashSet
@ -620,7 +620,7 @@ proc inspectionRunner(
cscStep[rootKey][0].inc
let
r0 = desc.inspectAccountsTrie(persistent=true)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],true)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=true)
check rc.isOk
let
accumulated = r0.value.dangling.toHashSet