Pre functional snap to full sync (#1546)

* Update sync scheduler pool mode

why:
  The pool mode allows looping over active peers one after another,
  which is ideal for soft re-starting peers. As this is a two-tier
  process (start/stop, setup/release), the loop must be run twice. This
  is controlled by a more rigid re-definition of how to use the
  `poolMode` flag.
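
example:
  A minimal sketch of the two-lap protocol (not part of this patch;
  `Ctx` and the `runPool()` signature are simplified stand-ins for the
  scheduler types): the scheduler resets `poolMode` before each lap,
  and a handler requests another lap by setting the flag again.

    type
      Ctx = ref object
        poolMode: bool          # global flag, checked by the scheduler

    proc runPool(ctx: Ctx; peer: string; last: bool; laps: int): bool =
      ## Called for each active peer in turn; returning `true` ends the
      ## current lap early.
      echo "lap=", laps, " peer=", peer, " last=", last
      if last and laps == 0:
        ctx.poolMode = true     # request a second lap (e.g. re-start phase)
      false

    proc poolLoop(ctx: Ctx; peers: seq[string]) =
      var laps = 0
      while ctx.poolMode:
        ctx.poolMode = false    # always reset before the lap starts
        for n, peer in peers:
          if ctx.runPool(peer, last = (n == peers.high), laps = laps):
            break
        laps.inc

    when isMainModule:
      let ctx = Ctx(poolMode: true)
      ctx.poolLoop(@["alice", "bob"])   # runs two laps over both peers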

* Mitigate RLP serialiser deficiency

why:
  Currently, serialising the `BlockBody` is not convertible (i.e. not
  round-trip safe) and needs to be checked in the `eth` module. For
  now, a local fix for the wire protocol applies. Unit tests will stay
  (even after this local solution has been removed.)
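
example:
  The deficiency is a missing list header in a custom `append()`. A
  minimal sketch, assuming nim-eth's `rlp` module and a hypothetical
  two-field object standing in for `BlockBody`:

    import eth/rlp

    type
      Demo = object
        txs: seq[int]
        uncles: seq[int]

    proc append(w: var RlpWriter; d: Demo) =
      # Without this list header the fields would be written as two
      # top-level items and decoding the object back would fail -- the
      # same class of bug as in `eth_types_rlp.append()` for `BlockBody`.
      w.startList 2
      w.append d.txs
      w.append d.uncles

    when isMainModule:
      let blob = rlp.encode(Demo(txs: @[1], uncles: @[2, 3]))
      doAssert blob.decode(Demo).uncles == @[2, 3]   # round-trips again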

* Code cosmetics and massage

details:
  The main part is `types.toStr()`, a unified function for logging
  block numbers.
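
example:
  The intended behaviour, sketched with simplified types (the real
  `toStr()` works on `BlockNumber` and related `Option` types):

    import std/options

    proc toStr(n: uint64): string =
      "#" & $n

    proc toStr(n: Option[uint64]): string =
      if n.isNone: "n/a" else: n.unsafeGet.toStr

    when isMainModule:
      doAssert some(17'u64).toStr == "#17"
      doAssert none(uint64).toStr == "n/a"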

* Allow using a logical genesis replacement (start of history)

why:
  Snap sync will set up an arbitrary pivot at a block number different
  from zero. In fact, the higher the block number the better.

details:
  A non-genesis start of history currently only affects the score
  values, which are derived from the difficulty.
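
example:
  The score (total difficulty) logic, sketched with stand-in types for
  `Hash256` and `DifficultyInt`:

    type
      Header = object
        parentHash: string     # stand-in for `Hash256`
        difficulty: uint64     # stand-in for `DifficultyInt`

    proc score(h: Header; parentScore: uint64; startOfHistory: string): uint64 =
      # A header whose parent is the logical genesis seeds the score with
      # its own difficulty; every later header accumulates on its parent.
      if h.parentHash == startOfHistory: h.difficulty
      else: parentScore + h.difficulty

    when isMainModule:
      let pivot = Header(parentHash: "0xparent", difficulty: 100)
      doAssert pivot.score(0, startOfHistory = "0xparent") == 100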

* Provide function to store the snap pivot block header in chain db

why:
  Together with the start-of-history facility, this allows proceeding
  with full sync once snap has finished.

details:
  Snap db storage was switched from sub-tables to the flat chain db.
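
example:
  The two flat chain-db records written for the pivot header, sketched
  with a plain `Table` standing in for the trie database (cf.
  `persistentBlockHeaderPut()` in the diff below):

    import std/tables

    var db = initTable[string, string]()

    proc putPivotHeader(blockHash, headerRlp: string; blockNumber: uint64) =
      db["hash:" & blockHash] = headerRlp     # `genericHashKey` analogue
      db["num:" & $blockNumber] = blockHash   # `blockNumberToHashKey` analogue

    when isMainModule:
      putPivotHeader("0xabc", "<header rlp>", 1_000_000)
      doAssert db["num:1000000"] == "0xabc"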

* Provide database completeness and sanity checker

details:
  For debugging on smaller databases only.

* Implement snap -> full sync switch
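
example:
  The hand-over in a nutshell, with stand-in handlers (the real code
  dispatches through the `playMethod` table, see `snapSyncPool()` in
  the diff below):

    type
      SyncMode = enum SnapSyncMode, FullSyncMode
      Ctx = ref object
        active: SyncMode
        poolMode: bool

    proc snapRelease(ctx: Ctx) = echo "snap handlers shut down"
    proc fullSetup(ctx: Ctx) = echo "full sync handlers installed"

    proc switchToFullSync(ctx: Ctx) =
      # Run on the last peer of the first pool-mode lap: release the old
      # sync method, activate the new one, and request a second lap so
      # that every peer gets soft re-started under the full sync handlers.
      ctx.snapRelease()
      ctx.active = FullSyncMode
      ctx.fullSetup()
      ctx.poolMode = true

    when isMainModule:
      let ctx = Ctx(active: SnapSyncMode, poolMode: false)
      ctx.switchToFullSync()
      doAssert ctx.active == FullSyncMode and ctx.poolMode
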
Jordan Hrycaj 2023-04-14 23:28:57 +01:00 committed by GitHub
parent 3eafb15d7c
commit 0a3bc102eb
36 changed files with 927 additions and 405 deletions


@ -72,10 +72,15 @@ type
syncReqNewHead: SyncReqNewHeadCB
pow: PowRef ##\
startOfHistory: Hash256
## This setting is needed for resuming blockwise syncing after
## installing a snapshot pivot. The default value for this field is
## `GENESIS_PARENT_HASH` to start at the very beginning.
pow: PowRef
## Wrapper around `hashimotoLight()` and lookup cache
poa: Clique ##\
poa: Clique
## For non-PoA networks this descriptor is ignored.
pos: CasperRef
@ -154,6 +159,9 @@ proc init(com : CommonRef,
com.pow = PowRef.new
com.pos = CasperRef.new
# By default, history begins at genesis.
com.startOfHistory = GENESIS_PARENT_HASH
proc getTd(com: CommonRef, blockHash: Hash256): Option[DifficultyInt] =
var td: DifficultyInt
if not com.db.getTd(blockHash, td):
@ -348,15 +356,20 @@ proc syncReqNewHead*(com: CommonRef; header: BlockHeader)
# ------------------------------------------------------------------------------
# Getters
# ------------------------------------------------------------------------------
proc poa*(com: CommonRef): Clique =
func startOfHistory*(com: CommonRef): Hash256 =
## Getter
com.startOfHistory
func poa*(com: CommonRef): Clique =
## Getter
com.poa
proc pow*(com: CommonRef): PowRef =
func pow*(com: CommonRef): PowRef =
## Getter
com.pow
proc pos*(com: CommonRef): CasperRef =
func pos*(com: CommonRef): CasperRef =
## Getter
com.pos
@ -441,6 +454,10 @@ proc `syncCurrent=`*(com: CommonRef, number: BlockNumber) =
proc `syncHighest=`*(com: CommonRef, number: BlockNumber) =
com.syncProgress.highest = number
proc `startOfHistory=`*(com: CommonRef, val: Hash256) =
## Setter
com.startOfHistory = val
proc setTTD*(com: CommonRef, ttd: Option[DifficultyInt]) =
## useful for testing
com.config.terminalTotalDifficulty = ttd


@ -116,7 +116,8 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
return ValidationResult.Error
if NoPersistHeader notin flags:
discard c.db.persistHeaderToDb(header, c.com.consensus == ConsensusType.POS)
discard c.db.persistHeaderToDb(
header, c.com.consensus == ConsensusType.POS, c.com.startOfHistory)
if NoSaveTxs notin flags:
discard c.db.persistTransactions(header.blockNumber, body.transactions)
@ -141,7 +142,7 @@ proc insertBlockWithoutSetHead*(c: ChainRef, header: BlockHeader,
result = c.persistBlocksImpl(
[header], [body], {NoPersistHeader, NoSaveReceipts})
if result == ValidationResult.OK:
c.db.persistHeaderToDbWithoutSetHead(header)
c.db.persistHeaderToDbWithoutSetHead(header, c.com.startOfHistory)
proc setCanonical*(c: ChainRef, header: BlockHeader): ValidationResult
{.gcsafe, raises: [CatchableError].} =


@ -407,16 +407,20 @@ proc getReceipts*(db: ChainDBRef; receiptRoot: Hash256): seq[Receipt] =
receipts.add(r)
return receipts
proc persistHeaderToDb*(db: ChainDBRef; header: BlockHeader,
forceCanonical: bool): seq[BlockHeader] =
let isGenesis = header.parentHash == GENESIS_PARENT_HASH
proc persistHeaderToDb*(
db: ChainDBRef;
header: BlockHeader;
forceCanonical: bool;
startOfHistory = GENESIS_PARENT_HASH;
): seq[BlockHeader] =
let isStartOfHistory = header.parentHash == startOfHistory
let headerHash = header.blockHash
if not isGenesis and not db.headerExists(header.parentHash):
if not isStartOfHistory and not db.headerExists(header.parentHash):
raise newException(ParentNotFound, "Cannot persist block header " &
$headerHash & " with unknown parent " & $header.parentHash)
db.db.put(genericHashKey(headerHash).toOpenArray, rlp.encode(header))
let score = if isGenesis: header.difficulty
let score = if isStartOfHistory: header.difficulty
else: db.getScore(header.parentHash) + header.difficulty
db.db.put(blockHashToScoreKey(headerHash).toOpenArray, rlp.encode(score))
@ -431,10 +435,14 @@ proc persistHeaderToDb*(db: ChainDBRef; header: BlockHeader,
if score > headScore or forceCanonical:
return db.setAsCanonicalChainHead(headerHash)
proc persistHeaderToDbWithoutSetHead*(db: ChainDBRef; header: BlockHeader) =
let isGenesis = header.parentHash == GENESIS_PARENT_HASH
proc persistHeaderToDbWithoutSetHead*(
db: ChainDBRef;
header: BlockHeader;
startOfHistory = GENESIS_PARENT_HASH;
) =
let isStartOfHistory = header.parentHash == startOfHistory
let headerHash = header.blockHash
let score = if isGenesis: header.difficulty
let score = if isStartOfHistory: header.difficulty
else: db.getScore(header.parentHash) + header.difficulty
db.db.put(blockHashToScoreKey(headerHash).toOpenArray, rlp.encode(score))


@ -11,7 +11,7 @@
{.push raises: [].}
import
eth/[common, p2p],
eth/p2p,
chronicles,
chronos,
stew/[interval_set, sorted_set],
@ -89,9 +89,9 @@ proc runStop(buddy: FullBuddyRef) =
tracerFrameBuddy("runStop", buddy):
worker.stop(buddy)
proc runPool(buddy: FullBuddyRef; last: bool): bool =
proc runPool(buddy: FullBuddyRef; last: bool; laps: int): bool =
tracerFrameBuddy("runPool", buddy):
result = worker.runPool(buddy, last)
result = worker.runPool(buddy, last, laps)
proc runSingle(buddy: FullBuddyRef) {.async.} =
tracerFrameBuddy("runSingle", buddy):


@ -11,10 +11,9 @@
{.push raises:[].}
import
std/[options],
chronicles,
chronos,
eth/[common, p2p],
eth/p2p,
".."/[protocol, sync_desc],
../misc/[best_pivot, block_queue, sync_ctrl],
"."/[ticker, worker_desc]
@ -367,16 +366,17 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
await sleepAsync napping
proc runPool*(buddy: FullBuddyRef; last: bool): bool =
proc runPool*(buddy: FullBuddyRef; last: bool; laps: int): bool =
## Once started, the function `runPool()` is called for all worker peers in
## sequence as the body of an iteration as long as the function returns
## `false`. There will be no other worker peer functions activated
## simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It is the responsibility of the `runPool()`
## instance to reset the flag `buddy.ctx.poolMode`, typically at the first
## peer instance.
## `true` (default is `false`.) It will be automatically reset before the
## loop starts. Re-setting it again results in repeating the loop. The
## argument `laps` (starting with `0`) indicates the current lap of the
## repeated loops.
##
## The argument `last` is set `true` if the last entry is reached.
##
@ -384,10 +384,7 @@ proc runPool*(buddy: FullBuddyRef; last: bool): bool =
##
# Mind the gap, fill in if necessary (function is peer independent)
buddy.only.bQueue.blockQueueGrout()
# Stop after running once regardless of peer
buddy.ctx.poolMode = false
true
true # Stop after running once regardless of peer
proc runMulti*(buddy: FullBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set


@ -380,9 +380,8 @@ proc pivotNegotiate*(
if rx.isOk:
bp.global.trusted.incl peer
when extraTraceMessages:
let bestHeader {.used.} =
if bp.header.isSome: "#" & $bp.header.get.blockNumber
else: "nil"
let bestHeader {.used.} = if bp.header.isNone: "n/a"
else: bp.header.unsafeGet.blockNumber.toStr
trace "Accepting peer", peer, trusted=bp.global.trusted.len,
untrusted=bp.global.untrusted.len, runState=bp.ctrl.state,
bestHeader
@ -397,9 +396,8 @@ proc pivotNegotiate*(
if bp.global.trusted.len == 0:
bp.global.trusted.incl peer
when extraTraceMessages:
let bestHeader {.used.} =
if bp.header.isSome: "#" & $bp.header.get.blockNumber
else: "nil"
let bestHeader {.used.} = if bp.header.isNone: "n/a"
else: bp.header.unsafeGet.blockNumber.toStr
trace "Assume initial trusted peer", peer,
trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader
return false
@ -465,9 +463,8 @@ proc pivotNegotiate*(
# Evaluate status, finally
if bp.global.minPeers <= bp.global.trusted.len:
when extraTraceMessages:
let bestHeader {.used.} =
if bp.header.isSome: "#" & $bp.header.get.blockNumber
else: "nil"
let bestHeader {.used.} = if bp.header.isNone: "n/a"
else: bp.header.unsafeGet.blockNumber.toStr
trace "Peer trusted now", peer,
trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader
return true


@ -55,19 +55,18 @@
## would have no effect. In that case, the record with the largest block
## numbers are deleted from the `<staged>` list.
##
{.push raises:[].}
import
std/[algorithm, options, sequtils, strutils],
chronicles,
chronos,
eth/[common, p2p],
eth/p2p,
stew/[byteutils, interval_set, sorted_set],
../../db/db_chain,
../../utils/utils,
".."/[protocol, sync_desc, types]
{.push raises:[].}
logScope:
topics = "block-queue"
@ -134,7 +133,10 @@ type
nStagedQueue*: int
reOrg*: bool
let
const
extraTraceMessages = false or true
## Enabled additional logging noise
highBlockNumber = high(BlockNumber)
highBlockRange = BlockRange.new(highBlockNumber,highBlockNumber)
@ -163,23 +165,19 @@ proc reduce(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
# ---------------
proc pp(n: BlockNumber): string =
## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`)
if n == highBlockNumber: "high" else:"#" & $n
proc `$`(iv: BlockRange): string =
## Needed for macro generated DSL files like `snap.nim` because the
## `distinct` flavour of `NodeTag` is discarded there.
result = "[" & iv.minPt.pp
result = "[" & iv.minPt.toStr
if iv.minPt != iv.maxPt:
result &= "," & iv.maxPt.pp
result &= "," & iv.maxPt.toStr
result &= "]"
proc `$`(n: Option[BlockRange]): string =
if n.isNone: "n/a" else: $n.get
proc `$`(n: Option[BlockNumber]): string =
if n.isNone: "n/a" else: n.get.pp
n.toStr
proc `$`(brs: BlockRangeSetRef): string =
"{" & toSeq(brs.increasing).mapIt($it).join(",") & "}"
@ -227,6 +225,8 @@ proc newWorkItem(qd: BlockQueueWorkerRef): Result[BlockItemRef,BlockQueueRC] =
# Check whether there is something to do at all
if qd.bestNumber.isNone or
qd.bestNumber.unsafeGet < rc.value.minPt:
when extraTraceMessages:
trace "no new work item", bestNumer=qd.bestNumber.toStr, range=rc.value
return err(NoMorePeerBlocks) # no more data for this peer
# Compute interval


@ -46,6 +46,16 @@ const
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
# Kludge, should be fixed in `eth/common/eth_types_rlp.append()`
proc ethAppend*(w: var RlpWriter, b: BlockBody) =
w.startList 2 + b.withdrawals.isSome.ord # <--- this line was missing
w.append(b.transactions)
w.append(b.uncles)
if b.withdrawals.isSome:
w.append(b.withdrawals.unsafeGet)
proc notImplemented(name: string) =
debug "Method not implemented", meth = name


@ -76,6 +76,10 @@ const
trEthSendNewBlockHashes* =
">> " & prettyEthProtoName & " Sending NewBlockHashes"
# Kludge, should be fixed in `eth/common/eth_types_rlp`
proc append(w: var RlpWriter, b: BlockBody) =
w.ethAppend b
p2pProtocol eth66(version = ethVersion,
rlpxName = "eth",
peerState = EthPeerState,


@ -76,6 +76,10 @@ const
trEthSendNewBlockHashes* =
">> " & prettyEthProtoName & " Sending NewBlockHashes"
# Kludge, should be fixed in `eth/common/eth_types_rlp`
proc append(w: var RlpWriter, b: BlockBody) =
w.ethAppend b
p2pProtocol eth67(version = ethVersion,
rlpxName = "eth",
peerState = EthPeerState,


@ -11,9 +11,9 @@
{.push raises: [].}
import
eth/[common, p2p],
chronicles,
chronos,
eth/p2p,
../db/select_backend,
../core/chain,
./snap/[worker, worker_desc],
@ -90,9 +90,9 @@ proc runStop(buddy: SnapBuddyRef) =
tracerFrameBuddy("runStop", buddy):
worker.stop(buddy)
proc runPool(buddy: SnapBuddyRef; last: bool): bool =
proc runPool(buddy: SnapBuddyRef; last: bool; laps: int): bool =
tracerFrameBuddy("runPool", buddy):
result = worker.runPool(buddy, last)
result = worker.runPool(buddy, last=last, laps=laps)
proc runSingle(buddy: SnapBuddyRef) {.async.} =
tracerFrameBuddy("runSingle", buddy):


@ -16,10 +16,10 @@ import
eth/p2p,
stew/[interval_set, keyed_queue],
"../.."/[common, db/select_backend],
".."/[handlers/eth, protocol, sync_desc],
./worker/[pivot, play, ticker],
../sync_desc,
./worker/[play, ticker],
./worker/com/com_error,
./worker/db/[snapdb_desc, snapdb_pivot],
./worker/db/snapdb_desc,
"."/[range_desc, worker_desc]
logScope:
@ -39,56 +39,20 @@ template ignoreException(info: static[string]; code: untyped) =
# Private functions
# ------------------------------------------------------------------------------
proc disableWireServices(ctx: SnapCtxRef) =
## Helper for `setup()`: Temporarily stop useless wire protocol services.
ctx.ethWireCtx.txPoolEnabled = false
proc enableWireServices(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.ethWireCtx.txPoolEnabled = true
# --------------
proc enableTicker(ctx: SnapCtxRef; tickerOK: bool) =
## Helper for `setup()`: Log/status ticker
proc setupTicker(ctx: SnapCtxRef; tickerOK: bool) =
let blindTicker = proc: TickerSnapStats =
discard
if tickerOK:
ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx))
else:
trace "Ticker is disabled"
ctx.pool.ticker = TickerRef.init(blindTicker)
proc disableTicker(ctx: SnapCtxRef) =
proc releaseTicker(ctx: SnapCtxRef) =
## Helper for `release()`
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stop()
ctx.pool.ticker = nil
ctx.pool.ticker.stop()
ctx.pool.ticker = nil
# --------------
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = nil
# --------------
proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
## Helper for `setup()`: Initiate snap sync recovery (if any)
let rc = ctx.pool.snapDb.pivotRecoverDB()
if rc.isOk:
ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true
# Set up early initial pivot
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
trace "Snap sync recovery started",
checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)")
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.startRecovery()
proc initSnapDb(ctx: SnapCtxRef) =
proc setupSnapDb(ctx: SnapCtxRef) =
## Helper for `setup()`: Initialise snap sync database layer
ctx.pool.snapDb =
if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
@ -100,16 +64,12 @@ proc initSnapDb(ctx: SnapCtxRef) =
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
ctx.playSetup() # Set up sync sub-mode specs.
ctx.setupSnapDb() # Set database backend, subject to change
ctx.setupTicker(tickerOK) # Start log/status ticker (if any)
# For snap sync book keeping
ctx.pool.coveredAccounts = NodeTagRangeSet.init()
ctx.enableRpcMagic() # Allow external pivot update via RPC
ctx.disableWireServices() # Stop unwanted public services
ctx.pool.syncMode.playInit() # Set up sync sub-mode specs.
ctx.initSnapDb() # Set database backend, subject to change
ctx.detectSnapSyncRecovery() # Check for recovery mode
ctx.enableTicker(tickerOK) # Start log/status ticker (if any)
ignoreException("setup"):
ctx.playMethod.setup(ctx)
# Experimental, also used for debugging
if ctx.exCtrlFile.isSome:
@ -119,29 +79,26 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
proc release*(ctx: SnapCtxRef) =
## Global clean up
ctx.disableTicker() # Stop log/status ticker (if any)
ctx.enableWireServices() # re-enable public services
ctx.disableRpcMagic() # Disable external pivot update via RPC
ignoreException("release"):
ctx.playMethod.release(ctx)
ctx.releaseTicker() # Stop log/status ticker (if any)
ctx.playRelease() # Shut down sync methods
proc start*(buddy: SnapBuddyRef): bool =
## Initialise worker peer
let
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.snap) and
peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
buddy.only.errors = ComErrorStatsRef()
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.startBuddy()
return true
let ctx = buddy.ctx
ignoreException("start"):
if ctx.playMethod.start(buddy):
buddy.only.errors = ComErrorStatsRef()
return true
proc stop*(buddy: SnapBuddyRef) =
## Clean up this peer
let ctx = buddy.ctx
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stopBuddy()
ignoreException("stop"):
ctx.playMethod.stop(buddy)
# ------------------------------------------------------------------------------
# Public functions, sync handler multiplexers
@ -150,22 +107,22 @@ proc stop*(buddy: SnapBuddyRef) =
proc runDaemon*(ctx: SnapCtxRef) {.async.} =
## Sync processing multiplexer
ignoreException("runDaemon"):
await ctx.playSyncSpecs.daemon(ctx)
await ctx.playMethod.daemon(ctx)
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## Sync processing multiplexer
ignoreException("runSingle"):
await buddy.ctx.playSyncSpecs.single(buddy)
await buddy.ctx.playMethod.single(buddy)
proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
proc runPool*(buddy: SnapBuddyRef, last: bool; laps: int): bool =
## Sync processing multiplexer
ignoreException("runPool"):
result = buddy.ctx.playSyncSpecs.pool(buddy,last)
result = buddy.ctx.playMethod.pool(buddy,last,laps)
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
## Sync processing multiplexer
ignoreException("runMulti"):
await buddy.ctx.playSyncSpecs.multi(buddy)
await buddy.ctx.playMethod.multi(buddy)
# ------------------------------------------------------------------------------
# End


@ -11,7 +11,6 @@
{.push raises: [].}
import
std/options,
chronos,
eth/[common, p2p],
stew/byteutils,
@ -43,7 +42,7 @@ proc getBlockHeader*(
skip: 0,
reverse: false)
trace trEthSendSendingGetBlockHeaders, peer, header=("#" & $num), reqLen
trace trEthSendSendingGetBlockHeaders, peer, header=num.toStr, reqLen
var hdrResp: Option[blockHeadersObj]
try:


@ -12,7 +12,7 @@
{.push raises: [].}
import
std/[options, sequtils],
std/sequtils,
chronos,
eth/[common, p2p],
stew/interval_set,


@ -11,7 +11,7 @@
{.push raises: [].}
import
std/[options, sequtils],
std/sequtils,
chronos,
eth/[common, p2p],
"../../.."/[protocol, protocol/trace_config],


@ -135,7 +135,7 @@ template collectLeafs(
# The following logic might be sub-optimal. A strict version of the
# `next()` function that stops with an error at dangling links could
# be faster if the leaf nodes are not too far apart on the hexary trie.
var
let
xPath = block:
let rx = nodeTag.hexaryPath(rootKey,db).hexaryNearbyRight(db)
if rx.isErr:


@ -155,11 +155,26 @@ proc kvDb*(pv: SnapDbRef): TrieDatabaseRef =
# Public functions, select sub-tables for persistent storage
# ------------------------------------------------------------------------------
proc toAccountsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncAccountKey.data
proc toBlockHeaderKey*(a: Hash256): ByteArray33 =
a.genericHashKey.data
proc toStorageSlotsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncStorageSlotKey.data
proc toBlockNumberKey*(a: BlockNumber): ByteArray33 =
static:
doAssert 32 == sizeof BlockNumber # needed in `blockNumberToHashKey()`
a.blockNumberToHashKey.data
when false:
proc toAccountsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncAccountKey.data
proc toStorageSlotsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncStorageSlotKey.data
else:
proc toAccountsKey*(a: NodeKey): ByteArray32 =
a.ByteArray32
proc toStorageSlotsKey*(a: NodeKey): ByteArray32 =
a.ByteArray32
proc toStateRootKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncStateRootKey.data


@ -13,7 +13,7 @@
import
std/[algorithm, tables],
chronicles,
eth/[common, rlp, trie/db],
eth/[common, trie/db],
../../../../db/kvstore_rocksdb,
../../range_desc,
"."/[hexary_desc, hexary_error, rocky_bulk_load, snapdb_desc]
@ -63,10 +63,10 @@ proc convertTo(key: RepairKey; T: type NodeTag): T =
## Might be lossy, check before use
UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T
proc toAccountsKey(a: RepairKey): ByteArray33 =
proc toAccountsKey(a: RepairKey): auto =
a.convertTo(NodeKey).toAccountsKey
proc toStorageSlotsKey(a: RepairKey): ByteArray33 =
proc toStorageSlotsKey(a: RepairKey): auto =
a.convertTo(NodeKey).toStorageSlotsKey
proc stateRootGet*(db: TrieDatabaseRef; nodeKey: NodeKey): Blob =
@ -107,9 +107,21 @@ proc persistentStateRootGet*(
# Public functions: store/put
# ------------------------------------------------------------------------------
proc persistentBlockHeaderPut*(
db: TrieDatabaseRef;
hdr: BlockHeader;
) =
## Store a single header. This function is intended to finalise snap sync
## by storing a universal pivot header, not unlike genesis.
let hashKey = hdr.blockHash
db.TrieDatabaseRef.put( # see `nimbus/db/db_chain.db()`
hashKey.toBlockHeaderKey.toOpenArray, rlp.encode(hdr))
db.TrieDatabaseRef.put(
hdr.blockNumber.toBlockNumberKey.toOpenArray, rlp.encode(hashKey))
proc persistentAccountsPut*(
db: HexaryTreeDbRef;
base: TrieDatabaseRef
base: TrieDatabaseRef;
): Result[void,HexaryError] =
## Bulk store using transactional `put()`
let dbTx = base.beginTransaction
@ -125,7 +137,7 @@ proc persistentAccountsPut*(
proc persistentStorageSlotsPut*(
db: HexaryTreeDbRef;
base: TrieDatabaseRef
base: TrieDatabaseRef;
): Result[void,HexaryError] =
## Bulk store using transactional `put()`
let dbTx = base.beginTransaction


@ -14,9 +14,9 @@ import
std/[math, sets, sequtils],
chronicles,
chronos,
eth/[common, p2p, trie/trie_defs],
eth/[p2p, trie/trie_defs],
stew/[interval_set, keyed_queue, sorted_set],
../../sync_desc,
"../.."/[sync_desc, types],
".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
./pivot/[heal_accounts, heal_storage_slots,
@ -33,6 +33,20 @@ const
proc pivotMothball*(env: SnapPivotRef) {.gcsafe.}
# ------------------------------------------------------------------------------
# Private helpers, logging
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Pivot " & info
template ignExceptionOops(info: static[string]; code: untyped) =
try:
code
except CatchableError as e:
trace logTxt "Ooops", `info`=info, name=($e.name), msg=(e.msg)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -186,6 +200,12 @@ proc tickerStats*(
# Public functions: particular pivot
# ------------------------------------------------------------------------------
proc pivotCompleteOk*(env: SnapPivotRef): bool =
## Returns `true` iff the pivot covers a complete set of accounts and
## storage slots.
env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0
proc pivotMothball*(env: SnapPivotRef) =
## Clean up most of this argument `env` pivot record and mark it `archived`.
## Note that archived pivots will be checked for swapping in already known
@ -218,6 +238,9 @@ proc execSnapSyncAction*(
let
ctx = buddy.ctx
if env.savedFullPivotOk:
return # no need to do anything
block:
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
@ -274,6 +297,9 @@ proc saveCheckpoint*(
## Save current sync admin data. On success, the size of the data record
## saved is returned (e.g. for logging.)
##
if env.savedFullPivotOk:
return ok(0) # no need to do anything
let
fa = env.fetchAccounts
nStoQu = env.storageQueueTotal()
@ -287,7 +313,7 @@ proc saveCheckpoint*(
if accountsSaveStorageSlotsMax < nStoQu:
return err(TooManySlotAccounts)
ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry(
result = ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry(
header: env.stateHeader,
nAccounts: env.nAccounts,
nSlotLists: env.nSlotLists,
@ -297,6 +323,9 @@ proc saveCheckpoint*(
toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)) &
toSeq(env.parkedStorage.items))
if result.isOk and env.pivotCompleteOk():
env.savedFullPivotOk = true
proc pivotRecoverFromCheckpoint*(
env: SnapPivotRef; # Current pivot environment
@ -339,7 +368,15 @@ proc pivotRecoverFromCheckpoint*(
env.storageQueueAppendFull(rc.value.storageRoot, w)
# Handle mothballed pivots for swapping in (see `pivotMothball()`)
if not topLevel:
if topLevel:
env.savedFullPivotOk = env.pivotCompleteOk()
when extraTraceMessages:
trace logTxt "recovered top level record",
pivot=env.stateHeader.blockNumber.toStr,
savedFullPivotOk=env.savedFullPivotOk,
processed=env.fetchAccounts.processed.fullPC3,
nStoQuTotal=env.storageQueueTotal()
else:
for kvp in env.fetchStorageFull.nextPairs:
let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag))
if rc.isOk:
@ -355,7 +392,6 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
## it will not proceed to the next scheduler task.
let
ctx = buddy.ctx
peer {.used.} = buddy.peer
beaconHeader = ctx.pool.beaconHeader
var
pivotHeader: BlockHeader
@ -375,9 +411,9 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
ctx.poolMode = true
when extraTraceMessages:
trace "New pivot from beacon chain", peer,
pivot=("#" & $pivotHeader.blockNumber),
beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode
trace logTxt "new pivot from beacon chain", peer=buddy.peer,
pivot=pivotHeader.blockNumber.toStr,
beacon=beaconHeader.blockNumber.toStr, poolMode=ctx.poolMode
discard ctx.pool.pivotTable.lruAppend(
beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader),
@ -396,9 +432,151 @@ proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
result = proc(h: BlockHeader) {.gcsafe.} =
if ctx.pool.beaconHeader.blockNumber < h.blockNumber:
# when extraTraceMessages:
# trace "External beacon info update", header=("#" & $h.blockNumber)
# trace logTxt "external beacon info update", header=h.blockNumber.toStr
ctx.pool.beaconHeader = h
# ------------------------------------------------------------------------------
# Public function, debugging
# ------------------------------------------------------------------------------
import
db/[hexary_desc, hexary_inspect, hexary_nearby, hexary_paths,
snapdb_storage_slots]
const
pivotVerifyExtraBlurb = false # or true
inspectSuspendAfter = 10_000
inspectExtraNap = 100.milliseconds
proc pivotVerifyComplete*(
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
inspectAccountsTrie = false; # Check for dangling links
walkAccountsDB = true; # Walk accounts db
inspectSlotsTries = true; # Check dangling links (if `walkAccountsDB`)
): Future[bool]
{.async,discardable.} =
## Check the database whether the pivot is complete -- not advised on a
## production system as the process takes a lot of resources.
let
rootKey = env.stateHeader.stateRoot.to(NodeKey)
accFn = ctx.pool.snapDb.getAccountFn
# Verify consistency of accounts trie database. This should not be needed
# if `walkAccountsDB` is set. If there were a dangling link that would
# have been detected by `hexaryInspectTrie()`, the `hexaryNearbyRight()`
# function should fail at that point as well.
if inspectAccountsTrie:
var
stats = accFn.hexaryInspectTrie(rootKey,
suspendAfter=inspectSuspendAfter,
maxDangling=1)
nVisited = stats.count
nRetryCount = 0
while stats.dangling.len == 0 and not stats.resumeCtx.isNil:
when pivotVerifyExtraBlurb:
trace logTxt "accounts db inspect ..", nVisited, nRetryCount
await sleepAsync inspectExtraNap
nRetryCount.inc
stats = accFn.hexaryInspectTrie(rootKey,
resumeCtx=stats.resumeCtx,
suspendAfter=inspectSuspendAfter,
maxDangling=1)
nVisited += stats.count
# End while
if stats.dangling.len != 0:
error logTxt "accounts trie has danglig links", nVisited, nRetryCount
return false
trace logTxt "accounts trie ok", nVisited, nRetryCount
# End `if inspectAccountsTrie`
# Visit accounts and make sense of storage slots
if walkAccountsDB:
var
nAccounts = 0
nStorages = 0
nRetryTotal = 0
nodeTag = low(NodeTag)
while true:
if (nAccounts mod inspectSuspendAfter) == 0 and 0 < nAccounts:
when pivotVerifyExtraBlurb:
trace logTxt "accounts db walk ..",
nAccounts, nStorages, nRetryTotal, inspectSlotsTries
await sleepAsync inspectExtraNap
# Find next account key => `nodeTag`
let rc = nodeTag.hexaryPath(rootKey,accFn).hexaryNearbyRight(accFn)
if rc.isErr:
if rc.error == NearbyBeyondRange:
break # No more accounts
error logTxt "accounts db problem", nodeTag,
nAccounts, nStorages, nRetryTotal, inspectSlotsTries,
error=rc.error
return false
nodeTag = rc.value.getPartialPath.convertTo(NodeKey).to(NodeTag)
nAccounts.inc
# Decode accounts data
var accData: Account
try:
accData = rc.value.leafData.decode(Account)
except RlpError as e:
error logTxt "account data problem", nodeTag,
nAccounts, nStorages, nRetryTotal, inspectSlotsTries,
name=($e.name), msg=(e.msg)
return false
# Check for storage slots for this account
if accData.storageRoot != emptyRlpHash:
nStorages.inc
if inspectSlotsTries:
let
slotFn = ctx.pool.snapDb.getStorageSlotsFn(nodeTag.to(NodeKey))
stoKey = accData.storageRoot.to(NodeKey)
var
stats = slotFn.hexaryInspectTrie(stoKey,
suspendAfter=inspectSuspendAfter,
maxDangling=1)
nVisited = stats.count
nRetryCount = 0
while stats.dangling.len == 0 and not stats.resumeCtx.isNil:
when pivotVerifyExtraBlurb:
trace logTxt "storage slots inspect ..", nodeTag,
nAccounts, nStorages, nRetryTotal, inspectSlotsTries,
nVisited, nRetryCount
await sleepAsync inspectExtraNap
nRetryCount.inc
nRetryTotal.inc
stats = slotFn.hexaryInspectTrie(stoKey,
resumeCtx=stats.resumeCtx,
suspendAfter=inspectSuspendAfter,
maxDangling=1)
nVisited += stats.count
if stats.dangling.len != 0:
error logTxt "storage slots trie has dangling link", nodeTag,
nAccounts, nStorages, nRetryTotal, inspectSlotsTries,
nVisited, nRetryCount
return false
if nVisited == 0:
error logTxt "storage slots trie is empty", nodeTag,
nAccounts, nStorages, nRetryTotal, inspectSlotsTries,
nVisited, nRetryCount
return false
# Set up next node key for looping
if nodeTag == high(NodeTag):
break
nodeTag = nodeTag + 1.u256
# End while
trace logTxt "accounts db walk ok",
nAccounts, nStorages, nRetryTotal, inspectSlotsTries
# End `if walkAccountsDB`
return true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -85,7 +85,7 @@ proc healingCtx(
): string =
let ctx = buddy.ctx
"{" &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & env.stateHeader.blockNumber.toStr & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & $env.fetchAccounts.processed & "/" &
@ -156,7 +156,7 @@ proc getNodesFromNetwork(
let
peer {.used.} = buddy.peer
rootHash = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
pivot = env.stateHeader.blockNumber.toStr # for logging in `getTrieNodes()`
# Initialise for fetching nodes from the network via `getTrieNodes()`
var


@ -86,7 +86,7 @@ proc healingCtx(
env: SnapPivotRef;
): string {.used.} =
"{" &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & env.stateHeader.blockNumber.toStr & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nQuPart=" & $env.fetchStoragePart.len & "," &
@ -99,7 +99,7 @@ proc healingCtx(
env: SnapPivotRef;
): string =
"{" &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & env.stateHeader.blockNumber.toStr & "," &
"ctl=" & $buddy.ctrl.state & "," &
"processed=" & $kvp.data.slots.processed & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
@ -171,7 +171,7 @@ proc getNodesFromNetwork(
peer {.used.} = buddy.peer
accPath = kvp.data.accKey.to(Blob)
rootHash = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
pivot = env.stateHeader.blockNumber.toStr # for logging in `getTrieNodes()`
# Initialise for fetching nodes from the network via `getTrieNodes()`
var


@ -48,9 +48,7 @@ import
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
stint,
../../../../utils/prettify,
../../../sync_desc,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_account_range],
../db/[hexary_envelope, snapdb_accounts],
@ -81,7 +79,7 @@ proc fetchCtx(
env: SnapPivotRef;
): string {.used.} =
"{" &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & env.stateHeader.blockNumber.toStr & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nSlotLists=" & $env.nSlotLists & "}"
@ -129,7 +127,7 @@ proc accountsRangefetchImpl(
# Process received accounts and stash storage slots to fetch later
let dd = block:
let
pivot = "#" & $env.stateHeader.blockNumber
pivot = env.stateHeader.blockNumber.toStr
rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
fa.unprocessed.mergeSplit iv # fail => interval back to pool


@ -68,10 +68,9 @@ import
std/sets,
chronicles,
chronos,
eth/[common, p2p],
eth/p2p,
stew/[interval_set, keyed_queue],
stint,
../../../sync_desc,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_storage_ranges],
../db/[hexary_error, snapdb_storage_slots],
@ -95,7 +94,7 @@ proc fetchCtx(
env: SnapPivotRef;
): string =
"{" &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & env.stateHeader.blockNumber.toStr & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nQuFull=" & $env.fetchStorageFull.len & "," &
"nQuPart=" & $env.fetchStoragePart.len & "," &
@ -118,7 +117,7 @@ proc fetchStorageSlotsImpl(
ctx = buddy.ctx
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
pivot = env.stateHeader.blockNumber.toStr # logging in `getStorageRanges()`
# Get storages slots data from the network
var stoRange = block:


@ -257,7 +257,7 @@ proc swapInAccounts*(
return # nothing to do
let
pivot {.used.} = "#" & $env.stateHeader.blockNumber # Logging & debugging
pivot {.used.} = env.stateHeader.blockNumber.toStr # Logging & debugging
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.pool.snapDb.getAccountFn


@ -9,17 +9,18 @@
import
../worker_desc,
./play/[play_desc, play_full_sync, play_prep_full, play_snap_sync]
./play/[play_desc, play_full_sync, play_snap_sync]
export
PlaySyncSpecs,
playSyncSpecs,
`playMode=`
playMethod
proc playInit*(desc: var SnapSyncSpecs) =
proc playSetup*(ctx: SnapCtxRef) =
## Set up sync mode specs table. This cannot be done at compile time.
desc.tab[SnapSyncMode] = playSnapSyncSpecs()
desc.tab[PreFullSyncMode] = playPrepFullSpecs()
desc.tab[FullSyncMode] = playFullSyncSpecs()
ctx.pool.syncMode.tab[SnapSyncMode] = playSnapSyncSpecs()
ctx.pool.syncMode.tab[FullSyncMode] = playFullSyncSpecs()
proc playRelease*(ctx: SnapCtxRef) =
discard
# End


@ -14,17 +14,38 @@ import
type
PlayVoidFutureCtxFn* = proc(
ctx: SnapCtxRef): Future[void] {.gcsafe, raises: [CatchableError].}
ctx: SnapCtxRef): Future[void]
{.gcsafe, raises: [CatchableError].}
PlayVoidCtxFn* = proc(
ctx: SnapCtxRef)
{.gcsafe, raises: [CatchableError].}
PlayVoidFutureBuddyFn* = proc(
buddy: SnapBuddyRef): Future[void] {.gcsafe, raises: [CatchableError].}
buddy: SnapBuddyRef): Future[void]
{.gcsafe, raises: [CatchableError].}
PlayBoolBuddyBoolIntFn* = proc(
buddy: SnapBuddyRef; last: bool; laps: int): bool
{.gcsafe, raises: [CatchableError].}
PlayBoolBuddyFn* = proc(
buddy: SnapBuddyRef, last: bool): bool {.gcsafe, raises: [CatchableError].}
buddy: SnapBuddyRef): bool
{.gcsafe, raises: [CatchableError].}
PlayVoidBuddyFn* = proc(
buddy: SnapBuddyRef)
{.gcsafe, raises: [CatchableError].}
PlaySyncSpecs* = ref object of RootRef
## Holds sync mode specs & methods for a particular sync state
pool*: PlayBoolBuddyFn
setup*: PlayVoidCtxFn
release*: PlayVoidCtxFn
start*: PlayBoolBuddyFn
stop*: PlayVoidBuddyFn
pool*: PlayBoolBuddyBoolIntFn
daemon*: PlayVoidFutureCtxFn
single*: PlayVoidFutureBuddyFn
multi*: PlayVoidFutureBuddyFn
@ -33,14 +54,10 @@ type
# Public functions
# ------------------------------------------------------------------------------
proc playSyncSpecs*(ctx: SnapCtxRef): PlaySyncSpecs =
proc playMethod*(ctx: SnapCtxRef): PlaySyncSpecs =
## Getter
ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs
proc `playMode=`*(ctx: SnapCtxRef; val: SnapSyncModeType) =
## Setter
ctx.pool.syncMode.active = val
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -14,8 +14,11 @@ import
chronicles,
chronos,
eth/p2p,
../../../sync_desc,
../../worker_desc,
../../../misc/[best_pivot, block_queue],
"../../.."/[protocol, sync_desc, types],
"../.."/[range_desc, worker_desc],
../db/[snapdb_desc, snapdb_persistent],
".."/[pivot, ticker],
play_desc
const
@ -23,30 +26,217 @@ const
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions, full sync handlers
# Private helpers
# ------------------------------------------------------------------------------
proc fullSyncPool(buddy: SnapBuddyRef, last: bool): bool =
buddy.ctx.poolMode = false
true
template ignoreException(info: static[string]; code: untyped) =
try:
code
except CatchableError as e:
error "Exception at " & info & ":", name=($e.name), msg=(e.msg)
proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater =
result = proc: TickerFullStats =
var stats: BlockQueueStats
ctx.pool.bCtx.blockQueueStats(stats)
TickerFullStats(
topPersistent: stats.topAccepted,
nextStaged: stats.nextStaged,
nextUnprocessed: stats.nextUnprocessed,
nStagedQueue: stats.nStagedQueue,
reOrg: stats.reOrg)
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc processStaged(buddy: SnapBuddyRef): bool =
## Fetch a work item from the `staged` queue and process it to be
## stored on the persistent block chain.
let
ctx {.used.} = buddy.ctx
peer = buddy.peer
chainDb = buddy.ctx.chain.db
chain = buddy.ctx.chain
bq = buddy.only.bQueue
# Get a work item, a list of headers + bodies
wi = block:
let rc = bq.blockQueueFetchStaged()
if rc.isErr:
return false
rc.value
#startNumber = wi.headers[0].blockNumber -- unused
# Store in persistent database
try:
if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK:
bq.blockQueueAccept(wi)
return true
except CatchableError as e:
error "Storing persistent blocks failed", peer, range=($wi.blocks),
name=($e.name), msg=(e.msg)
# Something went wrong. Recycle work item (needs to be re-fetched, anyway)
let
parentHash = wi.headers[0].parentHash
try:
# Check whether hash of the first block is consistent
var parent: BlockHeader
if chainDb.getBlockHeader(parentHash, parent):
# First block parent is ok, so there might be other problems. Re-fetch
# the blocks from another peer.
trace "Storing persistent blocks failed", peer, range=($wi.blocks)
bq.blockQueueRecycle(wi)
buddy.ctrl.zombie = true
return false
except CatchableError as e:
error "Failed to access parent blocks", peer,
blockNumber=wi.headers[0].blockNumber.toStr, name=($e.name), msg=e.msg
# Parent block header problem, so we might be in the middle of a re-org.
# Set single mode backtrack following the offending parent hash.
bq.blockQueueBacktrackFrom(wi)
buddy.ctrl.multiOk = false
if wi.topHash.isNone:
# Assuming that currently staged entries are on the wrong branch
bq.blockQueueRecycleStaged()
notice "Starting chain re-org backtrack work item", peer, range=($wi.blocks)
else:
# Leave that block range in the staged list
trace "Resuming chain re-org backtrack work item", peer, range=($wi.blocks)
discard
return false
# ------------------------------------------------------------------------------
# Private functions, full sync admin handlers
# ------------------------------------------------------------------------------
proc fullSyncSetup(ctx: SnapCtxRef) =
let blockNum = if ctx.pool.fullPivot.isNil: ctx.pool.pivotTable.topNumber
else: ctx.pool.fullPivot.stateHeader.blockNumber
ctx.pool.bCtx = BlockQueueCtxRef.init(blockNum + 1)
ctx.pool.bPivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)
ctx.pool.ticker.init(cb = ctx.tickerUpdater())
proc fullSyncRelease(ctx: SnapCtxRef) =
discard
proc fullSyncStart(buddy: SnapBuddyRef): bool =
let
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
buddy.only.bQueue = BlockQueueWorkerRef.init(
ctx.pool.bCtx, buddy.ctrl, peer)
buddy.only.bPivot = BestPivotWorkerRef.init(
ctx.pool.bPivot, buddy.ctrl, buddy.peer)
ctx.pool.ticker.startBuddy()
buddy.ctrl.multiOk = false # confirm default mode for soft restart
return true
proc fullSyncStop(buddy: SnapBuddyRef) =
buddy.only.bPivot.clear()
buddy.ctx.pool.ticker.stopBuddy()
# ------------------------------------------------------------------------------
# Private functions, full sync action handlers
# ------------------------------------------------------------------------------
proc fullSyncDaemon(ctx: SnapCtxRef) {.async.} =
ctx.daemon = false
proc fullSyncPool(buddy: SnapBuddyRef, last: bool; laps: int): bool =
let
ctx = buddy.ctx
env = ctx.pool.fullPivot
# Take over soft restart after switch to full sync mode.
# This process needs to be applied to all buddy peers.
if not env.isNil:
# Soft start all peers on the second lap.
ignoreException("fullSyncPool"):
if not ctx.playMethod.start(buddy):
# Start() method failed => wait for another peer
buddy.ctrl.stopped = true
if last:
trace "Soft full sync restart done", peer=buddy.peer, last, laps,
pivot=env.stateHeader.blockNumber.toStr,
mode=ctx.pool.syncMode.active, state=buddy.ctrl.state
# Store pivot as parent hash in database
ctx.pool.snapDb.kvDb.persistentBlockHeaderPut env.stateHeader
# Instead of genesis.
ctx.chain.com.startOfHistory = env.stateHeader.blockHash
# Reset so that this action is not triggered again
ctx.pool.fullPivot = nil
return false # the loop stops anyway once all peers have been visited
# Mind the gap, fill in if necessary (function is peer independent)
buddy.only.bQueue.blockQueueGrout()
true # Stop after running once regardless of peer
proc fullSyncSingle(buddy: SnapBuddyRef) {.async.} =
buddy.ctrl.multiOk = true
let
pv = buddy.only.bPivot
bq = buddy.only.bQueue
bNum = bq.bestNumber.get(otherwise = bq.topAccepted + 1)
# Negotiate in order to derive the pivot header from this `peer`.
if await pv.pivotNegotiate(some(bNum)):
# Update/activate `bestNumber` from the pivot header
bq.bestNumber = some(pv.pivotHeader.value.blockNumber)
buddy.ctrl.multiOk = true
when extraTraceMessages:
trace "Full sync pivot accepted", peer=buddy.peer,
minNumber=bNum.toStr, bestNumber=bq.bestNumber.unsafeGet.toStr
return
if buddy.ctrl.stopped:
when extraTraceMessages:
trace "Full sync single mode stopped", peer=buddy.peer
return # done with this buddy
# Without waiting, this function repeats every 50ms (as set with the constant
# `sync_sched.execLoopTimeElapsedMin`.)
await sleepAsync 300.milliseconds
proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
## Full sync processing
let
ctx = buddy.ctx
peer = buddy.peer
bq = buddy.only.bQueue
trace "Snap full sync -- not implemented yet", peer
await sleepAsync(5.seconds)
# Fetch work item
let rc = await bq.blockQueueWorker()
if rc.isErr:
if rc.error == StagedQueueOverflow:
# Mind the gap: Turn on pool mode if there are too many staged items.
ctx.poolMode = true
else:
trace "Full sync error", peer=buddy.peer, error=rc.error
return
# flip over to single mode for getting new instructions
buddy.ctrl.multiOk = false
# Update persistent database
while buddy.processStaged() and not buddy.ctrl.stopped:
trace "Full sync multi processed", peer=buddy.peer
# Allow thread switch as `persistBlocks()` might be slow
await sleepAsync(10.milliseconds)
# ------------------------------------------------------------------------------
# Public functions
@ -55,10 +245,14 @@ proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
proc playFullSyncSpecs*: PlaySyncSpecs =
## Return full sync handler environment
PlaySyncSpecs(
pool: fullSyncPool,
daemon: fullSyncDaemon,
single: fullSyncSingle,
multi: fullSyncMulti)
setup: fullSyncSetup,
release: fullSyncRelease,
start: fullSyncStart,
stop: fullSyncStop,
pool: fullSyncPool,
daemon: fullSyncDaemon,
single: fullSyncSingle,
multi: fullSyncMulti)
# ------------------------------------------------------------------------------
# End


@ -1,92 +0,0 @@
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Transitional handlers preparing for full sync
{.push raises: [].}
import
chronicles,
chronos,
eth/p2p,
stew/keyed_queue,
../../../sync_desc,
../../worker_desc,
../ticker,
play_desc
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc blindTicker(ctx: SnapCtxRef): TickerFullStatsUpdater =
result = proc: TickerFullStats =
discard
# ------------------------------------------------------------------------------
# Private functions, transitional handlers preparing for full sync
# ------------------------------------------------------------------------------
proc prepFullSyncPool(buddy: SnapBuddyRef, last: bool): bool =
buddy.ctx.poolMode = false
true
proc prepFullSyncDaemon(ctx: SnapCtxRef) {.async.} =
ctx.daemon = false
proc prepFullSyncSingle(buddy: SnapBuddyRef) {.async.} =
## One off, setting up full sync processing in single mode
let
ctx = buddy.ctx
# Fetch latest state root environment
env = block:
let rc = ctx.pool.pivotTable.lastValue
if rc.isErr:
buddy.ctrl.multiOk = false
return
rc.value
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
when extraTraceMessages:
trace "Full sync prepare in single mode", peer, pivot
# update ticker (currently blind)
ctx.pool.ticker.init(cb = ctx.blindTicker())
# Cosmetics: allow other processes (e.g. ticker) to log the current
# state. There is no other intended purpose of this wait state.
await sleepAsync 1100.milliseconds
ctx.playMode = FullSyncMode
buddy.ctrl.multiOk = true
proc prepFullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
## One off, setting up full sync processing in single mode
buddy.ctrl.multiOk = false
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc playPrepFullSpecs*: PlaySyncSpecs =
## Return full sync preparation handler environment
PlaySyncSpecs(
pool: prepFullSyncPool,
daemon: prepFullSyncDaemon,
single: prepFullSyncSingle,
multi: prepFullSyncMulti)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -15,7 +15,7 @@ import
chronos,
eth/p2p,
stew/[interval_set, keyed_queue],
../../../sync_desc,
"../../.."/[handlers/eth, protocol, sync_desc, types],
".."/[pivot, ticker],
../pivot/storage_queue_helper,
../db/[hexary_desc, snapdb_pivot],
@ -33,19 +33,63 @@ const
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Snap worker " & info
template ignoreException(info: static[string]; code: untyped) =
try:
code
except CatchableError as e:
error "Exception at " & info & ":", name=($e.name), msg=(e.msg)
# --------------
proc disableWireServices(ctx: SnapCtxRef) =
## Helper for `setup()`: Temporarily stop useless wire protocol services.
ctx.ethWireCtx.txPoolEnabled = false
proc enableWireServices(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.ethWireCtx.txPoolEnabled = true
# --------------
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = nil
# --------------
proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
## Helper for `setup()`: Initiate snap sync recovery (if any)
let rc = ctx.pool.snapDb.pivotRecoverDB()
if rc.isOk:
ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true
# Set up early initial pivot
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
trace logTxt "recovery started",
checkpoint=(ctx.pool.pivotTable.topNumber.toStr & "(0)")
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.startRecovery()
proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let recov = ctx.pool.recovery
if recov.isNil:
return false
let
checkpoint =
"#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
checkpoint = recov.state.header.blockNumber.toStr & "(" & $recov.level & ")"
topLevel = recov.level == 0
env = block:
let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot
if rc.isErr:
error "Recovery pivot context gone", checkpoint, topLevel
error logTxt "recovery pivot context gone", checkpoint, topLevel
return false
rc.value
@ -68,7 +112,7 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let rc = ctx.pool.snapDb.pivotRecoverDB(recov.state.predecessor)
if rc.isErr:
when extraTraceMessages:
trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
trace logTxt "stale pivot, recovery stopped", checkpoint, topLevel
return false
# Set up next level pivot checkpoint
@ -81,24 +125,98 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
return true # continue recovery
# --------------
proc snapSyncCompleteOk(
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
): Future[bool]
{.async.} =
## Check whether this pivot is fully downloaded. The `async` part is for
## debugging only and should not be used on a large database as it uses
## quite a bit of computational resources.
if env.pivotCompleteOk():
if env.nAccounts <= 1_000_000: # Larger sizes might be infeasible
if not await env.pivotVerifyComplete(ctx):
error logTxt "inconsistent state, pivot incomplete",
pivot = env.stateHeader.blockNumber.toStr
return false
ctx.pool.fullPivot = env
ctx.poolMode = true # Fast sync mode must be synchronized among all peers
return true
# ------------------------------------------------------------------------------
# Private functions, snap sync handlers
# Private functions, snap sync admin handlers
# ------------------------------------------------------------------------------
proc snapSyncPool(buddy: SnapBuddyRef, last: bool): bool =
proc snapSyncSetup(ctx: SnapCtxRef) =
# For snap sync book keeping
ctx.pool.coveredAccounts = NodeTagRangeSet.init()
ctx.pool.ticker.init(cb = ctx.pool.pivotTable.tickerStats(ctx))
ctx.enableRpcMagic() # Allow external pivot update via RPC
ctx.disableWireServices() # Stop unwanted public services
ctx.detectSnapSyncRecovery() # Check for recovery mode
proc snapSyncRelease(ctx: SnapCtxRef) =
ctx.disableRpcMagic() # Disable external pivot update via RPC
ctx.enableWireServices() # re-enable public services
ctx.pool.ticker.stop()
proc snapSyncStart(buddy: SnapBuddyRef): bool =
let
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.snap) and
peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
ctx.pool.ticker.startBuddy()
buddy.ctrl.multiOk = false # confirm default mode for soft restart
return true
proc snapSyncStop(buddy: SnapBuddyRef) =
buddy.ctx.pool.ticker.stopBuddy()
# ------------------------------------------------------------------------------
# Private functions, snap sync action handlers
# ------------------------------------------------------------------------------
proc snapSyncPool(buddy: SnapBuddyRef, last: bool, laps: int): bool =
## Enabled when `buddy.ctx.poolMode` is `true`
##
let ctx = buddy.ctx
ctx.poolMode = false
result = true
let
ctx = buddy.ctx
env = ctx.pool.fullPivot
# Clean up empty pivot slots (never the top one)
# Check whether the snapshot is complete. If so, switch to full sync mode.
# This process needs to be applied to all buddy peers.
if not env.isNil:
ignoreException("snapSyncPool"):
# Stop all peers
ctx.playMethod.stop(buddy)
# After the last buddy peer was stopped switch to full sync mode
# and repeat that loop over buddy peers for re-starting them.
if last:
when extraTraceMessages:
trace logTxt "switch to full sync", peer=buddy.peer, last, laps,
pivot=env.stateHeader.blockNumber.toStr,
mode=ctx.pool.syncMode.active, state=buddy.ctrl.state
ctx.playMethod.release(ctx)
ctx.pool.syncMode.active = FullSyncMode
ctx.playMethod.setup(ctx)
ctx.poolMode = true # repeat looping over peers
return false # the loop stops anyway once all peers have been visited
# Clean up empty pivot slots (never the top one.) This needs to be run on
# a single peer only. So the loop can stop immediately (returning `true`)
# after this job is done.
var rc = ctx.pool.pivotTable.beforeLast
while rc.isOK:
let (key, env) = (rc.value.key, rc.value.data)
if env.fetchAccounts.processed.isEmpty:
ctx.pool.pivotTable.del key
rc = ctx.pool.pivotTable.prev(key)
true # Stop ok
proc snapSyncDaemon(ctx: SnapCtxRef) {.async.} =
@ -120,11 +238,15 @@ proc snapSyncSingle(buddy: SnapBuddyRef) {.async.} =
## * `buddy.ctrl.multiOk` is `false`
## * `buddy.ctrl.poolMode` is `false`
##
let ctx = buddy.ctx
# External beacon header updater
await buddy.updateBeaconHeaderFromFile()
# Dedicate some process cycles to the recovery process (if any)
if not buddy.ctx.pool.recovery.isNil:
when extraTraceMessages:
trace "Throttling single mode in favour of recovery", peer=buddy.peer
await sleepAsync 900.milliseconds
await buddy.pivotApprovePeer()
buddy.ctrl.multiOk = true
@ -145,17 +267,8 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
return # nothing to do
rc.value
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
fa = env.fetchAccounts
# Check whether this pivot is fully downloaded
if env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0:
# Switch to full sync => final state
ctx.playMode = PreFullSyncMode
trace "Switch to full sync", peer, pivot, nAccounts=env.nAccounts,
processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(),
nSlotLists=env.nSlotLists
if await env.snapSyncCompleteOk(ctx):
return
# If this is a new snap sync pivot, the previous one can be cleaned up and
@ -163,6 +276,11 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
# data any longer.
ctx.pool.pivotTable.beforeTopMostlyClean()
let
peer = buddy.peer
pivot = env.stateHeader.blockNumber.toStr # for logging
fa = env.fetchAccounts
when extraTraceMessages:
trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts,
processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(),
@ -180,7 +298,7 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
# Archive this pivot environment if it has become stale
if env.archived:
when extraTraceMessages:
trace "Mothballing", peer, pivot, nAccounts, nSlotLists
trace logTxt "mothballing", peer, pivot, nAccounts, nSlotLists
env.pivotMothball()
return
@ -188,12 +306,17 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
let rc = env.saveCheckpoint(ctx)
if rc.isOk:
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot, nAccounts, processed,
nStoQu=env.storageQueueTotal(), nSlotLists, blobSize=rc.value
trace logTxt "saved checkpoint", peer, pivot, nAccounts,
processed, nStoQu=env.storageQueueTotal(), nSlotLists,
blobSize=rc.value
return
error "Failed to save recovery checkpoint", peer, pivot, nAccounts,
processed, nStoQu=env.storageQueueTotal(), nSlotLists, error=rc.error
error logTxt "failed to save checkpoint", peer, pivot, nAccounts,
processed, nStoQu=env.storageQueueTotal(), nSlotLists,
error=rc.error
# Check whether this pivot is fully downloaded
discard await env.snapSyncCompleteOk(ctx)
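# Note (assumption): `snapSyncCompleteOk()` is expected to register the
# completed pivot in `ctx.pool.fullPivot` and set `poolMode`, so that
# `snapSyncPool()` above performs the snap -> full switch next round.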
# ------------------------------------------------------------------------------
# Public functions
@ -202,10 +325,14 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
proc playSnapSyncSpecs*: PlaySyncSpecs =
## Return snap sync handler environment
PlaySyncSpecs(
pool: snapSyncPool,
daemon: snapSyncDaemon,
single: snapSyncSingle,
multi: snapSyncMulti)
setup: snapSyncSetup,
release: snapSyncRelease,
start: snapSyncStart,
stop: snapSyncStop,
pool: snapSyncPool,
daemon: snapSyncDaemon,
single: snapSyncSingle,
multi: snapSyncMulti)
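The extended method table is presumably resolved by a small dispatcher of this shape (a sketch; the `tab` field name is an assumption, while `syncMode.active` appears in the code above):
proc playMethod(ctx: SnapCtxRef): PlaySyncSpecs =
  ## Sketch: pick the handler set for the currently active sync mode
  ctx.pool.syncMode.tab[ctx.pool.syncMode.active]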
# ------------------------------------------------------------------------------
# End

View File

@ -9,6 +9,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
std/[options, strformat, strutils],
chronos,
@ -16,9 +18,8 @@ import
eth/[common, p2p],
stint,
../../../utils/prettify,
../../misc/timer_helper
{.push raises: [].}
../../misc/timer_helper,
../../types
logScope:
topics = "snap-tick"
@ -55,6 +56,7 @@ type
TickerFullStats* = object
## Full sync state (see `TickerFullStatsUpdater`)
pivotBlock*: Option[BlockNumber]
topPersistent*: BlockNumber
nextUnprocessed*: Option[BlockNumber]
nextStaged*: Option[BlockNumber]
@ -128,12 +130,6 @@ proc pc99(val: float): string =
elif 0.0 < val and val <= 0.01: "1%"
else: val.toPC(0)
proc pp(n: BlockNumber): string =
"#" & $n
proc pp(n: Option[BlockNumber]): string =
if n.isNone: "n/a" else: n.get.pp
# ------------------------------------------------------------------------------
# Private functions: printing ticker messages
# ------------------------------------------------------------------------------
@ -155,9 +151,9 @@ proc snapTicker(t: TickerRef) {.gcsafe.} =
var
nAcc, nSto: string
pv = "n/a"
bc = "n/a"
nStoQue = "n/a"
let
bc = data.beaconBlock.toStr
recoveryDone = t.snap.lastRecov
accCov = data.accountsFill[0].pc99 &
"(" & data.accountsFill[1].pc99 & ")" &
@ -173,11 +169,10 @@ proc snapTicker(t: TickerRef) {.gcsafe.} =
t.visited = now
t.snap.lastRecov = t.snap.recovery
if data.pivotBlock.isSome:
pv = data.pivotBlock.toStr & "/" & $data.nQueues
noFmtError("runLogTicker"):
if data.pivotBlock.isSome:
pv = &"#{data.pivotBlock.get}/{data.nQueues}"
if data.beaconBlock.isSome:
bc = &"#{data.beaconBlock.get}"
nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
&"({(data.nAccounts[1]+0.5).int64})")
nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
@ -205,13 +200,14 @@ proc fullTicker(t: TickerRef) {.gcsafe.} =
if data != t.full.lastStats or
tickerLogSuppressMax < (now - t.visited):
let
persistent = data.topPersistent.pp
staged = data.nextStaged.pp
unprocessed = data.nextUnprocessed.pp
persistent = data.topPersistent.toStr
staged = data.nextStaged.toStr
unprocessed = data.nextUnprocessed.toStr
queued = data.nStagedQueue
reOrg = if data.reOrg: "t" else: "f"
pv = data.pivotBlock.toStr
buddies = t.nBuddies
nInst = t.nBuddies
# With `int64`, there are more than 29*10^10 years range for seconds
up = (now - t.started).seconds.uint64.toSI
@ -221,10 +217,10 @@ proc fullTicker(t: TickerRef) {.gcsafe.} =
t.visited = now
if data.suspended:
info "Sync statistics (suspended)", up, buddies,
info "Full sync statistics (suspended)", up, nInst, pv,
persistent, unprocessed, staged, queued, reOrg, mem
else:
info "Sync statistics", up, buddies,
info "Full sync statistics", up, nInst, pv,
persistent, unprocessed, staged, queued, reOrg, mem
# ------------------------------------------------------------------------------
@ -255,36 +251,40 @@ proc initImpl(t: TickerRef; cb: TickerFullStatsUpdater) =
# Public constructor and start/stop functions
# ------------------------------------------------------------------------------
proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
## Re-initialise ticker
t.visited.reset
if t.fullMode:
t.prettyPrint(t) # print final state for full sync
t.initImpl(cb)
proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
## Re-initialise ticker
t.visited.reset
if not t.fullMode:
t.prettyPrint(t) # print final state for snap sync
t.initImpl(cb)
proc init*(T: type TickerRef; cb: TickerSnapStatsUpdater): T =
## Constructor
new result
result.initImpl(cb)
proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
## Re-initialise ticker
if not t.isNil:
t.visited.reset
if t.fullMode:
t.prettyPrint(t) # print final state for full sync
t.initImpl(cb)
proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
## Re-initialise ticker
if not t.isNil:
t.visited.reset
if not t.fullMode:
t.prettyPrint(t) # print final state for snap sync
t.initImpl(cb)
proc start*(t: TickerRef) =
## Re/start ticker unconditionally
#debug "Started ticker"
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
if t.started == Moment.default:
t.started = Moment.now()
if not t.isNil:
#debug "Started ticker"
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay),runLogTicker,t)
if t.started == Moment.default:
t.started = Moment.now()
proc stop*(t: TickerRef) =
## Stop ticker unconditionally
t.logTicker = nil
#debug "Stopped ticker"
if not t.isNil:
t.logTicker = nil
#debug "Stopped ticker"
# ------------------------------------------------------------------------------
# Public functions
@ -292,37 +292,41 @@ proc stop*(t: TickerRef) =
proc startBuddy*(t: TickerRef) =
## Increment buddies counter and start ticker unless running.
if t.nBuddies <= 0:
t.nBuddies = 1
if not t.fullMode:
if not t.snap.recovery:
t.start()
else:
t.nBuddies.inc
if not t.isNil:
if t.nBuddies <= 0:
t.nBuddies = 1
if not t.fullMode:
if not t.snap.recovery:
t.start()
else:
t.nBuddies.inc
proc startRecovery*(t: TickerRef) =
## Ditto for recovery mode
if not t.fullMode:
if not t.snap.recovery:
t.snap.recovery = true
if t.nBuddies <= 0:
t.start()
if not t.isNil:
if not t.fullMode:
if not t.snap.recovery:
t.snap.recovery = true
if t.nBuddies <= 0:
t.start()
proc stopBuddy*(t: TickerRef) =
## Decrement buddies counter and stop ticker if there are no more registered
## buddies.
t.nBuddies.dec
if t.nBuddies <= 0:
if not t.fullMode:
if not t.snap.recovery:
t.stop()
if not t.isNil:
t.nBuddies.dec
if t.nBuddies <= 0:
if not t.fullMode:
if not t.snap.recovery:
t.stop()
proc stopRecovery*(t: TickerRef) =
## Ditto for recovery mode
if not t.fullMode:
if t.snap.recovery:
t.snap.recovery = false
t.stop()
if not t.isNil:
if not t.fullMode:
if t.snap.recovery:
t.snap.recovery = false
t.stop()
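With the nil guards above, a disabled ticker is safe to call into, e.g.:
var t: TickerRef    # left nil when ticker logging is disabled
t.startBuddy()      # harmless no-op rather than a nil dereference
t.stopBuddy()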
# ------------------------------------------------------------------------------
# End

View File

@ -16,6 +16,7 @@ import
eth/[common, p2p],
stew/[interval_set, keyed_queue, sorted_set],
../../db/select_backend,
../misc/[best_pivot, block_queue],
../sync_desc,
./worker/com/com_error,
./worker/db/[snapdb_desc, snapdb_pivot],
@ -68,6 +69,9 @@ type
nAccounts*: uint64 ## Imported # of accounts
nSlotLists*: uint64 ## Imported # of account storage tries
# Checkpointing
savedFullPivotOk*: bool ## This fully completed pivot was saved
# Mothballing, ready to be swapped into newer pivot record
storageAccounts*: SnapAccountsList ## Accounts with missing storage slots
archived*: bool ## Not latest pivot, anymore
@ -84,11 +88,14 @@ type
## Per-worker local descriptor data extension
errors*: ComErrorStatsRef ## For error handling
# Full sync continuation parameters
bPivot*: BestPivotWorkerRef ## Local pivot worker descriptor
bQueue*: BlockQueueWorkerRef ## Block queue worker
SnapSyncModeType* = enum
## Current sync mode. After a snapshot has been downloaded, the system
## proceeds with full sync.
SnapSyncMode = 0 ## Start mode
PreFullSyncMode
FullSyncMode
SnapSyncSpecs* = object
@ -111,12 +118,17 @@ type
covAccTimesFull*: uint ## # of 100% coverages
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
# Snap/full mode multiplexing
syncMode*: SnapSyncSpecs ## Sync mode data container
# Info
ticker*: TickerRef ## Ticker, logger
# Snap/full mode multiplexing
syncMode*: SnapSyncSpecs ## Sync mode methods & data
fullPivot*: SnapPivotRef ## Start full sync from here
# Full sync continuation parameters
bPivot*: BestPivotCtxRef ## Global pivot descriptor
bCtx*: BlockQueueCtxRef ## Global block queue descriptor
SnapBuddyRef* = BuddyRef[SnapCtxData,SnapBuddyData]
## Extended worker peer descriptor

View File

@ -16,8 +16,7 @@
{.push raises: [].}
import
#std/options,
eth/[common, p2p],
eth/p2p,
../core/chain,
../db/db_chain,
./handlers/eth

View File

@ -38,18 +38,21 @@
## Clean up this worker peer.
##
##
## *runPool(buddy: BuddyRef[S,W], last: bool): bool*
## *runPool(buddy: BuddyRef[S,W], last: bool; laps: int): bool*
## Once started, the function `runPool()` is called for all worker peers in
## sequence as the body of an iteration as long as the function returns
## `false`. There will be no other worker peer functions activated
## simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It is the responsibility of the `runPool()`
## instance to reset the flag `buddy.ctx.poolMode`, typically at the first
## peer instance.
## `true` (default is `false`.) It will be automatically reset before the
## loop starts. Re-setting it again results in repeating the loop. The
## argument `laps` (starting with `0`) indicates the current lap of the
## repeated loops. To avoid continuous looping, the number of `laps` is
## limited (see `execPoolModeLoopMax`, below.)
##
## The argument `last` is set `true` if the last entry is reached.
## The argument `last` is set `true` if the last entry of the current loop
## has been reached.
##
## Note:
## + This function does *not* run in `async` mode.
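A minimal conforming handler might look as follows (a sketch; `pruneStaleEntries()` is a hypothetical one-off maintenance job):
proc runPool[S,W](buddy: BuddyRef[S,W]; last: bool; laps: int): bool =
  ## Single-lap example: run a one-off job on the first peer visited
  buddy.ctx.pruneStaleEntries()  # hypothetical maintenance job
  true                           # `true` => skip the remaining peers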
@ -91,7 +94,7 @@
import
std/hashes,
chronos,
eth/[common, p2p, p2p/peer_pool, p2p/private/p2p_types],
eth/[p2p, p2p/peer_pool, p2p/private/p2p_types],
stew/keyed_queue,
"."/[handlers, sync_desc]
@ -134,6 +137,9 @@ const
execLoopPollingTime = 50.milliseconds
## Single asynchronous time interval wait state for event polling
execPoolModeLoopMax = 100
## Avoids continuous looping
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -201,11 +207,25 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
if worker.ctrl.stopped:
dsc.monitorLock = false
break taskExecLoop
var count = dsc.buddies.len
for w in dsc.buddies.nextValues:
count.dec
if worker.runPool(count == 0):
break # `true` => stop
var count = 0
while count < execPoolModeLoopMax:
ctx.poolMode = false
# Pool mode: a handler returning `true` ends the current lap early. The
# final invocation of each lap is made with the `last=true` argument.
var delayed = BuddyRef[S,W](nil)
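# Implementation note: the handler call is kept one step behind the
# iterator so that the final peer can be recognised and invoked with
# `last=true` after the loop below.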
for w in dsc.buddies.nextValues:
# Execute previous (aka delayed) item (unless first)
if delayed.isNil or not delayed.runPool(last=false, laps=count):
delayed = w.worker
else:
delayed = nil # not executing any final item
break # `true` => stop
if not delayed.isNil:
discard delayed.runPool(last=true, laps=count) # final item
if not ctx.poolMode:
break
count.inc
dsc.monitorLock = false
else:

View File

@ -8,13 +8,13 @@
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
{.push raises: [].}
import
std/[math, hashes],
eth/common/eth_types_rlp,
stew/byteutils
{.push raises: [].}
type
BlockHash* = distinct Hash256
## Hash of a block, goes with `BlockNumber`.
@ -106,6 +106,13 @@ func `$`*(hashOrNum: HashOrNum): string =
if hashOrNum.isHash: $hashOrNum.hash
else: $hashOrNum.number
func toStr*(n: BlockNumber): string =
## Pretty print block number, explicitly formatted with a leading hash `#`
if n == high(BlockNumber): "high" else: "#" & $n
func toStr*(n: Option[BlockNumber]): string =
if n.isNone: "n/a" else: n.get.toStr
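A quick sanity sketch of the resulting formats (assuming `BlockNumber` is `UInt256` here, with `u256` from `stint` and `some`/`none` from `std/options` in scope):
doAssert 42.u256.toStr == "#42"
doAssert high(BlockNumber).toStr == "high"
doAssert none(BlockNumber).toStr == "n/a"
doAssert some(42.u256).toStr == "#42"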
# ------------------------------------------------------------------------------
# Public debug printing helpers
# ------------------------------------------------------------------------------

View File

@ -202,9 +202,12 @@ proc miscRunner(noisy = true) =
test "RLP proofs list sizes":
test_calcProofsListSizes()
test "RLP en/decode GetTrieNodes argument list":
test "RLP en/decode GetTrieNodes arguments list":
test_calcTrieNodeTranscode()
test "RLP en/decode BockBody arguments list":
test_calcBlockBodyTranscode()
proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
let

View File

@ -33,6 +33,16 @@ var
# Private helpers for `test_calcAccountsListSizes()`
# ------------------------------------------------------------------------------
# Kludge, should be fixed in `eth/common/eth_types_rlp`
proc append(w: var RlpWriter, b: BlockBody) =
## helper for `test_calcBlockBodyTranscode()`
w.ethAppend b
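# Note: Nim's overload resolution prefers this module-local `append()`, so
# `encode()` on a `BlockBody` (and hence on `seq[BlockBody]`) picks up the
# `ethAppend()` workaround instead of the defective generic version.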
proc `==`(a,b: ChainId): bool {.borrow.}
## helper for `test_calcBlockBodyTranscode()`
# ------------------
proc randAccSize(r: var Rand): int =
## Pick a random account size
accObjRlpMin + r.rand(accBlobs.len - 1)
@ -123,8 +133,8 @@ proc test_calcProofsListSizes*() =
check nodeBlobsHex == brNodesHex
proc test_calcTrieNodeTranscode*() =
## RLP encode/decode of `SnapTriePaths` objects
proc test_calcTrieNodeTranscode*() =
## RLP encode/decode a list of `SnapTriePaths` objects
let
raw = @[
# Accounts
@ -174,6 +184,30 @@ proc test_calcTrieNodeTranscode*() =
check raw == rlp.decode(cooked, seq[SnapTriePaths])
check cured == rlp.decode(cooked, seq[seq[Blob]])
proc test_calcBlockBodyTranscode*() =
## RLP encode/decode a list of `BlockBody` objects. Note that there is/was a
## problem in `eth/common/eth_types_rlp.append()` for `BlockBody` encoding.
## This has been fixed temporarily via `protocol/eth/eth_types.ethAppend()`.
let blkSeq = @[
BlockBody(
transactions: @[
Transaction(nonce: 1)]),
BlockBody(
uncles: @[
BlockHeader(nonce: [0x20u8,0,0,0,0,0,0,0])]),
BlockBody(),
BlockBody(
transactions: @[
Transaction(nonce: 3),
Transaction(nonce: 4)])]
let trBlkSeq = blkSeq.encode.decode(typeof blkSeq)
check trBlkSeq.len == blkSeq.len
for n in 0 ..< min(trBlkSeq.len, blkSeq.len):
check (n, trBlkSeq[n]) == (n, blkSeq[n])
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------