Code reorg 4 snap sync suite (#1560)
* Rename `playXXX` => `passXXX`
  why: Better purpose match

* Code massage, log message updates

* Moved `ticker.nim` to `misc` folder to be used the same by full and snap sync
  why: Simplifies maintenance

* Move `worker/pivot*` => `worker/pass/pass_snap/*`
  why: better for maintenance

* Moved helper source file => `pass/pass_snap/helper`

* Renamed ComError => GetError, `worker/com/` => `worker/get/`

* Keep ticker enable flag in worker descriptor
  why: This allows to pass this flag with the descriptor and not an extra
  function argument when calling the setup function.

* Extracted setup/release code from `worker.nim` => `pass/pass_init.nim`
This commit is contained in:
parent ddbdf34c3d
commit c5e895aaab
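The "Keep ticker enable flag in worker descriptor" bullet is the thread running through the `init*`/`setup*` hunks below. A minimal sketch of the pattern, with hypothetical simplified types standing in for the real `FullCtxRef`/`SnapCtxRef` descriptors:

```nim
# Sketch only: hypothetical simplified types, not the real
# FullCtxRef/SnapCtxRef descriptors from this commit.
type
  PoolData = object
    enableTicker: bool          # the flag now lives in the shared descriptor
  CtxRef = ref object
    pool: PoolData

proc init(T: type CtxRef; enableTicker: bool): T =
  # the flag is stored once, at construction time ...
  T(pool: PoolData(enableTicker: enableTicker))

proc setup(ctx: CtxRef): bool =
  # ... so setup() no longer needs an extra `ticker: bool` argument
  if ctx.pool.enableTicker:
    echo "ticker enabled"
  true

when isMainModule:
  doAssert CtxRef.init(enableTicker = true).setup()
```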
@@ -69,9 +69,9 @@ template tracerFrameBuddy(f: static[string]; b: FullBuddyRef; code: untyped) =
 # Virtual methods/interface, `mixin` functions
 # ------------------------------------------------------------------------------

-proc runSetup(ctx: FullCtxRef; ticker: bool): bool =
+proc runSetup(ctx: FullCtxRef): bool =
   tracerFrameCtx("runSetup", ctx):
-    result = worker.setup(ctx,ticker)
+    result = worker.setup(ctx)

 proc runRelease(ctx: FullCtxRef) =
   tracerFrameCtx("runRelease", ctx):

@@ -115,9 +115,9 @@ proc init*(
     exCtrlFile = none(string);
       ): T =
   new result
-  result.initSync(ethNode, chain, maxPeers, enableTicker, exCtrlFile)
+  result.initSync(ethNode, chain, maxPeers, exCtrlFile)
   result.ctx.pool.rng = rng
+  result.ctx.pool.enableTicker = enableTicker

 proc start*(ctx: FullSyncRef) =
   doAssert ctx.startSync()
@@ -1,133 +0,0 @@
-# Nimbus
-# Copyright (c) 2021 Status Research & Development GmbH
-# Licensed under either of
-#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
-#    http://www.apache.org/licenses/LICENSE-2.0)
-#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
-#    http://opensource.org/licenses/MIT)
-# at your option. This file may not be copied, modified, or distributed
-# except according to those terms.
-
-import
-  chronos,
-  chronicles,
-  eth/[common, p2p],
-  stint,
-  ../../utils/prettify,
-  ../misc/timer_helper
-
-{.push raises: [].}
-
-logScope:
-  topics = "full-tick"
-
-type
-  TickerStats* = object
-    topPersistent*: BlockNumber
-    nextUnprocessed*: Option[BlockNumber]
-    nextStaged*: Option[BlockNumber]
-    nStagedQueue*: int
-    suspended*: bool
-    reOrg*: bool
-
-  TickerStatsUpdater* =
-    proc: TickerStats {.gcsafe, raises: [].}
-
-  TickerRef* = ref object
-    nBuddies: int
-    lastStats: TickerStats
-    lastTick: uint64
-    statsCb: TickerStatsUpdater
-    logTicker: TimerCallback
-    tick: uint64 # more than 5*10^11y before wrap when ticking every sec
-
-const
-  tickerStartDelay = 100.milliseconds
-  tickerLogInterval = 1.seconds
-  tickerLogSuppressMax = 100
-
-# ------------------------------------------------------------------------------
-# Private functions: ticking log messages
-# ------------------------------------------------------------------------------
-
-proc pp(n: BlockNumber): string =
-  "#" & $n
-
-proc pp(n: Option[BlockNumber]): string =
-  if n.isNone: "n/a" else: n.get.pp
-
-proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
-
-proc runLogTicker(t: TickerRef) {.gcsafe.} =
-  let data = t.statsCb()
-
-  if data != t.lastStats or
-     t.lastTick + tickerLogSuppressMax < t.tick:
-    t.lastStats = data
-    t.lastTick = t.tick
-    let
-      persistent = data.topPersistent.pp
-      staged = data.nextStaged.pp
-      unprocessed = data.nextUnprocessed.pp
-      queued = data.nStagedQueue
-      reOrg = if data.reOrg: "t" else: "f"
-
-      buddies = t.nBuddies
-      tick = t.tick.toSI
-      mem = getTotalMem().uint.toSI
-
-    if data.suspended:
-      info "Sync statistics (suspended)", tick, buddies,
-        persistent, unprocessed, staged, queued, reOrg, mem
-    else:
-      info "Sync statistics", tick, buddies,
-        persistent, unprocessed, staged, queued, reOrg, mem
-
-  t.tick.inc
-  t.setLogTicker(Moment.fromNow(tickerLogInterval))
-
-
-proc setLogTicker(t: TickerRef; at: Moment) =
-  if not t.logTicker.isNil:
-    t.logTicker = safeSetTimer(at, runLogTicker, t)
-
-# ------------------------------------------------------------------------------
-# Public constructor and start/stop functions
-# ------------------------------------------------------------------------------
-
-proc init*(T: type TickerRef; cb: TickerStatsUpdater): T =
-  ## Constructor
-  T(statsCb: cb)
-
-proc start*(t: TickerRef) =
-  ## Re/start ticker unconditionally
-  #debug "Started ticker"
-  t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
-
-proc stop*(t: TickerRef) =
-  ## Stop ticker unconditionally
-  t.logTicker = nil
-  #debug "Stopped ticker"
-
-# ------------------------------------------------------------------------------
-# Public functions
-# ------------------------------------------------------------------------------
-
-proc startBuddy*(t: TickerRef) =
-  ## Increment buddies counter and start ticker unless running.
-  if t.nBuddies <= 0:
-    t.nBuddies = 1
-    t.start()
-  else:
-    t.nBuddies.inc
-
-proc stopBuddy*(t: TickerRef) =
-  ## Decrement buddies counter and stop ticker if there are no more registered
-  ## buddies.
-  t.nBuddies.dec
-  if t.nBuddies <= 0:
-    t.stop()
-
-# ------------------------------------------------------------------------------
-# End
-# ------------------------------------------------------------------------------
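The module deleted above is not lost: per the commit message it moves to `../misc/ticker.nim` so full and snap sync share one implementation. A reduced, self-contained sketch of its buddy-counting start/stop behaviour (the timer wiring via `safeSetTimer` is omitted here):

```nim
# Reduced model of the buddy-counting logic from the ticker above;
# the real module also arms a repeating log timer via safeSetTimer().
type TickerRef = ref object
  nBuddies: int
  running: bool

proc start(t: TickerRef) = t.running = true
proc stop(t: TickerRef) = t.running = false

proc startBuddy(t: TickerRef) =
  ## Increment buddies counter and start ticker unless running.
  if t.nBuddies <= 0:
    t.nBuddies = 1
    t.start()
  else:
    t.nBuddies.inc

proc stopBuddy(t: TickerRef) =
  ## Decrement buddies counter and stop ticker when none are left.
  t.nBuddies.dec
  if t.nBuddies <= 0:
    t.stop()

when isMainModule:
  let t = TickerRef()
  t.startBuddy(); t.startBuddy()
  t.stopBuddy()
  doAssert t.running        # one buddy still registered
  t.stopBuddy()
  doAssert not t.running
```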
@@ -15,8 +15,8 @@ import
   chronos,
   eth/p2p,
   ".."/[protocol, sync_desc],
-  ../misc/[best_pivot, block_queue, sync_ctrl],
-  "."/[ticker, worker_desc]
+  ../misc/[best_pivot, block_queue, sync_ctrl, ticker],
+  ./worker_desc

 logScope:
   topics = "full-buddy"

@@ -66,15 +66,15 @@ proc topUsedNumber(
   ok(top)


-proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater =
-  result = proc: TickerStats =
+proc tickerUpdater(ctx: FullCtxRef): TickerFullStatsUpdater =
+  result = proc: auto =
     var stats: BlockQueueStats
     ctx.pool.bCtx.blockQueueStats(stats)

     let suspended =
       0 < ctx.pool.suspendAt and ctx.pool.suspendAt < stats.topAccepted

-    TickerStats(
+    TickerFullStats(
       topPersistent:   stats.topAccepted,
       nextStaged:      stats.nextStaged,
       nextUnprocessed: stats.nextUnprocessed,

@@ -159,7 +159,7 @@ proc suspendDownload(buddy: FullBuddyRef): bool =
 # Public start/stop and admin functions
 # ------------------------------------------------------------------------------

-proc setup*(ctx: FullCtxRef; tickerOK: bool): bool =
+proc setup*(ctx: FullCtxRef): bool =
   ## Global set up
   ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng)
   let rc = ctx.topUsedNumber(backBlocks = 0)

@@ -167,7 +167,7 @@ proc setup*(ctx: FullCtxRef; tickerOK: bool): bool =
     ctx.pool.bCtx = BlockQueueCtxRef.init()
     return false
   ctx.pool.bCtx = BlockQueueCtxRef.init(rc.value + 1)
-  if tickerOK:
+  if ctx.pool.enableTicker:
     ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater)
   else:
     debug "Ticker is disabled"
@@ -14,8 +14,7 @@ import
   eth/p2p,
   chronos,
   ../sync_desc,
-  ../misc/[best_pivot, block_queue],
-  ./ticker
+  ../misc/[best_pivot, block_queue, ticker]

 type
   PivotState* = enum

@@ -38,6 +37,8 @@ type
     pivotStamp*: Moment          ## `PivotState` driven timing control
     bCtx*: BlockQueueCtxRef      ## Global block queue descriptor
     suspendAt*: BlockNumber      ## Suspend if persistent head is larger
+
+    enableTicker*: bool          ## Advisary, extra level of gossip
     ticker*: TickerRef           ## Logger ticker

   FullBuddyRef* = BuddyRef[FullCtxData,FullBuddyData]

@@ -39,7 +39,7 @@ type
     stoRoot: NodeKey             # Storage root

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true
     ## Enabled additional logging noise

   estimatedNodeSize = hexaryRangeRlpNodesListSizeMax(1)
@@ -12,17 +12,17 @@
 {.push raises: [].}

 import
-  std/[options, strformat, strutils],
+  std/[strformat, strutils],
   chronos,
   chronicles,
   eth/[common, p2p],
   stint,
-  ../../../utils/prettify,
-  ../../misc/timer_helper,
-  ../../types
+  ../../utils/prettify,
+  ../types,
+  ./timer_helper

 logScope:
-  topics = "snap-tick"
+  topics = "tick"

 type
   TickerSnapStatsUpdater* = proc: TickerSnapStats {.gcsafe, raises: [].}

@@ -236,7 +236,10 @@ proc stopImpl(t: TickerRef) =
 # Public constructor and start/stop functions
 # ------------------------------------------------------------------------------

-proc init*(T: type TickerRef; cb: TickerSnapStatsUpdater): T =
+proc init*(
+    T: type TickerRef;
+    cb: TickerSnapStatsUpdater|TickerFullStatsUpdater;
+      ): T =
   ## Constructor
   new result
   result.initImpl(cb)

@@ -280,7 +283,7 @@ proc startBuddy*(t: TickerRef) =
   if not t.isNil:
     if t.nBuddies <= 0:
       t.nBuddies = 1
-      if not t.fullMode and not t.snap.recovery:
+      if t.fullMode or not t.snap.recovery:
         t.startImpl()
       when extraTraceMessages:
         debug logTxt "start buddy", fullMode=t.fullMode, nBuddies=t.nBuddies
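The widened `init*` signature above is what lets one ticker serve both sync modes: the constructor now accepts either stats-callback flavour and overload resolution picks the matching `initImpl`. A sketch of the idea with hypothetical minimal stand-in types:

```nim
# Sketch of the union-typed constructor idea (hypothetical minimal types,
# not the real TickerSnapStats/TickerFullStats records).
type
  SnapStats = object
  FullStats = object
  SnapUpdater = proc: SnapStats {.gcsafe.}
  FullUpdater = proc: FullStats {.gcsafe.}
  TickerRef = ref object
    fullMode: bool

proc initImpl(t: TickerRef; cb: SnapUpdater) = t.fullMode = false
proc initImpl(t: TickerRef; cb: FullUpdater) = t.fullMode = true

proc init(T: type TickerRef; cb: SnapUpdater|FullUpdater): T =
  ## One constructor, two callback flavours: the generic instantiates
  ## with the concrete proc type and picks the right initImpl().
  new result
  result.initImpl(cb)

when isMainModule:
  let snapCb: SnapUpdater = proc: SnapStats = SnapStats()
  doAssert not TickerRef.init(snapCb).fullMode
```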
@@ -17,7 +17,7 @@ import
   ../db/select_backend,
   ../core/chain,
   ./snap/[worker, worker_desc],
-  "."/[protocol, sync_desc, sync_sched]
+  "."/[protocol, sync_sched]

 logScope:
   topics = "snap-sync"

@@ -70,9 +70,9 @@ template tracerFrameBuddy(f: static[string]; b: SnapBuddyRef; code: untyped) =
 # Virtual methods/interface, `mixin` functions
 # ------------------------------------------------------------------------------

-proc runSetup(ctx: SnapCtxRef; ticker: bool): bool =
+proc runSetup(ctx: SnapCtxRef): bool =
   tracerFrameCtx("runSetup", ctx):
-    result = worker.setup(ctx,ticker)
+    result = worker.setup(ctx)

 proc runRelease(ctx: SnapCtxRef) =
   tracerFrameCtx("runRelease", ctx):

@@ -117,9 +117,10 @@ proc init*(
     exCtrlFile = none(string);
       ): T =
   new result
-  result.initSync(ethNode, chain, maxPeers, enableTicker, exCtrlFile)
+  result.initSync(ethNode, chain, maxPeers, exCtrlFile)
   result.ctx.chain = chain # explicitely override
   result.ctx.pool.rng = rng
+  result.ctx.pool.enableTicker = enableTicker
   result.ctx.pool.dbBackend = dbBackend
   # Required to have been initialised via `addEthHandlerCapability()`
   doAssert not result.ctx.ethWireCtx.isNil

@@ -118,7 +118,7 @@ const

   # --------------

-  healAccountsCoverageTrigger* = 0.65 # 1.01 <--- will go away (debugging)
+  healAccountsCoverageTrigger* = 1.01
     ## Apply accounts healing if the global snap download coverage factor
     ## exceeds this setting. The global coverage factor is derived by merging
     ## all account ranges retrieved for all pivot state roots (see

@@ -20,6 +20,9 @@ import
   ../protocol,
   ../types

+export
+  types
+
 type
   ByteArray32* = array[32,byte]
     ## Used for 32 byte database keys
@@ -8,19 +8,24 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.

+## Sync mode pass multiplexer
+## ==========================
+##
+## Pass state diagram:
+## ::
+##    <init> -> <snap-sync> -> <full-sync> ---+
+##                                 ^          |
+##                                 |          |
+##                                 +----------+
+##
 {.push raises: [].}

 import
   chronicles,
   chronos,
   eth/p2p,
   stew/[interval_set, keyed_queue],
   "../.."/[common, db/select_backend],
   ../sync_desc,
-  ./worker/[play, ticker],
-  ./worker/com/com_error,
-  ./worker/db/snapdb_desc,
-  "."/[range_desc, worker_desc]
+  ./range_desc,
+  ./worker/pass,
+  ./worker_desc

 logScope:
   topics = "snap-worker"

@@ -35,70 +40,32 @@ template ignoreException(info: static[string]; code: untyped) =
   except CatchableError as e:
     error "Exception at " & info & ":", name=($e.name), msg=(e.msg)

-# ------------------------------------------------------------------------------
-# Private functions
-# ------------------------------------------------------------------------------
-
-proc setupTicker(ctx: SnapCtxRef; tickerOK: bool) =
-  let blindTicker = proc: TickerSnapStats =
-    discard
-  if tickerOK:
-    ctx.pool.ticker = TickerRef.init(blindTicker)
-
-proc releaseTicker(ctx: SnapCtxRef) =
-  ## Helper for `release()`
-  ctx.pool.ticker.stop()
-  ctx.pool.ticker = nil
-
-# --------------
-
-proc setupSnapDb(ctx: SnapCtxRef) =
-  ## Helper for `setup()`: Initialise snap sync database layer
-  ctx.pool.snapDb =
-    if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
-    else: SnapDbRef.init(ctx.pool.dbBackend)
-
 # ------------------------------------------------------------------------------
 # Public start/stop and admin functions
 # ------------------------------------------------------------------------------

-proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
+proc setup*(ctx: SnapCtxRef): bool =
   ## Global set up
-  ctx.playSetup()             # Set up sync sub-mode specs.
-  ctx.setupSnapDb()           # Set database backend, subject to change
-  ctx.setupTicker(tickerOK)   # Start log/status ticker (if any)
-
+  ctx.passInitSetup()
   ignoreException("setup"):
-    ctx.playMethod.setup(ctx)
-
-  # Experimental, also used for debugging
-  if ctx.exCtrlFile.isSome:
-    warn "Snap sync accepts pivot block number or hash",
-      syncCtrlFile=ctx.exCtrlFile.get
+    ctx.passActor.setup(ctx)
   true

 proc release*(ctx: SnapCtxRef) =
   ## Global clean up
   ignoreException("release"):
-    ctx.playMethod.release(ctx)
-
-  ctx.releaseTicker()         # Stop log/status ticker (if any)
-  ctx.playRelease()           # Shut down sync methods
+    ctx.passActor.release(ctx)
+  ctx.passInitRelease()

 proc start*(buddy: SnapBuddyRef): bool =
   ## Initialise worker peer
-  let ctx = buddy.ctx
   ignoreException("start"):
-    if ctx.playMethod.start(buddy):
-      buddy.only.errors = ComErrorStatsRef()
-      return true
+    result = buddy.ctx.passActor.start(buddy)

 proc stop*(buddy: SnapBuddyRef) =
   ## Clean up this peer
-  let ctx = buddy.ctx
   ignoreException("stop"):
-    ctx.playMethod.stop(buddy)
+    buddy.ctx.passActor.stop(buddy)

 # ------------------------------------------------------------------------------
 # Public functions, sync handler multiplexers

@@ -107,22 +74,22 @@ proc stop*(buddy: SnapBuddyRef) =
 proc runDaemon*(ctx: SnapCtxRef) {.async.} =
   ## Sync processsing multiplexer
   ignoreException("runDaemon"):
-    await ctx.playMethod.daemon(ctx)
+    await ctx.passActor.daemon(ctx)

 proc runSingle*(buddy: SnapBuddyRef) {.async.} =
   ## Sync processsing multiplexer
   ignoreException("runSingle"):
-    await buddy.ctx.playMethod.single(buddy)
+    await buddy.ctx.passActor.single(buddy)

 proc runPool*(buddy: SnapBuddyRef, last: bool; laps: int): bool =
   ## Sync processsing multiplexer
   ignoreException("runPool"):
-    result = buddy.ctx.playMethod.pool(buddy,last,laps)
+    result = buddy.ctx.passActor.pool(buddy,last,laps)

 proc runMulti*(buddy: SnapBuddyRef) {.async.} =
   ## Sync processsing multiplexer
   ignoreException("runMulti"):
-    await buddy.ctx.playMethod.multi(buddy)
+    await buddy.ctx.passActor.multi(buddy)

 # ------------------------------------------------------------------------------
 # End
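After this hunk, every scheduler entry point reduces to the same one-liner: fetch the active pass's actor, invoke the matching method. A self-contained sketch of that table-driven multiplexing (hypothetical reduced types; the real table is filled by `setupPass` in `pass_init.nim` further down):

```nim
# Reduced sketch of the pass multiplexer (hypothetical types).
type
  SyncMode = enum SnapSyncMode, FullSyncMode
  PassActorRef = ref object of RootRef
    setup: proc(name: string) {.gcsafe.}
  CtxRef = ref object
    tab: array[SyncMode, PassActorRef]
    active: SyncMode

proc passActor(ctx: CtxRef): PassActorRef =
  ## Getter: the actor for the currently active pass.
  ctx.tab[ctx.active]

when isMainModule:
  let ctx = CtxRef(active: SnapSyncMode)
  for mode in SyncMode:
    ctx.tab[mode] = PassActorRef(
      setup: proc(name: string) = echo "setup for ", name)
  # every entry point dispatches the same way:
  ctx.passActor.setup($ctx.active)
  ctx.active = FullSyncMode           # pass switch: snap -> full
  ctx.passActor.setup($ctx.active)
```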
@@ -154,7 +154,7 @@ type
     slot*: Option[int]           ## May refer to indexed argument slots
     kind*: Option[NodeKind]      ## Node type (if any)
     dangling*: seq[NodeSpecs]    ## Missing inner sub-tries
-    error*: HexaryError          ## Error code, or `NothingSerious`
+    error*: HexaryError          ## Error code, or `HexaryError(0)`

 static:
   # Not that there is no doubt about this ...

@@ -32,7 +32,7 @@ type
     dangling*: seq[NodeSpecs]

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true

 proc getAccountFn*(ps: SnapDbAccountsRef): HexaryGetFn

@@ -321,7 +321,7 @@ proc importRawAccountsNodes*(
   ##
   ## Additional node items might be reported if the node type is in the
   ## argument set `reportNodes`. These reported items will have no error
-  ## code set (i.e. `NothingSerious`.)
+  ## code set (i.e. `HexaryError(0)`.)
   ##
   let
     peer = ps.peer

@@ -336,7 +336,7 @@ proc importRawAccountsNodes*(
       if 0 < node.data.len: # otherwise ignore empty placeholder
         slot = some(n)
         var rep = db.hexaryImport(node)
-        if rep.error != NothingSerious:
+        if rep.error != HexaryError(0):
           rep.slot = slot
           result.add rep
           nErrors.inc
@@ -17,14 +17,14 @@
 import
   ../../../../db/[select_backend, storage_types],
   ../../../protocol,
   ../../range_desc,
-  "."/[hexary_debug, hexary_desc, hexary_error, hexary_import, hexary_nearby,
-       hexary_paths, rocky_bulk_load]
+  "."/[hexary_desc, hexary_error, hexary_import, hexary_nearby, hexary_paths,
+       rocky_bulk_load]

 logScope:
   topics = "snap-db"

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true

   RockyBulkCache* = "accounts.sst"
     ## Name of temporary file to accomodate SST records for `rocksdb`

@@ -42,6 +42,9 @@ type
     base: SnapDbRef              ## Back reference to common parameters
     root*: NodeKey               ## Session DB root node key

+when extraTraceMessages:
+  import hexary_debug
+
 # ------------------------------------------------------------------------------
 # Private debugging helpers
 # ------------------------------------------------------------------------------

@@ -221,7 +224,7 @@ proc mergeProofs*(

   for n,rlpRec in proof:
     let report = xDb.hexaryImport(rlpRec.to(Blob), nodes, refs)
-    if report.error != NothingSerious:
+    if report.error != HexaryError(0):
       let error = report.error
       trace "mergeProofs()", peer, item=n, proofs=proof.len, error
       return err(error)
@@ -15,7 +15,6 @@ import
   chronicles,
   eth/[common, trie/db],
   ../../../../db/kvstore_rocksdb,
-  ../../../types,
   ../../range_desc,
   "."/[hexary_desc, hexary_error, rocky_bulk_load, snapdb_desc]

@@ -8,14 +8,14 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.

+{.push raises: [].}
+
 import
   eth/[common, rlp],
   stew/results,
   ../../range_desc,
   "."/[hexary_error, snapdb_desc, snapdb_persistent]

-{.push raises: [].}
-
 type
   SnapDbPivotRegistry* = object
     predecessor*: NodeKey        ## Predecessor key in chain, auto filled

@@ -28,9 +28,6 @@ type
     slotAccounts*: seq[NodeKey]  ## List of accounts with missing storage slots
     ctraAccounts*: seq[NodeKey]  ## List of accounts with missing contracts

-const
-  extraTraceMessages {.used.} = false or true
-
 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------

@@ -30,7 +30,7 @@ type
     accKey: NodeKey              ## Accounts address hash (curr.unused)

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true

 # ------------------------------------------------------------------------------
 # Private helpers

@@ -338,7 +338,7 @@ proc importRawStorageSlotsNodes*(
   ##
   ## Additional node items might be reported if the node type is in the
   ## argument set `reportNodes`. These reported items will have no error
-  ## code set (i.e. `NothingSerious`.)
+  ## code set (i.e. `HexaryError(0)`.)
   ##
   let
     peer = ps.peer

@@ -353,7 +353,7 @@ proc importRawStorageSlotsNodes*(
       if 0 < node.data.len: # otherwise ignore empty placeholder
         slot = some(n)
         var rep = db.hexaryImport(node)
-        if rep.error != NothingSerious:
+        if rep.error != HexaryError(0):
           rep.slot = slot
           result.add rep
           nErrors.inc
@@ -17,14 +17,14 @@
 import
   std/sequtils,
   chronos,
-  eth/[common, p2p, trie/trie_defs],
+  eth/[common, p2p],
   stew/interval_set,
   "../../.."/[protocol, protocol/trace_config],
   "../.."/[constants, range_desc, worker_desc],
-  ./com_error
+  ./get_error

 logScope:
-  topics = "snap-fetch"
+  topics = "snap-get"

 type
   GetAccountRange* = object

@@ -32,6 +32,9 @@ type
     withStorage*: seq[AccountSlotsHeader]  ## Accounts with storage root
     withContract*: seq[AccountCodeHeader]  ## Accounts with contacts

+const
+  extraTraceMessages = false # or true
+
 # ------------------------------------------------------------------------------
 # Private functions
 # ------------------------------------------------------------------------------

@@ -50,9 +53,9 @@ proc getAccountRangeReq(
       fetchRequestBytesLimit)
     return ok(reply)
   except CatchableError as e:
-    let error {.used.} = e.msg
-    trace trSnapRecvError & "waiting for GetAccountRange reply", peer, pivot,
-      error
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetAccountRange reply", peer, pivot,
+        error=(e.msg)
     return err()

 # ------------------------------------------------------------------------------

@@ -64,7 +67,7 @@ proc getAccountRange*(
     stateRoot: Hash256;          ## Current DB base (see `pivot` for logging)
     iv: NodeTagRange;            ## Range to be fetched
     pivot: string;               ## For logging, instead of `stateRoot`
-      ): Future[Result[GetAccountRange,ComError]] {.async.} =
+      ): Future[Result[GetAccountRange,GetError]] {.async.} =
   ## Fetch data using the `snap#` protocol, returns the range covered.
   let
     peer {.used.} = buddy.peer

@@ -74,10 +77,11 @@ proc getAccountRange*(
   let snAccRange = block:
     let rc = await buddy.getAccountRangeReq(stateRoot, iv, pivot)
     if rc.isErr:
-      return err(ComNetworkProblem)
+      return err(GetNetworkProblem)
     if rc.value.isNone:
-      trace trSnapRecvTimeoutWaiting & "for AccountRange", peer, pivot
-      return err(ComResponseTimeout)
+      when trSnapTracePacketsOk:
+        trace trSnapRecvTimeoutWaiting & "for AccountRange", peer, pivot
+      return err(GetResponseTimeout)
     rc.value.get

   var dd = GetAccountRange(

@@ -119,13 +123,16 @@ proc getAccountRange*(
     # any) account after limitHash must be provided.
     if nProof == 0:
       # Maybe try another peer
-      trace trSnapRecvReceived & "empty AccountRange", peer, pivot,
-        nAccounts, nProof, accRange="n/a", reqRange=iv
-      return err(ComNoAccountsForStateRoot)
+      when trSnapTracePacketsOk:
+        trace trSnapRecvReceived & "empty AccountRange", peer, pivot,
+          nAccounts, nProof, accRange="n/a", reqRange=iv
+      return err(GetNoAccountsForStateRoot)

     # So there is no data and a proof.
-    trace trSnapRecvReceived & "terminal AccountRange", peer, pivot, nAccounts,
-      nProof, accRange=NodeTagRange.new(iv.minPt, high(NodeTag)), reqRange=iv
+    when trSnapTracePacketsOk:
+      trace trSnapRecvReceived & "terminal AccountRange", peer, pivot,
+        nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, high(NodeTag)),
+        reqRange=iv
     return ok(dd)

   let (accMinPt, accMaxPt) = (

@@ -134,10 +141,11 @@ proc getAccountRange*(

   if accMinPt < iv.minPt:
     # Not allowed
-    trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer,
-      pivot, nAccounts, nProof, accRange=NodeTagRange.new(accMinPt, accMaxPt),
-      reqRange=iv
-    return err(ComAccountsMinTooSmall)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer,
+        pivot, nAccounts, nProof, accRange=NodeTagRange.new(accMinPt, accMaxPt),
+        reqRange=iv
+    return err(GetAccountsMinTooSmall)

   if iv.maxPt < accMaxPt:
     # github.com/ethereum/devp2p/blob/master/caps/snap.md#getaccountrange-0x00:

@@ -151,13 +159,16 @@ proc getAccountRange*(
     # limit (seen with Geth/v1.10.18-unstable-4b309c70-20220517.)
     if iv.maxPt < dd.data.accounts[^2].accKey.to(NodeTag):
       # The second largest should not excceed the top one requested.
-      trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer,
-        pivot, nAccounts, nProof,
-        accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv
-      return err(ComAccountsMaxTooLarge)
-    when extraTraceMessages:
+      when trSnapTracePacketsOk:
+        trace trSnapRecvProtocolViolation & "AccountRange top exceeded",
+          peer, pivot, nAccounts, nProof,
+          accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv
+      return err(GetAccountsMaxTooLarge)

-  trace trSnapRecvReceived & "AccountRange", peer, pivot, nAccounts, nProof,
-    accRange=NodeTagRange.new(accMinPt, accMaxPt), reqRange=iv
+  when trSnapTracePacketsOk:
+    trace trSnapRecvReceived & "AccountRange", peer, pivot, nAccounts, nProof,
+      accRange=NodeTagRange.new(accMinPt, accMaxPt), reqRange=iv

   return ok(dd)
@@ -14,12 +14,12 @@ import
   chronos,
   eth/[common, p2p],
   stew/byteutils,
-  "../../.."/[protocol, types],
+  "../../.."/[protocol, protocol/trace_config, types],
   ../../worker_desc,
-  ./com_error
+  ./get_error

 logScope:
-  topics = "snap-fetch"
+  topics = "snap-get"

 # ------------------------------------------------------------------------------
 # Public functions

@@ -28,7 +28,7 @@ logScope:
 proc getBlockHeader*(
     buddy: SnapBuddyRef;
     num: BlockNumber;
-      ): Future[Result[BlockHeader,ComError]]
+      ): Future[Result[BlockHeader,GetError]]
       {.async.} =
   ## Get single block header
   let

@@ -42,38 +42,43 @@ proc getBlockHeader*(
     skip: 0,
     reverse: false)

-  trace trEthSendSendingGetBlockHeaders, peer, header=num.toStr, reqLen
+  when trSnapTracePacketsOk:
+    trace trEthSendSendingGetBlockHeaders, peer, header=num.toStr, reqLen

   var hdrResp: Option[blockHeadersObj]
   try:
     hdrResp = await peer.getBlockHeaders(hdrReq)
   except CatchableError as e:
-    trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
-      error=e.msg
-    return err(ComNetworkProblem)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
+        error=e.msg
+    return err(GetNetworkProblem)

   var hdrRespLen = 0
   if hdrResp.isSome:
     hdrRespLen = hdrResp.get.headers.len
   if hdrRespLen == 0:
-    trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
-    return err(ComNoHeaderAvailable)
+    when trSnapTracePacketsOk:
+      trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
+    return err(GetNoHeaderAvailable)

   if hdrRespLen == 1:
     let
       header = hdrResp.get.headers[0]
       blockNumber = header.blockNumber
-    trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
+    when trSnapTracePacketsOk:
+      trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
     return ok(header)

-  trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
-  return err(ComTooManyHeaders)
+  when trSnapTracePacketsOk:
+    trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
+  return err(GetTooManyHeaders)


 proc getBlockHeader*(
     buddy: SnapBuddyRef;
     hash: Hash256;
-      ): Future[Result[BlockHeader,ComError]]
+      ): Future[Result[BlockHeader,GetError]]
       {.async.} =
   ## Get single block header
   let

@@ -87,33 +92,38 @@ proc getBlockHeader*(
     skip: 0,
     reverse: false)

-  trace trEthSendSendingGetBlockHeaders, peer,
-    header=hash.data.toHex, reqLen
+  when trSnapTracePacketsOk:
+    trace trEthSendSendingGetBlockHeaders, peer,
+      header=hash.data.toHex, reqLen

   var hdrResp: Option[blockHeadersObj]
   try:
     hdrResp = await peer.getBlockHeaders(hdrReq)
   except CatchableError as e:
-    trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
-      error=e.msg
-    return err(ComNetworkProblem)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
+        error=e.msg
+    return err(GetNetworkProblem)

   var hdrRespLen = 0
   if hdrResp.isSome:
     hdrRespLen = hdrResp.get.headers.len
   if hdrRespLen == 0:
-    trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
-    return err(ComNoHeaderAvailable)
+    when trSnapTracePacketsOk:
+      trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
+    return err(GetNoHeaderAvailable)

   if hdrRespLen == 1:
     let
       header = hdrResp.get.headers[0]
       blockNumber = header.blockNumber
-    trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
+    when trSnapTracePacketsOk:
+      trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
     return ok(header)

-  trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
-  return err(ComTooManyHeaders)
+  when trSnapTracePacketsOk:
+    trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
+  return err(GetTooManyHeaders)

 # ------------------------------------------------------------------------------
 # End
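The change repeated throughout both `getBlockHeader` variants is mechanical: each packet-level `trace` statement moves under `when trSnapTracePacketsOk:`, so disabled packet tracing is compiled out rather than branched over at run time. A standalone illustration of the idiom (local constant, not the real `protocol/trace_config` module):

```nim
# Compile-time log gating idiom (standalone const, not the real
# protocol/trace_config module).
const tracePacketsOk = false   # flip to true to compile the logging in

proc fetchSomething(peer: string) =
  when tracePacketsOk:
    # this branch is dropped entirely by the compiler when the
    # constant is false -- no runtime check, no dead string data
    echo "sending request to ", peer
  discard # ... actual work ...

when isMainModule:
  fetchSomething("peer#1")
```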
@@ -8,20 +8,18 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.

-## Note: this module is currently unused
-
 {.push raises: [Defect].}

 import
-  std/[hashes, options, sequtils],
+  std/[options, sequtils],
   chronos,
   eth/[common, p2p],
   "../../.."/[protocol, protocol/trace_config],
   "../.."/[constants, range_desc, worker_desc],
-  ./com_error
+  ./get_error

 logScope:
-  topics = "snap-fetch"
+  topics = "snap-get"

 type
   # SnapByteCodes* = object

@@ -32,9 +30,6 @@ type
     extra*: seq[(NodeKey,Blob)]
     kvPairs*: seq[(NodeKey,Blob)]

-const
-  emptyBlob = seq[byte].default
-
 # ------------------------------------------------------------------------------
 # Private functions
 # ------------------------------------------------------------------------------

@@ -51,8 +46,9 @@ proc getByteCodesReq(
     return ok(reply)

   except CatchableError as e:
-    trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
-      error=e.msg
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
+        error=e.msg
     return err()

 # ------------------------------------------------------------------------------

@@ -62,7 +58,7 @@ proc getByteCodesReq(
 proc getByteCodes*(
     buddy: SnapBuddyRef;
     keys: seq[NodeKey],
-      ): Future[Result[GetByteCodes,ComError]]
+      ): Future[Result[GetByteCodes,GetError]]
       {.async.} =
   ## Fetch data using the `snap#` protocol, returns the byte codes requested
   ## (if any.)

@@ -71,7 +67,7 @@ proc getByteCodes*(
     nKeys = keys.len

   if nKeys == 0:
-    return err(ComEmptyRequestArguments)
+    return err(GetEmptyRequestArguments)

   if trSnapTracePacketsOk:
     trace trSnapSendSending & "GetByteCodes", peer, nkeys

@@ -79,14 +75,16 @@ proc getByteCodes*(
   let byteCodes = block:
     let rc = await buddy.getByteCodesReq keys.mapIt(it.to(Hash256))
     if rc.isErr:
-      return err(ComNetworkProblem)
+      return err(GetNetworkProblem)
     if rc.value.isNone:
-      trace trSnapRecvTimeoutWaiting & "for reply to GetByteCodes", peer, nKeys
-      return err(ComResponseTimeout)
+      when trSnapTracePacketsOk:
+        trace trSnapRecvTimeoutWaiting & "for reply to GetByteCodes", peer,
+          nKeys
+      return err(GetResponseTimeout)
     let blobs = rc.value.get.codes
     if nKeys < blobs.len:
       # Ooops, makes no sense
-      return err(ComTooManyByteCodes)
+      return err(GetTooManyByteCodes)
     blobs

   let

@@ -104,8 +102,9 @@ proc getByteCodes*(
   #   an empty response.
   # * If a bytecode is unavailable, the node must skip that slot and proceed
   #   to the next one. The node must not return nil or other placeholders.
-    trace trSnapRecvReceived & "empty ByteCodes", peer, nKeys, nCodes
-    return err(ComNoByteCodesAvailable)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvReceived & "empty ByteCodes", peer, nKeys, nCodes
+    return err(GetNoByteCodesAvailable)

   # Assemble return value
   var

@@ -122,8 +121,9 @@ proc getByteCodes*(

   dd.leftOver = req.toSeq

-  trace trSnapRecvReceived & "ByteCodes", peer,
-    nKeys, nCodes, nLeftOver=dd.leftOver.len, nExtra=dd.extra.len
+  when trSnapTracePacketsOk:
+    trace trSnapRecvReceived & "ByteCodes", peer,
+      nKeys, nCodes, nLeftOver=dd.leftOver.len, nExtra=dd.extra.len

   return ok(dd)
@@ -8,15 +8,15 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.

+{.push raises: [].}
+
 import
   chronos,
   ../../../sync_desc,
   ../../constants

-{.push raises: [].}
-
 type
-  ComErrorStatsRef* = ref object
+  GetErrorStatsRef* = ref object
     ## particular error counters so connections will not be cut immediately
     ## after a particular error.
     peerDegraded*: bool

@@ -24,40 +24,40 @@ type
     nNoData*: uint
     nNetwork*: uint

-  ComError* = enum
-    ComNothingSerious
-    ComAccountsMaxTooLarge
-    ComAccountsMinTooSmall
-    ComEmptyAccountsArguments
-    ComEmptyPartialRange
-    ComEmptyRequestArguments
-    ComNetworkProblem
-    ComNoAccountsForStateRoot
-    ComNoByteCodesAvailable
-    ComNoHeaderAvailable
-    ComNoStorageForAccounts
-    ComNoTrieNodesAvailable
-    ComResponseTimeout
-    ComTooManyByteCodes
-    ComTooManyHeaders
-    ComTooManyStorageSlots
-    ComTooManyTrieNodes
+  GetError* = enum
+    GetNothingSerious
+    GetAccountsMaxTooLarge
+    GetAccountsMinTooSmall
+    GetEmptyAccountsArguments
+    GetEmptyPartialRange
+    GetEmptyRequestArguments
+    GetNetworkProblem
+    GetNoAccountsForStateRoot
+    GetNoByteCodesAvailable
+    GetNoHeaderAvailable
+    GetNoStorageForAccounts
+    GetNoTrieNodesAvailable
+    GetResponseTimeout
+    GetTooManyByteCodes
+    GetTooManyHeaders
+    GetTooManyStorageSlots
+    GetTooManyTrieNodes


-proc resetComError*(stats: ComErrorStatsRef) =
+proc getErrorReset*(stats: GetErrorStatsRef) =
   ## Reset error counts after successful network operation
   stats[].reset

-proc stopAfterSeriousComError*(
+proc getErrorStopAfterSeriousOne*(
     ctrl: BuddyCtrlRef;
-    error: ComError;
-    stats: ComErrorStatsRef;
+    error: GetError;
+    stats: GetErrorStatsRef;
       ): Future[bool]
       {.async.} =
   ## Error handling after data protocol failed. Returns `true` if the current
   ## worker should be terminated as *zombie*.
   case error:
-  of ComResponseTimeout:
+  of GetResponseTimeout:
     stats.nTimeouts.inc
     if comErrorsTimeoutMax < stats.nTimeouts:
       # Mark this peer dead, i.e. avoid fetching from this peer for a while

@@ -69,7 +69,7 @@ proc stopAfterSeriousComError*(
     # Otherwise try again some time later.
     await sleepAsync(comErrorsTimeoutSleepMSecs.milliseconds)

-  of ComNetworkProblem:
+  of GetNetworkProblem:
     stats.nNetwork.inc
     if comErrorsNetworkMax < stats.nNetwork:
       ctrl.zombie = true

@@ -80,11 +80,11 @@ proc stopAfterSeriousComError*(
     # Otherwise try again some time later.
     await sleepAsync(comErrorsNetworkSleepMSecs.milliseconds)

-  of ComNoAccountsForStateRoot,
-     ComNoByteCodesAvailable,
-     ComNoStorageForAccounts,
-     ComNoHeaderAvailable,
-     ComNoTrieNodesAvailable:
+  of GetNoAccountsForStateRoot,
+     GetNoByteCodesAvailable,
+     GetNoStorageForAccounts,
+     GetNoHeaderAvailable,
+     GetNoTrieNodesAvailable:
     stats.nNoData.inc
     if comErrorsNoDataMax < stats.nNoData:
       # Mark this peer dead, i.e. avoid fetching from this peer for a while

@@ -95,20 +95,20 @@ proc stopAfterSeriousComError*(
     # Otherwise try again some time later.
     await sleepAsync(comErrorsNoDataSleepMSecs.milliseconds)

-  of ComAccountsMinTooSmall,
-     ComAccountsMaxTooLarge,
-     ComTooManyByteCodes,
-     ComTooManyHeaders,
-     ComTooManyStorageSlots,
-     ComTooManyTrieNodes:
+  of GetAccountsMinTooSmall,
+     GetAccountsMaxTooLarge,
+     GetTooManyByteCodes,
+     GetTooManyHeaders,
+     GetTooManyStorageSlots,
+     GetTooManyTrieNodes:
     # Mark this peer dead, i.e. avoid fetching from this peer for a while
     ctrl.zombie = true
     return true

-  of ComEmptyAccountsArguments,
-     ComEmptyRequestArguments,
-     ComEmptyPartialRange,
-     ComNothingSerious:
+  of GetEmptyAccountsArguments,
+     GetEmptyRequestArguments,
+     GetEmptyPartialRange,
+     GetError(0):
     discard

 # End
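Call sites use the renamed helper exactly as before: after a failed fetch they hand the `GetError` to `getErrorStopAfterSeriousOne()` and either retire the peer as a zombie or merely back off. A reduced, runnable sketch of that decision logic (hypothetical threshold and trimmed enum; the full version is in the hunk above):

```nim
# Reduced sketch of the renamed error handler (hypothetical minimal
# types; the real code uses BuddyCtrlRef and the full GetError enum).
import std/asyncdispatch

type
  GetError = enum
    GetNothingSerious, GetResponseTimeout, GetTooManyHeaders
  GetErrorStatsRef = ref object
    nTimeouts: uint
  CtrlRef = ref object
    zombie: bool

proc getErrorStopAfterSeriousOne(
    ctrl: CtrlRef; error: GetError; stats: GetErrorStatsRef;
      ): Future[bool] {.async.} =
  ## Returns `true` if the current worker should be terminated as *zombie*.
  case error:
  of GetResponseTimeout:
    stats.nTimeouts.inc
    if 2u < stats.nTimeouts:   # hypothetical threshold
      ctrl.zombie = true
      return true
  of GetTooManyHeaders:
    ctrl.zombie = true         # protocol violation: drop the peer at once
    return true
  of GetNothingSerious:
    discard

when isMainModule:
  let (ctrl, stats) = (CtrlRef(), GetErrorStatsRef())
  doAssert waitFor getErrorStopAfterSeriousOne(
    ctrl, GetTooManyHeaders, stats)
  doAssert ctrl.zombie
```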
@@ -18,10 +18,10 @@ import
   stew/interval_set,
   "../../.."/[protocol, protocol/trace_config],
   "../.."/[constants, range_desc, worker_desc],
-  ./com_error
+  ./get_error

 logScope:
-  topics = "snap-fetch"
+  topics = "snap-get"

 type
   # SnapStorage* = object

@@ -37,7 +37,7 @@ type
     data*: AccountStorageRange

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true

 # ------------------------------------------------------------------------------
 # Private functions

@@ -71,8 +71,9 @@ proc getStorageRangesReq(
     return ok(reply)

   except CatchableError as e:
-    trace trSnapRecvError & "waiting for GetStorageRanges reply", peer, pivot,
-      name=($e.name), error=(e.msg)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetStorageRanges reply", peer, pivot,
+        name=($e.name), error=(e.msg)
     return err()

 # ------------------------------------------------------------------------------

@@ -84,7 +85,7 @@ proc getStorageRanges*(
     stateRoot: Hash256;                ## Current DB base (`pivot` for logging)
     accounts: seq[AccountSlotsHeader]; ## List of per-account storage slots
     pivot: string;                     ## For logging, instead of `stateRoot`
-      ): Future[Result[GetStorageRanges,ComError]]
+      ): Future[Result[GetStorageRanges,GetError]]
       {.async.} =
   ## Fetch data using the `snap/1` protocol, returns the range covered.
   ##

@@ -94,7 +95,7 @@ proc getStorageRanges*(
   ## are ignored for later accounts list items.)
   var nAccounts = accounts.len
   if nAccounts == 0:
-    return err(ComEmptyAccountsArguments)
+    return err(GetEmptyAccountsArguments)

   let
     peer {.used.} = buddy.peer

@@ -112,16 +113,18 @@ proc getStorageRanges*(
       let rc = await buddy.getStorageRangesReq(stateRoot,
         accounts.mapIt(it.accKey.to(Hash256)), iv, pivot)
       if rc.isErr:
-        return err(ComNetworkProblem)
+        return err(GetNetworkProblem)
       if rc.value.isNone:
-        trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot,
-          nAccounts
-        return err(ComResponseTimeout)
+        when trSnapTracePacketsOk:
+          trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot,
+            nAccounts
+        return err(GetResponseTimeout)
       if nAccounts < rc.value.get.slotLists.len:
         # Ooops, makes no sense
-        trace trSnapRecvReceived & "too many slot lists", peer, pivot,
-          nAccounts, nReceived=rc.value.get.slotLists.len
-        return err(ComTooManyStorageSlots)
+        when trSnapTracePacketsOk:
+          trace trSnapRecvReceived & "too many slot lists", peer, pivot,
+            nAccounts, nReceived=rc.value.get.slotLists.len
+        return err(GetTooManyStorageSlots)
       rc.value.get

     nSlotLists = snStoRanges.slotLists.len

@@ -136,9 +139,10 @@ proc getStorageRanges*(
     # for any requested account hash, it must return an empty reply. It is
     # the responsibility of the caller to query an state not older than 128
     # blocks; and the caller is expected to only ever query existing accounts.
-    trace trSnapRecvReceived & "empty StorageRanges", peer, pivot,
-      nAccounts, nSlotLists, nProof, firstAccount=accounts[0].accKey
-    return err(ComNoStorageForAccounts)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvReceived & "empty StorageRanges", peer, pivot,
+        nAccounts, nSlotLists, nProof, firstAccount=accounts[0].accKey
+    return err(GetNoStorageForAccounts)

   # Assemble return structure for given peer response
   var dd = GetStorageRanges(
@@ -16,10 +16,10 @@ import
   eth/[common, p2p],
   "../../.."/[protocol, protocol/trace_config],
   "../.."/[constants, range_desc, worker_desc],
-  ./com_error
+  ./get_error

 logScope:
-  topics = "snap-fetch"
+  topics = "snap-get"

 type
   # SnapTrieNodes = object

@@ -54,8 +54,9 @@ proc getTrieNodesReq(

   except CatchableError as e:
     let error {.used.} = e.msg
-    trace trSnapRecvError & "waiting for GetByteCodes reply", peer, pivot,
-      error
+    when trSnapTracePacketsOk:
+      trace trSnapRecvError & "waiting for GetByteCodes reply", peer, pivot,
+        error
     return err()

@@ -108,7 +109,7 @@ proc getTrieNodes*(
     stateRoot: Hash256;          # Current DB base (see `pivot` for logging)
     paths: seq[SnapTriePaths];   # Nodes to fetch
     pivot: string;               # For logging, instead of `stateRoot`
-      ): Future[Result[GetTrieNodes,ComError]]
+      ): Future[Result[GetTrieNodes,GetError]]
       {.async.} =
   ## Fetch data using the `snap#` protocol, returns the trie nodes requested
   ## (if any.)

@@ -117,7 +118,7 @@ proc getTrieNodes*(
     nGroups = paths.len

   if nGroups == 0:
-    return err(ComEmptyRequestArguments)
+    return err(GetEmptyRequestArguments)

   let nTotal = paths.mapIt(max(1,it.slotPaths.len)).foldl(a+b, 0)

@@ -127,16 +128,18 @@ proc getTrieNodes*(
   let trieNodes = block:
     let rc = await buddy.getTrieNodesReq(stateRoot, paths, pivot)
     if rc.isErr:
-      return err(ComNetworkProblem)
+      return err(GetNetworkProblem)
     if rc.value.isNone:
-      trace trSnapRecvTimeoutWaiting & "for TrieNodes", peer, pivot, nGroups
-      return err(ComResponseTimeout)
+      when trSnapTracePacketsOk:
+        trace trSnapRecvTimeoutWaiting & "for TrieNodes", peer, pivot, nGroups
+      return err(GetResponseTimeout)
     let blobs = rc.value.get.nodes
     if nTotal < blobs.len:
       # Ooops, makes no sense
-      trace trSnapRecvError & "too many TrieNodes", peer, pivot,
-        nGroups, nExpected=nTotal, nReceived=blobs.len
-      return err(ComTooManyTrieNodes)
+      when trSnapTracePacketsOk:
+        trace trSnapRecvError & "too many TrieNodes", peer, pivot,
+          nGroups, nExpected=nTotal, nReceived=blobs.len
+      return err(GetTooManyTrieNodes)
     blobs

   let

@@ -155,8 +158,9 @@ proc getTrieNodes*(
   #   nodes.
   # * The responding node is allowed to return less data than requested
   #   (serving QoS limits), but the node must return at least one trie node.
-    trace trSnapRecvReceived & "empty TrieNodes", peer, pivot, nGroups, nNodes
-    return err(ComNoByteCodesAvailable)
+    when trSnapTracePacketsOk:
+      trace trSnapRecvReceived & "empty TrieNodes", peer, pivot, nGroups, nNodes
+    return err(GetNoByteCodesAvailable)

   # Assemble return value
   var

@@ -173,8 +177,9 @@ proc getTrieNodes*(
     if trieNodes.len <= inx:
       break

-  trace trSnapRecvReceived & "TrieNodes", peer, pivot,
-    nGroups, nNodes, nLeftOver=dd.leftOver.len
+  when trSnapTracePacketsOk:
+    trace trSnapRecvReceived & "TrieNodes", peer, pivot,
+      nGroups, nNodes, nLeftOver=dd.leftOver.len

   return ok(dd)
@@ -0,0 +1,18 @@
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+{.push raises: [].}
+
+import
+  ./pass/[pass_desc, pass_init]
+
+export
+  PassActorRef,
+  passActor,
+  pass_init
+
+# End
@@ -9,54 +9,53 @@

 import
   chronos,
   ../../../sync_desc,
   ../../worker_desc

 type
-  PlayVoidFutureCtxFn* = proc(
+  PassVoidFutureCtxFn* = proc(
     ctx: SnapCtxRef): Future[void]
       {.gcsafe, raises: [CatchableError].}

-  PlayVoidCtxFn* = proc(
+  PassVoidCtxFn* = proc(
     ctx: SnapCtxRef)
       {.gcsafe, raises: [CatchableError].}


-  PlayVoidFutureBuddyFn* = proc(
+  PassVoidFutureBuddyFn* = proc(
     buddy: SnapBuddyRef): Future[void]
       {.gcsafe, raises: [CatchableError].}

-  PlayBoolBuddyBoolIntFn* = proc(
+  PassBoolBuddyBoolIntFn* = proc(
     buddy: SnapBuddyRef; last: bool; laps: int): bool
       {.gcsafe, raises: [CatchableError].}

-  PlayBoolBuddyFn* = proc(
+  PassBoolBuddyFn* = proc(
     buddy: SnapBuddyRef): bool
       {.gcsafe, raises: [CatchableError].}

-  PlayVoidBuddyFn* = proc(
+  PassVoidBuddyFn* = proc(
     buddy: SnapBuddyRef)
       {.gcsafe, raises: [CatchableError].}


-  PlaySyncSpecs* = ref object of RootRef
+  PassActorRef* = ref object of RootRef
     ## Holds sync mode specs & methods for a particular sync state
-    setup*: PlayVoidCtxFn
-    release*: PlayVoidCtxFn
-    start*: PlayBoolBuddyFn
-    stop*: PlayVoidBuddyFn
-    pool*: PlayBoolBuddyBoolIntFn
-    daemon*: PlayVoidFutureCtxFn
-    single*: PlayVoidFutureBuddyFn
-    multi*: PlayVoidFutureBuddyFn
+    setup*: PassVoidCtxFn
+    release*: PassVoidCtxFn
+    start*: PassBoolBuddyFn
+    stop*: PassVoidBuddyFn
+    pool*: PassBoolBuddyBoolIntFn
+    daemon*: PassVoidFutureCtxFn
+    single*: PassVoidFutureBuddyFn
+    multi*: PassVoidFutureBuddyFn

 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------

-proc playMethod*(ctx: SnapCtxRef): PlaySyncSpecs =
+proc passActor*(ctx: SnapCtxRef): PassActorRef =
   ## Getter
-  ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs
+  ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PassActorRef

 # ------------------------------------------------------------------------------
 # End
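Each sync pass publishes its handlers by filling one `PassActorRef` record, as `passFull()` does in the next file. A minimal sketch of that registration shape with hypothetical no-op handlers:

```nim
# Minimal registration sketch (hypothetical no-op handlers standing in
# for fullSyncSetup & friends).
type
  PassActorRef = ref object of RootRef
    setup, release: proc() {.gcsafe.}

proc noOp() = discard

proc passFull: PassActorRef =
  ## Return full sync handler environment (shape only).
  PassActorRef(setup: noOp, release: noOp)

when isMainModule:
  passFull().setup()
```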
@@ -14,17 +14,24 @@ import
   chronicles,
   chronos,
   eth/p2p,
-  ../../../misc/[best_pivot, block_queue],
-  "../../.."/[protocol, sync_desc, types],
+  stew/keyed_queue,
+  ../../../misc/[best_pivot, block_queue, ticker],
+  ../../../protocol,
   "../.."/[range_desc, worker_desc],
   ../db/[snapdb_desc, snapdb_persistent],
-  ".."/[pivot, ticker],
-  play_desc
+  ../get/get_error,
+  ./pass_desc

 const
-  extraTraceMessages = false or true
+  extraTraceMessages = false # or true
     ## Enabled additional logging noise

+  dumpDatabaseOnRollOver = true # or false # <--- will go away (debugging only)
+    ## Dump database before switching to full sync (debugging, testing)
+
+when dumpDatabaseOnRollOver:
+  import ../../../../../tests/replay/undump_kvp
+
 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------

@@ -122,8 +129,8 @@ proc processStaged(buddy: SnapBuddyRef): bool =
 # ------------------------------------------------------------------------------

 proc fullSyncSetup(ctx: SnapCtxRef) =
-  let blockNum = if ctx.pool.fullPivot.isNil: ctx.pool.pivotTable.topNumber
-                 else: ctx.pool.fullPivot.stateHeader.blockNumber
+  let blockNum = if ctx.pool.fullHeader.isNone: 0.toBlockNumber
+                 else: ctx.pool.fullHeader.unsafeGet.blockNumber

   ctx.pool.bCtx = BlockQueueCtxRef.init(blockNum + 1)
   ctx.pool.bPivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)

@@ -148,6 +155,7 @@ proc fullSyncStart(buddy: SnapBuddyRef): bool =

     ctx.pool.ticker.startBuddy()
     buddy.ctrl.multiOk = false # confirm default mode for soft restart
+    buddy.only.errors = GetErrorStatsRef()
     return true

 proc fullSyncStop(buddy: SnapBuddyRef) =

@@ -163,34 +171,39 @@ proc fullSyncDaemon(ctx: SnapCtxRef) {.async.} =


 proc fullSyncPool(buddy: SnapBuddyRef, last: bool; laps: int): bool =
-  let
-    ctx = buddy.ctx
-    env = ctx.pool.fullPivot
+  let ctx = buddy.ctx

   # Take over soft restart after switch to full sync mode.
   # This process needs to be applied to all buddy peers.
-  if not env.isNil:
+  if ctx.pool.fullHeader.isSome:
     # Soft start all peers on the second lap.
     ignoreException("fullSyncPool"):
-      if not ctx.playMethod.start(buddy):
+      if not buddy.fullSyncStart():
        # Start() method failed => wait for another peer
        buddy.ctrl.stopped = true
    if last:
+      let stateHeader = ctx.pool.fullHeader.unsafeGet
      trace logTxt "soft restart done", peer=buddy.peer, last, laps,
-        pivot=env.stateHeader.blockNumber.toStr,
+        pivot=stateHeader.blockNumber.toStr,
        mode=ctx.pool.syncMode.active, state= buddy.ctrl.state

      # Kick off ticker (was stopped by snap `release()` method)
      ctx.pool.ticker.start()

      # Store pivot as parent hash in database
-      ctx.pool.snapDb.kvDb.persistentBlockHeaderPut env.stateHeader
+      ctx.pool.snapDb.kvDb.persistentBlockHeaderPut stateHeader

      # Instead of genesis.
-      ctx.chain.com.startOfHistory = env.stateHeader.blockHash
+      ctx.chain.com.startOfHistory = stateHeader.blockHash

+      when dumpDatabaseOnRollOver: # <--- will go away (debugging only)
+        # Dump database ... <--- will go away (debugging only)
+        let nRecords = # <--- will go away (debugging only)
+          ctx.pool.snapDb.rockDb.dumpAllDb # <--- will go away (debugging only)
+        trace logTxt "dumped block chain database", nRecords
+
      # Reset so that this action would not be triggered, again
-      ctx.pool.fullPivot = nil
+      ctx.pool.fullHeader = none(BlockHeader)
    return false # do stop magically when looping over peers is exhausted

   # Mind the gap, fill in if necessary (function is peer independent)

@@ -250,9 +263,9 @@ proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
 # Public functions
 # ------------------------------------------------------------------------------

-proc playFullSyncSpecs*: PlaySyncSpecs =
+proc passFull*: auto =
   ## Return full sync handler environment
-  PlaySyncSpecs(
+  PassActorRef(
    setup:   fullSyncSetup,
    release: fullSyncRelease,
    start:   fullSyncStart,
@ -0,0 +1,80 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
#    http://www.apache.org/licenses/LICENSE-2.0)
#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
#    http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

{.push raises: [].}

import
  chronicles,
  "../../../.."/[common, db/select_backend],
  ../../../misc/ticker,
  ../../worker_desc,
  ../db/snapdb_desc,
  "."/[pass_full, pass_snap]

logScope:
  topics = "snap-init"

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

proc setupPass(ctx: SnapCtxRef) =
  ## Set up sync mode specs table. This cannot be done at compile time.
  ctx.pool.syncMode.tab[SnapSyncMode] = passSnap()
  ctx.pool.syncMode.tab[FullSyncMode] = passFull()
  ctx.pool.syncMode.active = SnapSyncMode

proc releasePass(ctx: SnapCtxRef) =
  discard

# --------------

proc setupTicker(ctx: SnapCtxRef) =
  let blindTicker: TickerSnapStatsUpdater = proc: TickerSnapStats =
    discard
  if ctx.pool.enableTicker:
    ctx.pool.ticker = TickerRef.init(blindTicker)

proc releaseTicker(ctx: SnapCtxRef) =
  ## Helper for `release()`
  ctx.pool.ticker.stop()
  ctx.pool.ticker = nil

# --------------

proc setupSnapDb(ctx: SnapCtxRef) =
  ## Helper for `setup()`: Initialise snap sync database layer
  ctx.pool.snapDb =
    if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
    else: SnapDbRef.init(ctx.pool.dbBackend)

# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------

proc passInitSetup*(ctx: SnapCtxRef) =
  ## Global set up
  ctx.setupPass()     # Set up sync sub-mode specs.
  ctx.setupSnapDb()   # Set database backend, subject to change
  ctx.setupTicker()   # Start log/status ticker (if any)

  # Experimental, also used for debugging
  if ctx.exCtrlFile.isSome:
    warn "Snap sync accepts pivot block number or hash",
      syncCtrlFile=ctx.exCtrlFile.get

proc passInitRelease*(ctx: SnapCtxRef) =
  ## Global clean up
  ctx.releaseTicker() # Stop log/status ticker (if any)
  ctx.releasePass()   # Shut down sync methods

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
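# Aside -- assumed wiring, not shown in this hunk: with the setup/release code
# extracted into `pass_init.nim`, the `worker.nim` entry points can shrink to
# thin forwarders of the two public procs above, roughly along these lines
# (the bodies are guessed for illustration, not taken from the commit):
#
#   proc setup*(ctx: SnapCtxRef): bool =
#     ctx.passInitSetup()
#     true
#
#   proc release*(ctx: SnapCtxRef) =
#     ctx.passInitRelease()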
@ -15,20 +15,29 @@ import
chronos,
eth/p2p,
stew/[interval_set, keyed_queue],
"../../.."/[handlers/eth, protocol, sync_desc, types],
".."/[pivot, ticker],
../pivot/storage_queue_helper,
"../../.."/[handlers/eth, misc/ticker, protocol],
"../.."/[range_desc, worker_desc],
../db/[hexary_desc, snapdb_pivot],
"../.."/[range_desc, update_beacon_header, worker_desc],
play_desc
../get/get_error,
./pass_desc,
./pass_snap/helper/[beacon_header, storage_queue],
./pass_snap/pivot

logScope:
topics = "snap-play"

const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise

extraScrutinyDoubleCheckCompleteness = 1_000_000
## Double check database whether it is complete (debugging, testing). This
## action is slow and intended for debugging and testing use, only. The
## numeric value limits the action to the maximal number of accounts in
## the database.
##
## Set to `0` to disable.
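# Aside -- a runnable illustration (not part of the diff) of the `when` guard
# used with `extraScrutinyDoubleCheckCompleteness` in `snapSyncCompleteOk`
# below: because the limit is a compile-time constant, setting it to `0`
# removes the guarded code entirely rather than leaving a run-time flag test
# behind. `doubleCheckLimit` is an invented stand-in for the real constant.

const doubleCheckLimit = 1_000_000
when 0 < doubleCheckLimit:
  echo "double check enabled for up to ", doubleCheckLimit, " accounts"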
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -136,12 +145,14 @@ proc snapSyncCompleteOk(
## debugging, only and should not be used on a large database as it uses
## quite a bit of computation resources.
if env.pivotCompleteOk():
if env.nAccounts <= 1_000_000: # Larger sizes might be infeasible
if not await env.pivotVerifyComplete(ctx):
error logTxt "inconsistent state, pivot incomplete",
pivot = env.stateHeader.blockNumber.toStr
return false
ctx.pool.fullPivot = env
when 0 < extraScrutinyDoubleCheckCompleteness:
# Larger sizes might be infeasible
if env.nAccounts <= extraScrutinyDoubleCheckCompleteness:
if not await env.pivotVerifyComplete(ctx):
error logTxt "inconsistent state, pivot incomplete",
pivot=env.stateHeader.blockNumber.toStr, nAccounts=env.nAccounts
return false
ctx.pool.completePivot = env
ctx.poolMode = true # Fast sync mode must be synchronized among all peers
return true
@ -172,6 +183,7 @@ proc snapSyncStart(buddy: SnapBuddyRef): bool =
peer.state(protocol.eth).initialized:
ctx.pool.ticker.startBuddy()
buddy.ctrl.multiOk = false # confirm default mode for soft restart
buddy.only.errors = GetErrorStatsRef()
return true

proc snapSyncStop(buddy: SnapBuddyRef) =
@ -186,14 +198,14 @@ proc snapSyncPool(buddy: SnapBuddyRef, last: bool, laps: int): bool =
##
let
ctx = buddy.ctx
env = ctx.pool.fullPivot
env = ctx.pool.completePivot

# Check whether the snapshot is complete. If so, switch to full sync mode.
# This process needs to be applied to all buddy peers.
if not env.isNil:
ignoreException("snapSyncPool"):
# Stop all peers
ctx.playMethod.stop(buddy)
buddy.snapSyncStop()
# After the last buddy peer was stopped switch to full sync mode
# and repeat that loop over buddy peers for re-starting them.
if last:
@ -201,10 +213,12 @@ proc snapSyncPool(buddy: SnapBuddyRef, last: bool, laps: int): bool =
trace logTxt "switch to full sync", peer=buddy.peer, last, laps,
pivot=env.stateHeader.blockNumber.toStr,
mode=ctx.pool.syncMode.active, state= buddy.ctrl.state
ctx.playMethod.release(ctx)
ctx.snapSyncRelease()
ctx.pool.syncMode.active = FullSyncMode
ctx.playMethod.setup(ctx)
ctx.poolMode = true # repeat looping over peers
ctx.passActor.setup(ctx)
ctx.poolMode = true # repeat looping over peers
ctx.pool.fullHeader = some(env.stateHeader) # Full sync start here

return false # do stop magically when looping over peers is exhausted

# Clean up empty pivot slots (never the top one.) This needs to be run on
@ -239,7 +253,7 @@ proc snapSyncSingle(buddy: SnapBuddyRef) {.async.} =
## * `buddy.ctrl.poolMode` is `false`
##
# External beacon header updater
await buddy.updateBeaconHeaderFromFile()
await buddy.beaconHeaderUpdateFromFile()

# Dedicate some process cycles to the recovery process (if any)
if not buddy.ctx.pool.recovery.isNil:
@ -322,9 +336,9 @@ proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
# Public functions
# ------------------------------------------------------------------------------

proc playSnapSyncSpecs*: PlaySyncSpecs =
proc passSnap*: auto =
## Return snap sync handler environment
PlaySyncSpecs(
PassActorRef(
setup: snapSyncSetup,
release: snapSyncRelease,
start: snapSyncStart,
@ -44,13 +44,13 @@ import
chronos,
eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[byteutils, interval_set, keyed_queue],
../../../../utils/prettify,
"../../.."/[sync_desc, protocol, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_nearby,
hexary_paths, hexary_range, snapdb_accounts],
"."/[find_missing_nodes, storage_queue_helper, swap_in]
../../../../../utils/prettify,
../../../../protocol,
"../../.."/[constants, range_desc, worker_desc],
../../get/[get_error, get_trie_nodes],
../../db/[hexary_desc, hexary_envelope, hexary_error, hexary_nearby,
hexary_paths, hexary_range, snapdb_accounts],
./helper/[missing_nodes, storage_queue, swap_in]

logScope:
topics = "snap-acc"
@ -123,7 +123,7 @@ proc compileMissingNodesList(
discard ctx.swapInAccounts(env)

if not fa.processed.isFull:
let mlv = await fa.findMissingNodes(
let mlv = await fa.missingNodesFind(
rootKey, getFn,
healAccountsInspectionPlanBLevel,
healAccountsInspectionPlanBRetryMax,
@ -177,7 +177,7 @@ proc getNodesFromNetwork(
let rc = await buddy.getTrieNodes(rootHash, pathList, pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
buddy.only.errors.getErrorReset()

# Forget about unfetched missing nodes, will be picked up later
return rc.value.nodes.mapIt(NodeSpecs(
@ -188,7 +188,8 @@ proc getNodesFromNetwork(
# Process error ...
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors)
ok = await buddy.ctrl.getErrorStopAfterSeriousOne(
error, buddy.only.errors)
when extraTraceMessages:
trace logTxt "reply error", peer, ctx=buddy.healingCtx(env),
error, stop=ok
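# Aside -- the renamed `getError*` helpers above follow a reset-on-success /
# stop-after-repeats pattern. A minimal self-contained sketch, with all names
# and the threshold invented for the illustration:

type DemoErrorStats = object
  nConsecutive: int

const demoStopThreshold = 3

proc demoReset(e: var DemoErrorStats) =
  # successful reply: forget accumulated failures
  e.nConsecutive = 0

proc demoStopAfterSerious(e: var DemoErrorStats): bool =
  # failed reply: give up only after too many failures in a row
  e.nConsecutive.inc
  demoStopThreshold <= e.nConsecutive

var stats: DemoErrorStats
for attempt in 1 .. 3:
  if demoStopAfterSerious(stats):
    echo "stopping after ", stats.nConsecutive, " consecutive errors"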
@ -46,13 +46,13 @@ import
chronos,
eth/[common, p2p, trie/nibbles],
stew/[byteutils, interval_set, keyed_queue],
../../../../utils/prettify,
"../../.."/[sync_desc, protocol, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_range,
snapdb_storage_slots],
"."/[find_missing_nodes, storage_queue_helper]
../../../../../utils/prettify,
../../../../protocol,
"../../.."/[constants, range_desc, worker_desc],
../../get/[get_error, get_trie_nodes],
../../db/[hexary_desc, hexary_envelope, hexary_error, hexary_range,
snapdb_storage_slots],
./helper/[missing_nodes, storage_queue]

logScope:
topics = "snap-slot"
@ -136,7 +136,7 @@ proc compileMissingNodesList(
getFn = ctx.pool.snapDb.getStorageSlotsFn(kvp.data.accKey)

if not slots.processed.isFull:
let mlv = await slots.findMissingNodes(
let mlv = await slots.missingNodesFind(
rootKey, getFn,
healStorageSlotsInspectionPlanBLevel,
healStorageSlotsInspectionPlanBRetryMax,
@ -192,7 +192,7 @@ proc getNodesFromNetwork(
let rc = await buddy.getTrieNodes(rootHash, @[req], pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
buddy.only.errors.getErrorReset()

return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
@ -202,7 +202,8 @@ proc getNodesFromNetwork(
# Process error ...
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors)
ok = await buddy.ctrl.getErrorStopAfterSeriousOne(
error, buddy.only.errors)
when extraTraceMessages:
trace logTxt "reply error", peer, ctx=buddy.healingCtx(kvp,env),
error, stop=ok
@ -14,10 +14,9 @@ import
chronicles,
chronos,
eth/[common, p2p],
../sync_desc,
../misc/sync_ctrl,
./worker_desc,
./worker/com/[com_error, get_block_header]
../../../../../misc/sync_ctrl,
../../../../worker_desc,
../../../get/[get_error, get_block_header]

logScope:
topics = "snap-ctrl"
@ -26,7 +25,7 @@ logScope:
# Public functions
# ------------------------------------------------------------------------------

proc updateBeaconHeaderbuBlockNumber*(
proc beaconHeaderUpdatebuBlockNumber*(
buddy: SnapBuddyRef; # Worker peer
num: BlockNumber; # Block number to sync against
) {.async.} =
@ -45,7 +44,7 @@ proc updateBeaconHeaderbuBlockNumber*(
ctx.pool.beaconHeader = rc.value


proc updateBeaconHeaderFromFile*(
proc beaconHeaderUpdateFromFile*(
buddy: SnapBuddyRef; # Worker peer
) {.async.} =
## This function updates the beacon header cache by import from the file name
@ -65,7 +64,7 @@ proc updateBeaconHeaderFromFile*(
peer = buddy.peer

var
rc = Result[BlockHeader,ComError].err(ComError(0))
rc = Result[BlockHeader,GetError].err(GetError(0))
isHash = hashOrNum.isHash # so that the value can be logged

# Parse value dump and fetch a header from the peer (if any)
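# Aside -- a self-contained illustration (not from the diff) of the
# `Result[...].err(GetError(0))` initialisation above: seeding the variable
# with a zero-valued error lets it be declared before the fetch attempt and
# inspected afterwards. `DemoError` and its values are invented.

import stew/results

type DemoError = enum deNone, deTimeout

var rc = Result[int,DemoError].err(deNone)  # start out in the error state
rc = Result[int,DemoError].ok(42)           # pretend the fetch succeeded
if rc.isOk:
  echo rc.value                             # -> 42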
@ -63,10 +63,9 @@ import
chronos,
eth/common,
stew/interval_set,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect,
hexary_nearby]
"../../../.."/[constants, range_desc, worker_desc],
../../../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect,
hexary_nearby]

logScope:
topics = "snap-find"
@ -107,7 +106,7 @@ template noExceptionOops(info: static[string]; code: untyped) =
# Public functions
# ------------------------------------------------------------------------------

proc findMissingNodes*(
proc missingNodesFind*(
ranges: SnapRangeBatchRef;
rootKey: NodeKey;
getFn: HexaryGetFn;
@ -15,9 +15,8 @@ import
chronicles,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../../sync_desc,
"../.."/[constants, range_desc, worker_desc],
../db/[hexary_inspect, snapdb_storage_slots]
"../../../.."/[constants, range_desc, worker_desc],
../../../db/[hexary_inspect, snapdb_storage_slots]

logScope:
topics = "snap-slots"
@ -34,18 +34,17 @@
##
## * Rinse and repeat.
##
{.push raises: [].}

import
std/[math, sequtils],
chronicles,
eth/[common, p2p],
stew/[byteutils, interval_set, keyed_queue, sorted_set],
../../../../utils/prettify,
../../../types,
"../.."/[range_desc, worker_desc],
../db/[hexary_desc, hexary_envelope, hexary_error,
hexary_paths, snapdb_accounts]

{.push raises: [].}
../../../../../../utils/prettify,
"../../../.."/[range_desc, worker_desc],
../../../db/[hexary_desc, hexary_envelope, hexary_error,
hexary_paths, snapdb_accounts]

logScope:
topics = "snap-swapin"
@ -103,7 +102,7 @@ proc existsInTrie(
error = ExceptionError

when extraTraceMessages:
if error != NothingSerious:
if error != HexaryError(0):
trace logTxt "other trie check node failed", node, error

false
@ -16,24 +16,22 @@ import
chronos,
eth/p2p, # trie/trie_defs],
stew/[interval_set, keyed_queue, sorted_set],
"../.."/[sync_desc, types],
".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_accounts, snapdb_contracts, snapdb_pivot],
./pivot/[heal_accounts, heal_storage_slots, range_fetch_accounts,
range_fetch_contracts, range_fetch_storage_slots,
storage_queue_helper],
./ticker
"../../../.."/[misc/ticker, sync_desc, types],
"../../.."/[constants, range_desc, worker_desc],
../../db/[hexary_error, snapdb_accounts, snapdb_contracts, snapdb_pivot],
./helper/storage_queue,
"."/[heal_accounts, heal_storage_slots, range_fetch_accounts,
range_fetch_contracts, range_fetch_storage_slots]

logScope:
topics = "snap-pivot"

const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise

proc pivotMothball*(env: SnapPivotRef) {.gcsafe.}


# ------------------------------------------------------------------------------
# Private helpers, logging
# ------------------------------------------------------------------------------
@ -101,7 +99,6 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) =
if rc.isOk:
rc.value.pivotMothball


proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber =
## Return the block number of the top pivot entry, or zero if there is none.
let rc = pivotTable.lastValue
@ -148,7 +145,7 @@ proc tickerStats*(
if rSq < sqSumAv:
result[1] = sqrt(sqSum / length.float - result[0] * result[0])

result = proc: TickerSnapStats =
result = proc: auto =
var
aSum, aSqSum, uSum, uSqSum, sSum, sSqSum, cSum, cSqSum: float
count = 0
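# Aside -- `result = proc: ... =` above returns a closure over the enclosing
# proc's state; with `auto`, the closure's signature is pinned down by the
# declared return type of `tickerStats`. A runnable miniature of the same
# shape, with invented names:

proc makeCounter(): auto =
  var n = 0
  result = proc: int =
    n.inc
    n

let tick = makeCounter()
echo tick(), " ", tick()   # -> 1 2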
@ -471,8 +468,8 @@ proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
# ------------------------------------------------------------------------------

import
db/[hexary_desc, hexary_inspect, hexary_nearby, hexary_paths,
snapdb_storage_slots]
../../db/[hexary_desc, hexary_inspect, hexary_nearby, hexary_paths,
snapdb_storage_slots]

const
pivotVerifyExtraBlurb = false # or true
@ -48,11 +48,11 @@ import
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_account_range],
../db/[hexary_envelope, snapdb_accounts],
"."/[storage_queue_helper, swap_in]
"../../../.."/[sync_desc, types],
"../../.."/[constants, range_desc, worker_desc],
../../get/[get_error, get_account_range],
../../db/[hexary_envelope, snapdb_accounts],
./helper/[storage_queue, swap_in]

logScope:
topics = "snap-acc"
@ -133,7 +133,8 @@ proc accountsRangefetchImpl(
rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
fa.unprocessed.mergeSplit iv # fail => interval back to pool
if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
if await buddy.ctrl.getErrorStopAfterSeriousOne(
rc.error, buddy.only.errors):
when extraTraceMessages:
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
reqLen=iv, error=rc.error
@ -142,7 +143,7 @@ proc accountsRangefetchImpl(
rc.value

# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
buddy.only.errors.getErrorReset()

let
gotAccounts = dd.data.accounts.len # comprises `gotStorage`
@ -12,8 +12,7 @@
## ================================
##
## Pretty straightforward

##
{.push raises: [].}

import
@ -22,10 +21,9 @@ import
chronos,
eth/[common, p2p],
stew/keyed_queue,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_byte_codes],
../db/snapdb_contracts
"../../.."/[constants, range_desc, worker_desc],
../../get/[get_error, get_byte_codes],
../../db/snapdb_contracts

logScope:
topics = "snap-con"
@ -34,7 +32,7 @@ type
SnapCtraKVP = KeyedQueuePair[Hash256,NodeKey]

const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise

# ------------------------------------------------------------------------------
@ -136,7 +134,8 @@ proc rangeFetchContractsImpl(
if rc.isErr:
# Restore batch queue
env.putUnprocessed parking
if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
if await buddy.ctrl.getErrorStopAfterSeriousOne(
rc.error, buddy.only.errors):
error logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
nHashKeys=hashKeys.len, error=rc.error
discard
@ -61,7 +61,6 @@
## In general, if an error occurs, the entry that caused the error is moved
## or re-stored onto the queue of partial requests `env.fetchStoragePart`.
##

{.push raises: [].}

import
@ -70,11 +69,10 @@ import
chronos,
eth/p2p,
stew/[interval_set, keyed_queue],
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_storage_ranges],
../db/[hexary_error, snapdb_storage_slots],
./storage_queue_helper
"../../.."/[constants, range_desc, worker_desc],
../../get/[get_error, get_storage_ranges],
../../db/[hexary_error, snapdb_storage_slots],
./helper/storage_queue

logScope:
topics = "snap-slot"
@ -123,14 +121,15 @@ proc fetchStorageSlotsImpl(
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr:
if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
if await buddy.ctrl.getErrorStopAfterSeriousOne(
rc.error, buddy.only.errors):
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
nReq=req.len, error=rc.error
return err() # all of `req` failed
rc.value

# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
buddy.only.errors.getErrorReset()

var
nSlotLists = stoRange.data.storages.len
@ -1,26 +0,0 @@
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
#   http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
#   http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

{.push raises: [].}

import
  ../worker_desc,
  ./play/[play_desc, play_full_sync, play_snap_sync]

export
  PlaySyncSpecs,
  playMethod

proc playSetup*(ctx: SnapCtxRef) =
  ## Set up sync mode specs table. This cannot be done at compile time.
  ctx.pool.syncMode.tab[SnapSyncMode] = playSnapSyncSpecs()
  ctx.pool.syncMode.tab[FullSyncMode] = playFullSyncSpecs()

proc playRelease*(ctx: SnapCtxRef) =
  discard

# End
@ -16,13 +16,15 @@ import
eth/[common, p2p],
stew/[interval_set, keyed_queue, sorted_set],
../../db/select_backend,
../misc/[best_pivot, block_queue],
../misc/[best_pivot, block_queue, ticker],
../sync_desc,
./worker/com/com_error,
./worker/get/get_error,
./worker/db/[snapdb_desc, snapdb_pivot],
./worker/ticker,
./range_desc

export
sync_desc # worker desc prototype

type
SnapAccountsList* = SortedSet[NodeTag,Hash256]
## Sorted pair of `(account,state-root)` entries
@ -93,24 +95,24 @@ type

SnapBuddyData* = object
## Per-worker local descriptor data extension
errors*: ComErrorStatsRef ## For error handling
errors*: GetErrorStatsRef ## For error handling

# Full sync continuation parameters
bPivot*: BestPivotWorkerRef ## Local pivot worker descriptor
bQueue*: BlockQueueWorkerRef ## Block queue worker

SnapSyncModeType* = enum
SnapSyncPassType* = enum
## Current sync mode, after a snapshot has been downloaded, the system
## proceeds with full sync.
SnapSyncMode = 0 ## Start mode
FullSyncMode

SnapSyncSpecs* = object
SnapSyncPass* = object
## Full specs for all sync modes. This table must be held in the main
## descriptor and initialised at run time. The table values are opaque
## and will be specified in the worker module(s).
active*: SnapSyncModeType
tab*: array[SnapSyncModeType,RootRef]
active*: SnapSyncPassType
tab*: array[SnapSyncPassType,RootRef]

SnapCtxData* = object
## Globally shared data extension
@ -118,21 +120,23 @@ type
dbBackend*: ChainDB ## Low level DB driver access (if any)
snapDb*: SnapDbRef ## Accounts snapshot DB

# Pivot table
# Info
enableTicker*: bool ## Advisory, extra level of gossip
ticker*: TickerRef ## Ticker, logger descriptor

# Snap/full mode multiplexing
syncMode*: SnapSyncPass ## Sync mode methods & data

# Snap sync parameters, pivot table
pivotTable*: SnapPivotTable ## Per state root environment
completePivot*: SnapPivotRef ## Start full sync from here
beaconHeader*: BlockHeader ## Running on beacon chain
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
covAccTimesFull*: uint ## # of 100% coverages
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context

# Info
ticker*: TickerRef ## Ticker, logger

# Snap/full mode multiplexing
syncMode*: SnapSyncSpecs ## Sync mode methods & data
fullPivot*: SnapPivotRef ## Start full sync from here

# Full sync continuation parameters
fullHeader*: Option[BlockHeader] ## Start full sync from here
bPivot*: BestPivotCtxRef ## Global pivot descriptor
bCtx*: BlockQueueCtxRef ## Global block queue descriptor
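# Aside -- why the pass table above is typed `array[SnapSyncPassType,RootRef]`:
# each pass registers its method record as an opaque `RootRef`, and an accessor
# casts it back to the concrete type when the pass runs. A self-contained
# sketch with invented names:

type
  DemoPassType = enum demoSnapPass, demoFullPass
  DemoPassMethods = ref object of RootRef
    name: string

var tab: array[DemoPassType,RootRef]
tab[demoSnapPass] = DemoPassMethods(name: "snap")

let methods = DemoPassMethods(tab[demoSnapPass])  # checked down-cast
echo methods.name                                 # -> snap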
@ -13,7 +13,7 @@
##
## Virtual method/interface functions to be provided as `mixin`:
##
## *runSetup(ctx: CtxRef[S]; tickerOK: bool): bool*
## *runSetup(ctx: CtxRef[S]): bool*
##   Global set up. This function will be called before any worker peer is
##   started. If that function returns `false`, no worker peers will be run.
##
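# Aside -- how the `mixin` interface described above works: inside a generic
# proc, `mixin` defers symbol binding to instantiation time, so the scheduler
# picks up whatever `runSetup` the instantiating module provides. A runnable
# miniature with invented names:

type DemoCtx[S] = ref object
  data: S

proc runSetup(ctx: DemoCtx[int]): bool =
  true  # concrete overload supplied by the "worker" side

proc demoStartSync[S](ctx: DemoCtx[S]): bool =
  mixin runSetup          # bind at instantiation, not at declaration
  ctx.runSetup()

echo demoStartSync(DemoCtx[int](data: 0))  # -> true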
@ -112,7 +112,6 @@ type
ctx*: CtxRef[S] ## Shared data
pool: PeerPool ## For starting the system
buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors
tickerOk: bool ## Ticker logger
daemonRunning: bool ## Run global background job
singleRunLock: bool ## Some single mode runner is activated
monitorLock: bool ## Monitor mode is activated
@ -350,7 +349,6 @@ proc initSync*[S,W](
node: EthereumNode;
chain: ChainRef,
slots: int;
noisy = false;
exCtrlFile = none(string);
) =
## Constructor
@ -363,14 +361,13 @@ proc initSync*[S,W](
exCtrlFile: exCtrlFile,
chain: chain)
dsc.pool = node.peerPool
dsc.tickerOk = noisy
dsc.buddies.init(dsc.ctx.buddiesMax)

proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
## Set up `PeerObserver` handlers and start syncing.
mixin runSetup
# Initialise sub-systems
if dsc.ctx.runSetup(dsc.tickerOk):
if dsc.ctx.runSetup():
var po = PeerObserver(
onPeerConnected:
proc(p: Peer) {.gcsafe.} =
@ -22,7 +22,6 @@ import
../../nimbus/sync/snap/worker/db/[hexary_desc, rocky_bulk_load],
../../nimbus/utils/prettify,
../replay/[pp, undump_blocks]
#./test_helpers

# ------------------------------------------------------------------------------
# Private helpers