mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-12 21:34:33 +00:00
rm full sync (#2324)
* rm full sync Syncing forwards no longer suported * one more
This commit is contained in:
parent
5008b89185
commit
0524fe8fd1
@ -131,7 +131,6 @@ type
|
|||||||
|
|
||||||
SyncMode* {.pure.} = enum
|
SyncMode* {.pure.} = enum
|
||||||
Default
|
Default
|
||||||
Full ## Beware, experimental
|
|
||||||
#Snap ## Beware, experimental
|
#Snap ## Beware, experimental
|
||||||
|
|
||||||
NimbusConf* = object of RootObj
|
NimbusConf* = object of RootObj
|
||||||
@ -172,7 +171,6 @@ type
|
|||||||
desc: "Specify particular blockchain sync mode."
|
desc: "Specify particular blockchain sync mode."
|
||||||
longDesc:
|
longDesc:
|
||||||
"- default -- beacon sync mode\n" &
|
"- default -- beacon sync mode\n" &
|
||||||
"- full -- full blockchain archive\n" &
|
|
||||||
# "- snap -- experimental snap mode (development only)\n" &
|
# "- snap -- experimental snap mode (development only)\n" &
|
||||||
""
|
""
|
||||||
defaultValue: SyncMode.Default
|
defaultValue: SyncMode.Default
|
||||||
|
@ -138,10 +138,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
|
|||||||
tickerOK = conf.logLevel in {
|
tickerOK = conf.logLevel in {
|
||||||
LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
|
LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
|
||||||
case conf.syncMode:
|
case conf.syncMode:
|
||||||
of SyncMode.Full:
|
|
||||||
nimbus.fullSyncRef = FullSyncRef.init(
|
|
||||||
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
|
||||||
tickerOK, exCtrlFile)
|
|
||||||
#of SyncMode.Snap:
|
#of SyncMode.Snap:
|
||||||
# # Minimal capability needed for sync only
|
# # Minimal capability needed for sync only
|
||||||
# if ProtocolFlag.Snap notin protocols:
|
# if ProtocolFlag.Snap notin protocols:
|
||||||
@ -151,14 +147,9 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
|
|||||||
# nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
# nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
||||||
# tickerOK, exCtrlFile)
|
# tickerOK, exCtrlFile)
|
||||||
of SyncMode.Default:
|
of SyncMode.Default:
|
||||||
if com.forkGTE(MergeFork):
|
|
||||||
nimbus.beaconSyncRef = BeaconSyncRef.init(
|
nimbus.beaconSyncRef = BeaconSyncRef.init(
|
||||||
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
nimbus.fullSyncRef = FullSyncRef.init(
|
|
||||||
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
|
||||||
tickerOK, exCtrlFile)
|
|
||||||
|
|
||||||
# Connect directly to the static nodes
|
# Connect directly to the static nodes
|
||||||
let staticPeers = conf.getStaticPeers()
|
let staticPeers = conf.getStaticPeers()
|
||||||
@ -177,7 +168,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
|
|||||||
case conf.syncMode:
|
case conf.syncMode:
|
||||||
#of SyncMode.Snap:
|
#of SyncMode.Snap:
|
||||||
# waitForPeers = false
|
# waitForPeers = false
|
||||||
of SyncMode.Full, SyncMode.Default:
|
of SyncMode.Default:
|
||||||
discard
|
discard
|
||||||
nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
|
nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
|
||||||
enableDiscovery = conf.discovery != DiscoveryType.None,
|
enableDiscovery = conf.discovery != DiscoveryType.None,
|
||||||
@ -266,12 +257,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
|
|||||||
if conf.maxPeers > 0:
|
if conf.maxPeers > 0:
|
||||||
case conf.syncMode:
|
case conf.syncMode:
|
||||||
of SyncMode.Default:
|
of SyncMode.Default:
|
||||||
if com.forkGTE(MergeFork):
|
|
||||||
nimbus.beaconSyncRef.start
|
nimbus.beaconSyncRef.start
|
||||||
else:
|
|
||||||
nimbus.fullSyncRef.start
|
|
||||||
of SyncMode.Full:
|
|
||||||
nimbus.fullSyncRef.start
|
|
||||||
#of SyncMode.Snap:
|
#of SyncMode.Snap:
|
||||||
# nimbus.snapSyncRef.start
|
# nimbus.snapSyncRef.start
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@ import
|
|||||||
./sync/peers,
|
./sync/peers,
|
||||||
./sync/beacon,
|
./sync/beacon,
|
||||||
# ./sync/snap, # -- todo
|
# ./sync/snap, # -- todo
|
||||||
./sync/full,
|
|
||||||
./beacon/beacon_engine,
|
./beacon/beacon_engine,
|
||||||
./common,
|
./common,
|
||||||
./config
|
./config
|
||||||
@ -52,7 +51,6 @@ type
|
|||||||
networkLoop*: Future[void]
|
networkLoop*: Future[void]
|
||||||
peerManager*: PeerManagerRef
|
peerManager*: PeerManagerRef
|
||||||
# snapSyncRef*: SnapSyncRef # -- todo
|
# snapSyncRef*: SnapSyncRef # -- todo
|
||||||
fullSyncRef*: FullSyncRef
|
|
||||||
beaconSyncRef*: BeaconSyncRef
|
beaconSyncRef*: BeaconSyncRef
|
||||||
beaconEngine*: BeaconEngineRef
|
beaconEngine*: BeaconEngineRef
|
||||||
metricsServer*: MetricsHttpServerRef
|
metricsServer*: MetricsHttpServerRef
|
||||||
@ -71,8 +69,6 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
|
|||||||
await nimbus.peerManager.stop()
|
await nimbus.peerManager.stop()
|
||||||
#if nimbus.snapSyncRef.isNil.not:
|
#if nimbus.snapSyncRef.isNil.not:
|
||||||
# nimbus.snapSyncRef.stop()
|
# nimbus.snapSyncRef.stop()
|
||||||
if nimbus.fullSyncRef.isNil.not:
|
|
||||||
nimbus.fullSyncRef.stop()
|
|
||||||
if nimbus.beaconSyncRef.isNil.not:
|
if nimbus.beaconSyncRef.isNil.not:
|
||||||
nimbus.beaconSyncRef.stop()
|
nimbus.beaconSyncRef.stop()
|
||||||
if nimbus.metricsServer.isNil.not:
|
if nimbus.metricsServer.isNil.not:
|
||||||
|
@ -1,130 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
||||||
# Licensed under either of
|
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
||||||
# http://opensource.org/licenses/MIT)
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
|
||||||
eth/p2p,
|
|
||||||
chronicles,
|
|
||||||
chronos,
|
|
||||||
stew/[interval_set, sorted_set],
|
|
||||||
./full/[worker, worker_desc],
|
|
||||||
"."/[sync_desc, sync_sched, protocol]
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "full-sync"
|
|
||||||
|
|
||||||
type
|
|
||||||
FullSyncRef* = RunnerSyncRef[FullCtxData,FullBuddyData]
|
|
||||||
|
|
||||||
const
|
|
||||||
extraTraceMessages = false # or true
|
|
||||||
## Enable additional logging noise
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private logging helpers
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
template traceMsg(f, info: static[string]; args: varargs[untyped]) =
|
|
||||||
trace "Snap scheduler " & f & "() " & info, args
|
|
||||||
|
|
||||||
template traceMsgCtx(f, info: static[string]; c: FullCtxRef) =
|
|
||||||
when extraTraceMessages:
|
|
||||||
block:
|
|
||||||
let
|
|
||||||
poolMode {.inject.} = c.poolMode
|
|
||||||
daemon {.inject.} = c.daemon
|
|
||||||
f.traceMsg info, poolMode, daemon
|
|
||||||
|
|
||||||
template traceMsgBuddy(f, info: static[string]; b: FullBuddyRef) =
|
|
||||||
when extraTraceMessages:
|
|
||||||
block:
|
|
||||||
let
|
|
||||||
peer {.inject.} = b.peer
|
|
||||||
runState {.inject.} = b.ctrl.state
|
|
||||||
multiOk {.inject.} = b.ctrl.multiOk
|
|
||||||
poolMode {.inject.} = b.ctx.poolMode
|
|
||||||
daemon {.inject.} = b.ctx.daemon
|
|
||||||
f.traceMsg info, peer, runState, multiOk, poolMode, daemon
|
|
||||||
|
|
||||||
|
|
||||||
template tracerFrameCtx(f: static[string]; c: FullCtxRef; code: untyped) =
|
|
||||||
f.traceMsgCtx "begin", c
|
|
||||||
code
|
|
||||||
f.traceMsgCtx "end", c
|
|
||||||
|
|
||||||
template tracerFrameBuddy(f: static[string]; b: FullBuddyRef; code: untyped) =
|
|
||||||
f.traceMsgBuddy "begin", b
|
|
||||||
code
|
|
||||||
f.traceMsgBuddy "end", b
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Virtual methods/interface, `mixin` functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc runSetup(ctx: FullCtxRef): bool =
|
|
||||||
tracerFrameCtx("runSetup", ctx):
|
|
||||||
result = worker.setup(ctx)
|
|
||||||
|
|
||||||
proc runRelease(ctx: FullCtxRef) =
|
|
||||||
tracerFrameCtx("runRelease", ctx):
|
|
||||||
worker.release(ctx)
|
|
||||||
|
|
||||||
proc runDaemon(ctx: FullCtxRef) {.async.} =
|
|
||||||
tracerFrameCtx("runDaemon", ctx):
|
|
||||||
await worker.runDaemon(ctx)
|
|
||||||
|
|
||||||
proc runStart(buddy: FullBuddyRef): bool =
|
|
||||||
tracerFrameBuddy("runStart", buddy):
|
|
||||||
result = worker.start(buddy)
|
|
||||||
|
|
||||||
proc runStop(buddy: FullBuddyRef) =
|
|
||||||
tracerFrameBuddy("runStop", buddy):
|
|
||||||
worker.stop(buddy)
|
|
||||||
|
|
||||||
proc runPool(buddy: FullBuddyRef; last: bool; laps: int): bool =
|
|
||||||
tracerFrameBuddy("runPool", buddy):
|
|
||||||
result = worker.runPool(buddy, last, laps)
|
|
||||||
|
|
||||||
proc runSingle(buddy: FullBuddyRef) {.async.} =
|
|
||||||
tracerFrameBuddy("runSingle", buddy):
|
|
||||||
await worker.runSingle(buddy)
|
|
||||||
|
|
||||||
proc runMulti(buddy: FullBuddyRef) {.async.} =
|
|
||||||
tracerFrameBuddy("runMulti", buddy):
|
|
||||||
await worker.runMulti(buddy)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type FullSyncRef;
|
|
||||||
ethNode: EthereumNode;
|
|
||||||
chain: ChainRef;
|
|
||||||
rng: ref HmacDrbgContext;
|
|
||||||
maxPeers: int;
|
|
||||||
enableTicker = false;
|
|
||||||
exCtrlFile = Opt.none(string);
|
|
||||||
): T =
|
|
||||||
new result
|
|
||||||
result.initSync(ethNode, chain, maxPeers, exCtrlFile)
|
|
||||||
result.ctx.pool.rng = rng
|
|
||||||
result.ctx.pool.enableTicker = enableTicker
|
|
||||||
|
|
||||||
proc start*(ctx: FullSyncRef) =
|
|
||||||
doAssert ctx.startSync()
|
|
||||||
|
|
||||||
proc stop*(ctx: FullSyncRef) =
|
|
||||||
ctx.stopSync()
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,434 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
||||||
# Licensed and distributed under either of
|
|
||||||
# * MIT license (license terms in the root directory or at
|
|
||||||
# https://opensource.org/licenses/MIT).
|
|
||||||
# * Apache v2 license (license terms in the root directory or at
|
|
||||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
{.push raises:[].}
|
|
||||||
|
|
||||||
import
|
|
||||||
chronicles,
|
|
||||||
chronos,
|
|
||||||
eth/p2p,
|
|
||||||
".."/[protocol, sync_desc],
|
|
||||||
../handlers/eth,
|
|
||||||
../misc/[best_pivot, block_queue, sync_ctrl, ticker],
|
|
||||||
./worker_desc
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "full-buddy"
|
|
||||||
|
|
||||||
const
|
|
||||||
extraTraceMessages = false # or true
|
|
||||||
## Enabled additional logging noise
|
|
||||||
|
|
||||||
FirstPivotSeenTimeout = 3.minutes
|
|
||||||
## Turn on relaxed pivot negotiation after some waiting time when there
|
|
||||||
## was a `peer` seen but was rejected. This covers a rare event. Typically
|
|
||||||
## useless peers do not appear ready for negotiation.
|
|
||||||
|
|
||||||
FirstPivotAcceptedTimeout = 50.seconds
|
|
||||||
## Turn on relaxed pivot negotiation after some waiting time when there
|
|
||||||
## was a `peer` accepted but no second one yet.
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private helpers
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc pp(n: BlockNumber): string =
|
|
||||||
## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`)
|
|
||||||
if n == high(BlockNumber): "high" else:"#" & $n
|
|
||||||
|
|
||||||
# --------------
|
|
||||||
|
|
||||||
proc disableWireServices(ctx: FullCtxRef) =
|
|
||||||
## Helper for `setup()`: Temporarily stop useless wire protocol services.
|
|
||||||
ctx.ethWireCtx.txPoolEnabled = false
|
|
||||||
|
|
||||||
proc enableWireServices(ctx: FullCtxRef) =
|
|
||||||
## Enable services again
|
|
||||||
ctx.ethWireCtx.txPoolEnabled = true
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc topUsedNumber(
|
|
||||||
ctx: FullCtxRef;
|
|
||||||
backBlocks = maxHeadersFetch;
|
|
||||||
): Result[BlockNumber,void] =
|
|
||||||
var
|
|
||||||
top = 0.toBlockNumber
|
|
||||||
try:
|
|
||||||
let
|
|
||||||
bestNumber = ctx.chain.db.getCanonicalHead().blockNumber
|
|
||||||
nBackBlocks = backBlocks.toBlockNumber
|
|
||||||
# Initialise before best block number
|
|
||||||
if nBackBlocks < bestNumber:
|
|
||||||
top = bestNumber - nBackBlocks
|
|
||||||
except CatchableError as e:
|
|
||||||
error "Best block header problem", backBlocks, error=($e.name), msg=e.msg
|
|
||||||
return err()
|
|
||||||
|
|
||||||
ok(top)
|
|
||||||
|
|
||||||
|
|
||||||
proc tickerUpdater(ctx: FullCtxRef): TickerFullStatsUpdater =
|
|
||||||
result = proc: auto =
|
|
||||||
var stats: BlockQueueStats
|
|
||||||
ctx.pool.bCtx.blockQueueStats(stats)
|
|
||||||
|
|
||||||
let suspended =
|
|
||||||
0 < ctx.pool.suspendAt and ctx.pool.suspendAt < stats.topAccepted
|
|
||||||
|
|
||||||
TickerFullStats(
|
|
||||||
topPersistent: stats.topAccepted,
|
|
||||||
nextStaged: stats.nextStaged,
|
|
||||||
nextUnprocessed: stats.nextUnprocessed,
|
|
||||||
nStagedQueue: stats.nStagedQueue,
|
|
||||||
suspended: suspended,
|
|
||||||
reOrg: stats.reOrg)
|
|
||||||
|
|
||||||
|
|
||||||
proc processStaged(buddy: FullBuddyRef): bool =
|
|
||||||
## Fetch a work item from the `staged` queue an process it to be
|
|
||||||
## stored on the persistent block chain.
|
|
||||||
let
|
|
||||||
ctx {.used.} = buddy.ctx
|
|
||||||
peer = buddy.peer
|
|
||||||
chainDb = buddy.ctx.chain.db
|
|
||||||
chain = buddy.ctx.chain
|
|
||||||
bq = buddy.only.bQueue
|
|
||||||
|
|
||||||
# Get a work item, a list of headers + bodies
|
|
||||||
wi = block:
|
|
||||||
let rc = bq.blockQueueFetchStaged()
|
|
||||||
if rc.isErr:
|
|
||||||
return false
|
|
||||||
rc.value
|
|
||||||
|
|
||||||
#startNumber = wi.headers[0].blockNumber -- unused
|
|
||||||
|
|
||||||
# Store in persistent database
|
|
||||||
try:
|
|
||||||
if chain.persistBlocks(wi.headers, wi.bodies).isOk():
|
|
||||||
bq.blockQueueAccept(wi)
|
|
||||||
return true
|
|
||||||
except CatchableError as e:
|
|
||||||
error "Storing persistent blocks failed", peer, range=($wi.blocks),
|
|
||||||
error = $e.name, msg = e.msg
|
|
||||||
|
|
||||||
# Something went wrong. Recycle work item (needs to be re-fetched, anyway)
|
|
||||||
let
|
|
||||||
parentHash = wi.headers[0].parentHash
|
|
||||||
try:
|
|
||||||
# Check whether hash of the first block is consistent
|
|
||||||
var parent: BlockHeader
|
|
||||||
if chainDb.getBlockHeader(parentHash, parent):
|
|
||||||
# First block parent is ok, so there might be other problems. Re-fetch
|
|
||||||
# the blocks from another peer.
|
|
||||||
trace "Storing persistent blocks failed", peer, range=($wi.blocks)
|
|
||||||
bq.blockQueueRecycle(wi)
|
|
||||||
buddy.ctrl.zombie = true
|
|
||||||
return false
|
|
||||||
except CatchableError as e:
|
|
||||||
error "Failed to access parent blocks", peer,
|
|
||||||
blockNumber=wi.headers[0].blockNumber.pp, error=($e.name), msg=e.msg
|
|
||||||
|
|
||||||
# Parent block header problem, so we might be in the middle of a re-org.
|
|
||||||
# Set single mode backtrack following the offending parent hash.
|
|
||||||
bq.blockQueueBacktrackFrom(wi)
|
|
||||||
buddy.ctrl.multiOk = false
|
|
||||||
|
|
||||||
if wi.topHash.isNone:
|
|
||||||
# Assuming that currently staged entries are on the wrong branch
|
|
||||||
bq.blockQueueRecycleStaged()
|
|
||||||
notice "Starting chain re-org backtrack work item", peer, range=($wi.blocks)
|
|
||||||
else:
|
|
||||||
# Leave that block range in the staged list
|
|
||||||
trace "Resuming chain re-org backtrack work item", peer, range=($wi.blocks)
|
|
||||||
discard
|
|
||||||
|
|
||||||
return false
|
|
||||||
|
|
||||||
|
|
||||||
proc suspendDownload(buddy: FullBuddyRef): bool =
|
|
||||||
## Check whether downloading should be suspended
|
|
||||||
let ctx = buddy.ctx
|
|
||||||
if ctx.exCtrlFile.isSome:
|
|
||||||
let rc = ctx.exCtrlFile.syncCtrlBlockNumberFromFile
|
|
||||||
if rc.isOk:
|
|
||||||
ctx.pool.suspendAt = rc.value
|
|
||||||
if 0 < ctx.pool.suspendAt:
|
|
||||||
return ctx.pool.suspendAt < buddy.only.bQueue.topAccepted
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public start/stop and admin functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc setup*(ctx: FullCtxRef): bool =
|
|
||||||
## Global set up
|
|
||||||
ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng)
|
|
||||||
let rc = ctx.topUsedNumber(backBlocks = 0)
|
|
||||||
if rc.isErr:
|
|
||||||
ctx.pool.bCtx = BlockQueueCtxRef.init()
|
|
||||||
return false
|
|
||||||
ctx.pool.bCtx = BlockQueueCtxRef.init(rc.value + 1)
|
|
||||||
if ctx.pool.enableTicker:
|
|
||||||
ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater)
|
|
||||||
else:
|
|
||||||
debug "Ticker is disabled"
|
|
||||||
|
|
||||||
if ctx.exCtrlFile.isSome:
|
|
||||||
warn "Full sync accepts suspension request block number",
|
|
||||||
syncCtrlFile=ctx.exCtrlFile.get
|
|
||||||
|
|
||||||
ctx.disableWireServices()
|
|
||||||
true
|
|
||||||
|
|
||||||
proc release*(ctx: FullCtxRef) =
|
|
||||||
## Global clean up
|
|
||||||
ctx.pool.pivot = nil
|
|
||||||
if not ctx.pool.ticker.isNil:
|
|
||||||
ctx.pool.ticker.stop()
|
|
||||||
ctx.enableWireServices() # restore to default
|
|
||||||
|
|
||||||
proc start*(buddy: FullBuddyRef): bool =
|
|
||||||
## Initialise worker peer
|
|
||||||
let
|
|
||||||
ctx = buddy.ctx
|
|
||||||
peer = buddy.peer
|
|
||||||
if peer.supports(protocol.eth) and
|
|
||||||
peer.state(protocol.eth).initialized:
|
|
||||||
if not ctx.pool.ticker.isNil:
|
|
||||||
ctx.pool.ticker.startBuddy()
|
|
||||||
buddy.only.pivot =
|
|
||||||
BestPivotWorkerRef.init(ctx.pool.pivot, buddy.ctrl, buddy.peer)
|
|
||||||
buddy.only.bQueue = BlockQueueWorkerRef.init(
|
|
||||||
ctx.pool.bCtx, buddy.ctrl, peer)
|
|
||||||
return true
|
|
||||||
|
|
||||||
proc stop*(buddy: FullBuddyRef) =
|
|
||||||
## Clean up this peer
|
|
||||||
let ctx = buddy.ctx
|
|
||||||
buddy.ctrl.stopped = true
|
|
||||||
buddy.only.pivot.clear()
|
|
||||||
if not ctx.pool.ticker.isNil:
|
|
||||||
ctx.pool.ticker.stopBuddy()
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc runDaemon*(ctx: FullCtxRef) {.async.} =
|
|
||||||
## Global background job that will be re-started as long as the variable
|
|
||||||
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
|
||||||
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
|
||||||
## as `true` not before there is some activity on the `runPool()`,
|
|
||||||
## `runSingle()`, or `runMulti()` functions.
|
|
||||||
##
|
|
||||||
case ctx.pool.pivotState:
|
|
||||||
of FirstPivotSeen:
|
|
||||||
let elapsed = Moment.now() - ctx.pool.pivotStamp
|
|
||||||
if FirstPivotSeenTimeout < elapsed:
|
|
||||||
# Switch to single peer pivot negotiation
|
|
||||||
ctx.pool.pivot.pivotRelaxedMode(enable = true)
|
|
||||||
|
|
||||||
# Currently no need for other monitor tasks
|
|
||||||
ctx.daemon = false
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "First seen pivot timeout", elapsed,
|
|
||||||
pivotState=ctx.pool.pivotState
|
|
||||||
return
|
|
||||||
# Otherwise delay for some time
|
|
||||||
|
|
||||||
of FirstPivotAccepted:
|
|
||||||
let elapsed = Moment.now() - ctx.pool.pivotStamp
|
|
||||||
if FirstPivotAcceptedTimeout < elapsed:
|
|
||||||
# Switch to single peer pivot negotiation
|
|
||||||
ctx.pool.pivot.pivotRelaxedMode(enable = true)
|
|
||||||
|
|
||||||
# Use currents pivot next time `runSingle()` is visited. This bent is
|
|
||||||
# necessary as there must be a peer initialising and syncing blocks. But
|
|
||||||
# this daemon has no peer assigned.
|
|
||||||
ctx.pool.pivotState = FirstPivotUseRegardless
|
|
||||||
|
|
||||||
# Currently no need for other monitor tasks
|
|
||||||
ctx.daemon = false
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "First accepted pivot timeout", elapsed,
|
|
||||||
pivotState=ctx.pool.pivotState
|
|
||||||
return
|
|
||||||
# Otherwise delay for some time
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Currently no need for other monitior tasks
|
|
||||||
ctx.daemon = false
|
|
||||||
return
|
|
||||||
|
|
||||||
# Without waiting, this function repeats every 50ms (as set with the constant
|
|
||||||
# `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging.
|
|
||||||
await sleepAsync 300.milliseconds
|
|
||||||
|
|
||||||
|
|
||||||
proc runSingle*(buddy: FullBuddyRef) {.async.} =
|
|
||||||
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
|
|
||||||
## is set `false` which is the default mode. This flag is updated by the
|
|
||||||
## worker when deemed appropriate.
|
|
||||||
## * For all workers, there can be only one `runSingle()` function active
|
|
||||||
## simultaneously for all worker peers.
|
|
||||||
## * There will be no `runMulti()` function active for the same worker peer
|
|
||||||
## simultaneously
|
|
||||||
## * There will be no `runPool()` iterator active simultaneously.
|
|
||||||
##
|
|
||||||
## Note that this function runs in `async` mode.
|
|
||||||
##
|
|
||||||
let
|
|
||||||
ctx = buddy.ctx
|
|
||||||
peer {.used.} = buddy.peer
|
|
||||||
bq = buddy.only.bQueue
|
|
||||||
pv = buddy.only.pivot
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single mode begin", peer, pivotState=ctx.pool.pivotState
|
|
||||||
|
|
||||||
case ctx.pool.pivotState:
|
|
||||||
of PivotStateInitial:
|
|
||||||
# Set initial state on first encounter
|
|
||||||
ctx.pool.pivotState = FirstPivotSeen
|
|
||||||
ctx.pool.pivotStamp = Moment.now()
|
|
||||||
ctx.daemon = true # Start monitor
|
|
||||||
|
|
||||||
of FirstPivotSeen, FirstPivotAccepted:
|
|
||||||
discard
|
|
||||||
|
|
||||||
of FirstPivotUseRegardless:
|
|
||||||
# Magic case when we accept anything under the sun
|
|
||||||
let rc = pv.pivotHeader(relaxedMode=true)
|
|
||||||
if rc.isOK:
|
|
||||||
# Update/activate `bestNumber` from the pivot header
|
|
||||||
bq.bestNumber = Opt.some(rc.value.blockNumber)
|
|
||||||
ctx.pool.pivotState = PivotRunMode
|
|
||||||
buddy.ctrl.multiOk = true
|
|
||||||
trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
|
|
||||||
return # stop logging, otherwise unconditional return for this case
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
|
|
||||||
return # unconditional return for this case
|
|
||||||
|
|
||||||
of PivotRunMode:
|
|
||||||
# Sync backtrack runs in single mode
|
|
||||||
if bq.blockQueueBacktrackOk:
|
|
||||||
let rc = await bq.blockQueueBacktrackWorker()
|
|
||||||
if rc.isOk:
|
|
||||||
# Update persistent database (may reset `multiOk`)
|
|
||||||
buddy.ctrl.multiOk = true
|
|
||||||
while buddy.processStaged() and not buddy.ctrl.stopped:
|
|
||||||
# Allow thread switch as `persistBlocks()` might be slow
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single backtrack mode done", peer
|
|
||||||
return
|
|
||||||
|
|
||||||
buddy.ctrl.zombie = true
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single backtrack mode stopped", peer
|
|
||||||
return
|
|
||||||
# End case()
|
|
||||||
|
|
||||||
# Negotiate in order to derive the pivot header from this `peer`. This code
|
|
||||||
# location here is reached when there was no compelling reason for the
|
|
||||||
# `case()` handler to process and `return`.
|
|
||||||
if await pv.pivotNegotiate(buddy.only.bQueue.bestNumber):
|
|
||||||
# Update/activate `bestNumber` from the pivot header
|
|
||||||
bq.bestNumber = Opt.some(pv.pivotHeader.value.blockNumber)
|
|
||||||
ctx.pool.pivotState = PivotRunMode
|
|
||||||
buddy.ctrl.multiOk = true
|
|
||||||
trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
|
|
||||||
return
|
|
||||||
|
|
||||||
if buddy.ctrl.stopped:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
|
|
||||||
return # done with this buddy
|
|
||||||
|
|
||||||
var napping = 2.seconds
|
|
||||||
case ctx.pool.pivotState:
|
|
||||||
of FirstPivotSeen:
|
|
||||||
# Possible state transition
|
|
||||||
if pv.pivotHeader(relaxedMode=true).isOk:
|
|
||||||
ctx.pool.pivotState = FirstPivotAccepted
|
|
||||||
ctx.pool.pivotStamp = Moment.now()
|
|
||||||
napping = 300.milliseconds
|
|
||||||
of FirstPivotAccepted:
|
|
||||||
napping = 300.milliseconds
|
|
||||||
else:
|
|
||||||
discard
|
|
||||||
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Single mode end", peer, pivotState=ctx.pool.pivotState, napping
|
|
||||||
|
|
||||||
# Without waiting, this function repeats every 50ms (as set with the constant
|
|
||||||
# `sync_sched.execLoopTimeElapsedMin`.)
|
|
||||||
await sleepAsync napping
|
|
||||||
|
|
||||||
|
|
||||||
proc runPool*(buddy: FullBuddyRef; last: bool; laps: int): bool =
|
|
||||||
## Once started, the function `runPool()` is called for all worker peers in
|
|
||||||
## sequence as the body of an iteration as long as the function returns
|
|
||||||
## `false`. There will be no other worker peer functions activated
|
|
||||||
## simultaneously.
|
|
||||||
##
|
|
||||||
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
|
|
||||||
## `true` (default is `false`.) It will be automatically reset before the
|
|
||||||
## the loop starts. Re-setting it again results in repeating the loop. The
|
|
||||||
## argument `lap` (starting with `0`) indicated the currend lap of the
|
|
||||||
## repeated loops.
|
|
||||||
##
|
|
||||||
## The argument `last` is set `true` if the last entry is reached.
|
|
||||||
##
|
|
||||||
## Note that this function does not run in `async` mode.
|
|
||||||
##
|
|
||||||
# Mind the gap, fill in if necessary (function is peer independent)
|
|
||||||
buddy.only.bQueue.blockQueueGrout()
|
|
||||||
true # Stop after running once regardless of peer
|
|
||||||
|
|
||||||
proc runMulti*(buddy: FullBuddyRef) {.async.} =
|
|
||||||
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
|
|
||||||
## `true` which is typically done after finishing `runSingle()`. This
|
|
||||||
## instance can be simultaneously active for all peer workers.
|
|
||||||
##
|
|
||||||
let
|
|
||||||
ctx = buddy.ctx
|
|
||||||
bq = buddy.only.bQueue
|
|
||||||
|
|
||||||
if buddy.suspendDownload:
|
|
||||||
# Sleep for a while, then leave
|
|
||||||
await sleepAsync(10.seconds)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Fetch work item
|
|
||||||
let rc = await bq.blockQueueWorker()
|
|
||||||
if rc.isErr:
|
|
||||||
if rc.error == StagedQueueOverflow:
|
|
||||||
# Mind the gap: Turn on pool mode if there are too may staged items.
|
|
||||||
ctx.poolMode = true
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Update persistent database
|
|
||||||
while buddy.processStaged() and not buddy.ctrl.stopped:
|
|
||||||
# Allow thread switch as `persistBlocks()` might be slow
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,50 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
||||||
# Licensed and distributed under either of
|
|
||||||
# * MIT license (license terms in the root directory or at
|
|
||||||
# https://opensource.org/licenses/MIT).
|
|
||||||
# * Apache v2 license (license terms in the root directory or at
|
|
||||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
{.push raises:[].}
|
|
||||||
|
|
||||||
import
|
|
||||||
eth/p2p,
|
|
||||||
chronos,
|
|
||||||
../sync_desc,
|
|
||||||
../misc/[best_pivot, block_queue, ticker]
|
|
||||||
|
|
||||||
type
|
|
||||||
PivotState* = enum
|
|
||||||
PivotStateInitial, ## Initial state
|
|
||||||
FirstPivotSeen, ## Starting, first pivot seen
|
|
||||||
FirstPivotAccepted, ## Accepted, waiting for second
|
|
||||||
FirstPivotUseRegardless ## Force pivot if available
|
|
||||||
PivotRunMode ## SNAFU after some magic
|
|
||||||
|
|
||||||
FullBuddyData* = object
|
|
||||||
## Local descriptor data extension
|
|
||||||
pivot*: BestPivotWorkerRef ## Local pivot worker descriptor
|
|
||||||
bQueue*: BlockQueueWorkerRef ## Block queue worker
|
|
||||||
|
|
||||||
FullCtxData* = object
|
|
||||||
## Globally shared data extension
|
|
||||||
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
|
|
||||||
pivot*: BestPivotCtxRef ## Global pivot descriptor
|
|
||||||
pivotState*: PivotState ## For initial pivot control
|
|
||||||
pivotStamp*: Moment ## `PivotState` driven timing control
|
|
||||||
bCtx*: BlockQueueCtxRef ## Global block queue descriptor
|
|
||||||
suspendAt*: BlockNumber ## Suspend if persistent head is larger
|
|
||||||
|
|
||||||
enableTicker*: bool ## Advisary, extra level of gossip
|
|
||||||
ticker*: TickerRef ## Logger ticker
|
|
||||||
|
|
||||||
FullBuddyRef* = BuddyRef[FullCtxData,FullBuddyData]
|
|
||||||
## Extended worker peer descriptor
|
|
||||||
|
|
||||||
FullCtxRef* = CtxRef[FullCtxData]
|
|
||||||
## Extended global descriptor
|
|
||||||
|
|
||||||
# End
|
|
@ -1,474 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
|
||||||
# Licensed and distributed under either of
|
|
||||||
# * MIT license (license terms in the root directory or at
|
|
||||||
# https://opensource.org/licenses/MIT).
|
|
||||||
# * Apache v2 license (license terms in the root directory or at
|
|
||||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
## Negotiate a pivot header base on the *best header* values. Buddies that
|
|
||||||
## cannot provide a minimal block number will be disconnected.
|
|
||||||
##
|
|
||||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()`
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[hashes, options, sets],
|
|
||||||
chronicles,
|
|
||||||
chronos,
|
|
||||||
eth/[common, p2p],
|
|
||||||
stew/byteutils,
|
|
||||||
".."/[protocol, sync_desc, types]
|
|
||||||
|
|
||||||
{.push raises:[].}
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "best-pivot"
|
|
||||||
|
|
||||||
const
|
|
||||||
extraTraceMessages = false or true
|
|
||||||
## Additional trace commands
|
|
||||||
|
|
||||||
minPeersToStartSync = 2
|
|
||||||
## Wait for consensus of at least this number of peers before syncing.
|
|
||||||
|
|
||||||
failCountMax = 3
|
|
||||||
## Stop after a peer fails too often while negotiating. This happens if
|
|
||||||
## a peer responses repeatedly with useless data.
|
|
||||||
|
|
||||||
type
|
|
||||||
BestPivotCtxRef* = ref object of RootRef
|
|
||||||
## Data shared by all peers.
|
|
||||||
rng: ref HmacDrbgContext ## Random generator
|
|
||||||
untrusted: HashSet[Peer] ## Clean up list
|
|
||||||
trusted: HashSet[Peer] ## Peers ready for delivery
|
|
||||||
relaxed: HashSet[Peer] ## Peers accepted in relaxed mode
|
|
||||||
relaxedMode: bool ## Not using strictly `trusted` set
|
|
||||||
minPeers: int ## Minimum peers needed in non-relaxed mode
|
|
||||||
comFailMax: int ## Stop peer after too many communication errors
|
|
||||||
|
|
||||||
BestPivotWorkerRef* = ref object of RootRef
|
|
||||||
## Data for this peer only
|
|
||||||
global: BestPivotCtxRef ## Common data
|
|
||||||
header: Option[BlockHeader] ## Pivot header (if any)
|
|
||||||
ctrl: BuddyCtrlRef ## Control and state settings
|
|
||||||
peer: Peer ## network peer
|
|
||||||
comFailCount: int ## Beware of repeated network errors
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private helpers
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
#proc hash(peer: Peer): Hash =
|
|
||||||
# ## Mixin `HashSet[Peer]` handler
|
|
||||||
# hash(cast[pointer](peer))
|
|
||||||
|
|
||||||
template safeTransport(
|
|
||||||
bp: BestPivotWorkerRef;
|
|
||||||
info: static[string];
|
|
||||||
code: untyped) =
|
|
||||||
try:
|
|
||||||
code
|
|
||||||
except TransportError as e:
|
|
||||||
error info & ", stop", peer=bp.peer, error=($e.name), msg=e.msg
|
|
||||||
bp.ctrl.stopped = true
|
|
||||||
|
|
||||||
|
|
||||||
proc rand(r: ref HmacDrbgContext; maxVal: uint64): uint64 =
|
|
||||||
# github.com/nim-lang/Nim/tree/version-1-6/lib/pure/random.nim#L216
|
|
||||||
const
|
|
||||||
randMax = high(uint64)
|
|
||||||
if 0 < maxVal:
|
|
||||||
if maxVal == randMax:
|
|
||||||
var x: uint64
|
|
||||||
r[].generate(x)
|
|
||||||
return x
|
|
||||||
while true:
|
|
||||||
var x: uint64
|
|
||||||
r[].generate(x)
|
|
||||||
# avoid `mod` bias, so `x <= n*maxVal <= randMax` for some integer `n`
|
|
||||||
if x <= randMax - (randMax mod maxVal):
|
|
||||||
# uint -> int
|
|
||||||
return x mod (maxVal + 1)
|
|
||||||
|
|
||||||
proc rand(r: ref HmacDrbgContext; maxVal: int): int =
|
|
||||||
if 0 < maxVal: # somehow making sense of `maxVal = -1`
|
|
||||||
return cast[int](r.rand(maxVal.uint64))
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc getRandomTrustedPeer(bp: BestPivotWorkerRef): Result[Peer,void] =
|
|
||||||
## Return random entry from `trusted` peer different from this peer set if
|
|
||||||
## there are enough
|
|
||||||
##
|
|
||||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `randomTrustedPeer()`
|
|
||||||
let
|
|
||||||
nPeers = bp.global.trusted.len
|
|
||||||
offInx = if bp.peer in bp.global.trusted: 2 else: 1
|
|
||||||
if 0 < nPeers:
|
|
||||||
var (walkInx, stopInx) = (0, bp.global.rng.rand(nPeers - offInx))
|
|
||||||
for p in bp.global.trusted:
|
|
||||||
if p == bp.peer:
|
|
||||||
continue
|
|
||||||
if walkInx == stopInx:
|
|
||||||
return ok(p)
|
|
||||||
walkInx.inc
|
|
||||||
err()
|
|
||||||
|
|
||||||
proc getBestHeader(
|
|
||||||
bp: BestPivotWorkerRef;
|
|
||||||
): Future[Result[BlockHeader,void]]
|
|
||||||
{.async.} =
|
|
||||||
## Get best block number from best block hash.
|
|
||||||
##
|
|
||||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `getBestBlockNumber()`
|
|
||||||
let
|
|
||||||
peer = bp.peer
|
|
||||||
startHash = peer.state(eth).bestBlockHash
|
|
||||||
reqLen = 1u
|
|
||||||
hdrReq = BlocksRequest(
|
|
||||||
startBlock: HashOrNum(
|
|
||||||
isHash: true,
|
|
||||||
hash: startHash),
|
|
||||||
maxResults: reqLen,
|
|
||||||
skip: 0,
|
|
||||||
reverse: true)
|
|
||||||
|
|
||||||
trace trEthSendSendingGetBlockHeaders, peer,
|
|
||||||
startBlock=startHash.data.toHex, reqLen
|
|
||||||
|
|
||||||
var hdrResp: Option[blockHeadersObj]
|
|
||||||
bp.safeTransport("Error fetching block header"):
|
|
||||||
hdrResp = await peer.getBlockHeaders(hdrReq)
|
|
||||||
if bp.ctrl.stopped:
|
|
||||||
return err()
|
|
||||||
|
|
||||||
if hdrResp.isNone:
|
|
||||||
bp.comFailCount.inc
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen,
|
|
||||||
hdrRespLen="n/a", comFailCount=bp.comFailCount
|
|
||||||
if bp.global.comFailMax < bp.comFailCount:
|
|
||||||
bp.ctrl.zombie = true
|
|
||||||
return err()
|
|
||||||
|
|
||||||
let hdrRespLen = hdrResp.get.headers.len
|
|
||||||
if hdrRespLen == 1:
|
|
||||||
let
|
|
||||||
header = hdrResp.get.headers[0]
|
|
||||||
blockNumber {.used.} = header.blockNumber
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
|
|
||||||
bp.comFailCount = 0 # reset fail count
|
|
||||||
return ok(header)
|
|
||||||
|
|
||||||
bp.comFailCount.inc
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen,
|
|
||||||
hdrRespLen, comFailCount=bp.comFailCount
|
|
||||||
if bp.global.comFailMax < bp.comFailCount:
|
|
||||||
bp.ctrl.zombie = true
|
|
||||||
return err()
|
|
||||||
|
|
||||||
proc agreesOnChain(
|
|
||||||
bp: BestPivotWorkerRef;
|
|
||||||
other: Peer;
|
|
||||||
): Future[Result[void,bool]]
|
|
||||||
{.async.} =
|
|
||||||
## Returns `true` if one of the peers `bp.peer` or `other` acknowledges
|
|
||||||
## existence of the best block of the other peer. The values returned mean
|
|
||||||
## * ok() -- `peer` is trusted
|
|
||||||
## * err(true) -- `peer` is untrusted
|
|
||||||
## * err(false) -- `other` is dead
|
|
||||||
##
|
|
||||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `peersAgreeOnChain()`
|
|
||||||
let
|
|
||||||
peer = bp.peer
|
|
||||||
var
|
|
||||||
start = peer
|
|
||||||
fetch = other
|
|
||||||
swapped = false
|
|
||||||
# Make sure that `fetch` has not the smaller difficulty.
|
|
||||||
if fetch.state(eth).bestDifficulty < start.state(eth).bestDifficulty:
|
|
||||||
swap(fetch, start)
|
|
||||||
swapped = true
|
|
||||||
|
|
||||||
let
|
|
||||||
startHash = start.state(eth).bestBlockHash
|
|
||||||
hdrReq = BlocksRequest(
|
|
||||||
startBlock: HashOrNum(
|
|
||||||
isHash: true,
|
|
||||||
hash: startHash),
|
|
||||||
maxResults: 1,
|
|
||||||
skip: 0,
|
|
||||||
reverse: true)
|
|
||||||
|
|
||||||
trace trEthSendSendingGetBlockHeaders, peer, start, fetch,
|
|
||||||
startBlock=startHash.data.toHex, hdrReqLen=1, swapped
|
|
||||||
|
|
||||||
var hdrResp: Option[blockHeadersObj]
|
|
||||||
bp.safeTransport("Error fetching block header"):
|
|
||||||
hdrResp = await fetch.getBlockHeaders(hdrReq)
|
|
||||||
if bp.ctrl.stopped:
|
|
||||||
if swapped:
|
|
||||||
return err(true)
|
|
||||||
# No need to terminate `peer` if it was the `other`, failing nevertheless
|
|
||||||
bp.ctrl.stopped = false
|
|
||||||
return err(false)
|
|
||||||
|
|
||||||
if hdrResp.isSome:
|
|
||||||
let hdrRespLen = hdrResp.get.headers.len
|
|
||||||
if 0 < hdrRespLen:
|
|
||||||
let blockNumber {.used.} = hdrResp.get.headers[0].blockNumber
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
|
|
||||||
hdrRespLen, blockNumber
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
|
|
||||||
blockNumber="n/a", swapped
|
|
||||||
return err(true)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions, constructor
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type BestPivotCtxRef; ## Global data descriptor type
|
|
||||||
rng: ref HmacDrbgContext; ## Random generator
|
|
||||||
minPeers = minPeersToStartSync; ## Consensus of at least this #of peers
|
|
||||||
failMax = failCountMax; ## Stop peer after too many com. errors
|
|
||||||
): T =
|
|
||||||
## Global constructor, shared data. If `minPeers` is smaller that `2`,
|
|
||||||
## relaxed mode will be enabled (see also `pivotRelaxedMode()`.)
|
|
||||||
result = T(rng: rng,
|
|
||||||
minPeers: minPeers,
|
|
||||||
comFailMax: failCountMax)
|
|
||||||
if minPeers < 2:
|
|
||||||
result.minPeers = minPeersToStartSync
|
|
||||||
result.relaxedMode = true
|
|
||||||
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type BestPivotWorkerRef; ## Global data descriptor type
|
|
||||||
ctx: BestPivotCtxRef; ## Global data descriptor
|
|
||||||
ctrl: BuddyCtrlRef; ## Control and state settings
|
|
||||||
peer: Peer; ## For fetching data from network
|
|
||||||
): T =
|
|
||||||
## Buddy/local constructor
|
|
||||||
T(global: ctx,
|
|
||||||
header: none(BlockHeader),
|
|
||||||
ctrl: ctrl,
|
|
||||||
peer: peer)
|
|
||||||
|
|
||||||
proc clear*(bp: BestPivotWorkerRef) =
|
|
||||||
## Reset descriptor
|
|
||||||
bp.global.untrusted.incl bp.peer
|
|
||||||
bp.header = none(BlockHeader)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc nPivotApproved*(ctx: BestPivotCtxRef): int =
|
|
||||||
## Number of trusted or relax mode approved pivots
|
|
||||||
(ctx.trusted + ctx.relaxed - ctx.untrusted).len
|
|
||||||
|
|
||||||
proc pivotRelaxedMode*(ctx: BestPivotCtxRef; enable = false) =
|
|
||||||
## Controls relaxed mode. In relaxed mode, the *best header* is fetched
|
|
||||||
## from the network and used as pivot if its block number is large enough.
|
|
||||||
## Otherwise, the default is to find at least `pivotMinPeersToStartSync`
|
|
||||||
## peers (this one included) that agree on a minimum pivot.
|
|
||||||
ctx.relaxedMode = enable
|
|
||||||
|
|
||||||
proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
|
|
||||||
## Returns cached block header if available and the buddy `peer` is trusted.
|
|
||||||
## In relaxed mode (see `pivotRelaxedMode()`), also lesser trusted pivots
|
|
||||||
## are returned.
|
|
||||||
if bp.header.isSome and
|
|
||||||
bp.peer notin bp.global.untrusted:
|
|
||||||
|
|
||||||
if bp.global.minPeers <= bp.global.trusted.len and
|
|
||||||
bp.peer in bp.global.trusted:
|
|
||||||
return ok(bp.header.unsafeGet)
|
|
||||||
|
|
||||||
if bp.global.relaxedMode:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Returning not fully trusted pivot", peer=bp.peer,
|
|
||||||
trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len
|
|
||||||
return ok(bp.header.unsafeGet)
|
|
||||||
|
|
||||||
err()
|
|
||||||
|
|
||||||
proc pivotHeader*(
|
|
||||||
bp: BestPivotWorkerRef; ## Worker peer
|
|
||||||
relaxedMode: bool; ## One time relaxed mode flag
|
|
||||||
): Result[BlockHeader,void] =
|
|
||||||
## Variant of `pivotHeader()` with `relaxedMode` flag as function argument.
|
|
||||||
if bp.header.isSome and
|
|
||||||
bp.peer notin bp.global.untrusted:
|
|
||||||
|
|
||||||
if bp.global.minPeers <= bp.global.trusted.len and
|
|
||||||
bp.peer in bp.global.trusted:
|
|
||||||
return ok(bp.header.unsafeGet)
|
|
||||||
|
|
||||||
if relaxedMode:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Returning not fully trusted pivot", peer=bp.peer,
|
|
||||||
trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len
|
|
||||||
return ok(bp.header.unsafeGet)
|
|
||||||
|
|
||||||
err()
|
|
||||||
|
|
||||||
proc pivotNegotiate*(
|
|
||||||
bp: BestPivotWorkerRef; ## Worker peer
|
|
||||||
minBlockNumber: Opt[BlockNumber]; ## Minimum block number to expect
|
|
||||||
): Future[bool]
|
|
||||||
{.async.} =
|
|
||||||
## Negotiate best header pivot. This function must be run in *single mode* at
|
|
||||||
## the beginning of a running worker peer. If the function returns `true`,
|
|
||||||
## the current `buddy` can be used for syncing and the function
|
|
||||||
## `bestPivotHeader()` will succeed returning a `BlockHeader`.
|
|
||||||
##
|
|
||||||
## In relaxed mode (see `pivotRelaxedMode()`), negotiation stopps when there
|
|
||||||
## is a *best header*. It caches the best header and returns `true` it the
|
|
||||||
## block number is large enough.
|
|
||||||
##
|
|
||||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()`
|
|
||||||
##
|
|
||||||
let peer = bp.peer
|
|
||||||
|
|
||||||
# Delayed clean up batch list
|
|
||||||
if 0 < bp.global.untrusted.len:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Removing untrusted peers", peer, trusted=bp.global.trusted.len,
|
|
||||||
untrusted=bp.global.untrusted.len, runState=bp.ctrl.state
|
|
||||||
bp.global.trusted = bp.global.trusted - bp.global.untrusted
|
|
||||||
bp.global.relaxed = bp.global.relaxed - bp.global.untrusted
|
|
||||||
bp.global.untrusted.clear()
|
|
||||||
|
|
||||||
if bp.header.isNone:
|
|
||||||
when extraTraceMessages:
|
|
||||||
# Only log for the first time (if any)
|
|
||||||
trace "Pivot initialisation", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state
|
|
||||||
|
|
||||||
let rc = await bp.getBestHeader()
|
|
||||||
# Beware of peer terminating the session right after communicating
|
|
||||||
if rc.isErr or bp.ctrl.stopped:
|
|
||||||
return false
|
|
||||||
let
|
|
||||||
bestNumber = rc.value.blockNumber
|
|
||||||
minNumber = minBlockNumber.get(otherwise = 0.toBlockNumber)
|
|
||||||
if bestNumber < minNumber:
|
|
||||||
bp.ctrl.zombie = true
|
|
||||||
trace "Useless peer, best number too low", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state,
|
|
||||||
minNumber, bestNumber
|
|
||||||
return false
|
|
||||||
bp.header = some(rc.value)
|
|
||||||
|
|
||||||
# No further negotiation if in relaxed mode
|
|
||||||
if bp.global.relaxedMode:
|
|
||||||
bp.global.relaxed.incl bp.peer
|
|
||||||
return true
|
|
||||||
|
|
||||||
if bp.global.minPeers <= bp.global.trusted.len:
|
|
||||||
# We have enough trusted peers. Validate new peer against trusted
|
|
||||||
let rc = bp.getRandomTrustedPeer()
|
|
||||||
if rc.isOK:
|
|
||||||
let rx = await bp.agreesOnChain(rc.value)
|
|
||||||
if rx.isOk:
|
|
||||||
bp.global.trusted.incl peer
|
|
||||||
when extraTraceMessages:
|
|
||||||
let bestHeader {.used.} = if bp.header.isNone: "n/a"
|
|
||||||
else: bp.header.unsafeGet.blockNumber.toStr
|
|
||||||
trace "Accepting peer", peer, trusted=bp.global.trusted.len,
|
|
||||||
untrusted=bp.global.untrusted.len, runState=bp.ctrl.state,
|
|
||||||
bestHeader
|
|
||||||
return true
|
|
||||||
if not rx.error:
|
|
||||||
# Other peer is dead
|
|
||||||
bp.global.trusted.excl rc.value
|
|
||||||
return false
|
|
||||||
|
|
||||||
# If there are no trusted peers yet, assume this very peer is trusted,
|
|
||||||
# but do not finish initialisation until there are more peers.
|
|
||||||
if bp.global.trusted.len == 0:
|
|
||||||
bp.global.trusted.incl peer
|
|
||||||
when extraTraceMessages:
|
|
||||||
let bestHeader {.used.} = if bp.header.isNone: "n/a"
|
|
||||||
else: bp.header.unsafeGet.blockNumber.toStr
|
|
||||||
trace "Assume initial trusted peer", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader
|
|
||||||
return false
|
|
||||||
|
|
||||||
if bp.global.trusted.len == 1 and bp.peer in bp.global.trusted:
|
|
||||||
# Ignore degenerate case, note that `trusted.len < minPeersToStartSync`
|
|
||||||
return false
|
|
||||||
|
|
||||||
# At this point we have some "trusted" candidates, but they are not
|
|
||||||
# "trusted" enough. We evaluate `peer` against all other candidates. If
|
|
||||||
# one of the candidates disagrees, we swap it for `peer`. If all candidates
|
|
||||||
# agree, we add `peer` to trusted set. The peers in the set will become
|
|
||||||
# "fully trusted" (and sync will start) when the set is big enough
|
|
||||||
var
|
|
||||||
agreeScore = 0
|
|
||||||
otherPeer: Peer
|
|
||||||
deadPeers: HashSet[Peer]
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Trust scoring peer", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state
|
|
||||||
for p in bp.global.trusted:
|
|
||||||
if peer == p:
|
|
||||||
inc agreeScore
|
|
||||||
else:
|
|
||||||
let rc = await bp.agreesOnChain(p)
|
|
||||||
if rc.isOk:
|
|
||||||
inc agreeScore
|
|
||||||
elif bp.ctrl.stopped:
|
|
||||||
# Beware of terminated session
|
|
||||||
return false
|
|
||||||
elif rc.error:
|
|
||||||
otherPeer = p
|
|
||||||
else:
|
|
||||||
# `Other` peer is dead
|
|
||||||
deadPeers.incl p
|
|
||||||
|
|
||||||
# Normalise
|
|
||||||
if 0 < deadPeers.len:
|
|
||||||
bp.global.trusted = bp.global.trusted - deadPeers
|
|
||||||
if bp.global.trusted.len == 0 or
|
|
||||||
bp.global.trusted.len == 1 and bp.peer in bp.global.trusted:
|
|
||||||
return false
|
|
||||||
|
|
||||||
# Check for the number of peers that disagree
|
|
||||||
case bp.global.trusted.len - agreeScore:
|
|
||||||
of 0:
|
|
||||||
bp.global.trusted.incl peer # best possible outcome
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Agreeable trust score for peer", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state
|
|
||||||
of 1:
|
|
||||||
bp.global.trusted.excl otherPeer
|
|
||||||
bp.global.trusted.incl peer
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Other peer no longer trusted", peer,
|
|
||||||
otherPeer, trusted=bp.global.trusted.len, runState=bp.ctrl.state
|
|
||||||
else:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Peer not trusted", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state
|
|
||||||
discard
|
|
||||||
|
|
||||||
# Evaluate status, finally
|
|
||||||
if bp.global.minPeers <= bp.global.trusted.len:
|
|
||||||
when extraTraceMessages:
|
|
||||||
let bestHeader {.used.} = if bp.header.isNone: "n/a"
|
|
||||||
else: bp.header.unsafeGet.blockNumber.toStr
|
|
||||||
trace "Peer trusted now", peer,
|
|
||||||
trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader
|
|
||||||
return true
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,688 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
|
||||||
# Licensed and distributed under either of
|
|
||||||
# * MIT license (license terms in the root directory or at
|
|
||||||
# https://opensource.org/licenses/MIT).
|
|
||||||
# * Apache v2 license (license terms in the root directory or at
|
|
||||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
## Fetch and queue blocks
|
|
||||||
## ======================
|
|
||||||
##
|
|
||||||
## Worker items state diagram and sketch of sync algorithm:
|
|
||||||
## ::
|
|
||||||
## unprocessed | | ready for | store into
|
|
||||||
## block ranges | peer workers | persistent database | database
|
|
||||||
## =======================================================================
|
|
||||||
##
|
|
||||||
## +------------------------------------------+
|
|
||||||
## | |
|
|
||||||
## | +----------------------------+ |
|
|
||||||
## | | | |
|
|
||||||
## V v | |
|
|
||||||
## <unprocessed> ---+---> <worker-0> ---+-----> <staged> -------> OUTPUT
|
|
||||||
## | |
|
|
||||||
## +---> <worker-1> ---+
|
|
||||||
## | |
|
|
||||||
## +---> <worker-2> ---+
|
|
||||||
## : :
|
|
||||||
##
|
|
||||||
## A work item is created from a range of block numbers extracted from the
|
|
||||||
## `<unprocessed>` set of block ranges.
|
|
||||||
##
|
|
||||||
## A work item consists of a
|
|
||||||
## * current state `<worker-#>` or `<staged>`
|
|
||||||
## * given range of consecutive block numbers `[from..to]`
|
|
||||||
## * sequence of block headers relating to `[from..to]` (to be completed)
|
|
||||||
## * sequence of block buddies relating to `[from..to]` (to be completed)
|
|
||||||
##
|
|
||||||
## Block ranges *may* be recycled back into the `<unprocessed>` set when a
|
|
||||||
## work item is destroyed. This is supposed to be an exceptional case.
|
|
||||||
## Typically, a `<staged>` work item is added to the persistent block chain
|
|
||||||
## database and destroyed without block range recycling.
|
|
||||||
##
|
|
||||||
## Beware of `<staged>` overflow
|
|
||||||
## -----------------------------
|
|
||||||
## When the `<staged>` queue gets too long in non-backtrack/re-org mode, this
|
|
||||||
## may be caused by a gap between the least `<unprocessed>` block number and
|
|
||||||
## the least `<staged>` block number. Then a mechanism is invoked where
|
|
||||||
## `<unprocessed>` block range is updated.
|
|
||||||
##
|
|
||||||
## For backtrack/re-org the system runs in single instance mode tracing
|
|
||||||
## backvards parent hash references. So updating `<unprocessed>` block numbers
|
|
||||||
## would have no effect. In that case, the record with the largest block
|
|
||||||
## numbers are deleted from the `<staged>` list.
|
|
||||||
##
|
|
||||||
{.push raises:[].}
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[algorithm, options, sequtils, strutils],
|
|
||||||
chronicles,
|
|
||||||
chronos,
|
|
||||||
eth/p2p,
|
|
||||||
stew/[byteutils, interval_set, sorted_set],
|
|
||||||
../../utils/utils,
|
|
||||||
".."/[protocol, sync_desc, types]
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "block-queue"
|
|
||||||
|
|
||||||
const
|
|
||||||
maxStagedWorkItems = 70
|
|
||||||
## Maximal items in the `staged` list.
|
|
||||||
|
|
||||||
stagedWorkItemsTrigger = 50
|
|
||||||
## Turn on the global `poolMode` if there are more than this many items
|
|
||||||
## staged.
|
|
||||||
|
|
||||||
type
|
|
||||||
BlockQueueRC* = enum
|
|
||||||
## Return & error codes
|
|
||||||
AllSmileOk
|
|
||||||
EmptyQueue
|
|
||||||
StagedQueueOverflow
|
|
||||||
BlockNumberGap
|
|
||||||
BacktrackDisabled
|
|
||||||
FetchHeadersError
|
|
||||||
FetchBodiesError
|
|
||||||
NoMoreUnprocessed
|
|
||||||
NoMorePeerBlocks
|
|
||||||
|
|
||||||
BlockRangeSetRef = IntervalSetRef[BlockNumber,UInt256]
|
|
||||||
## Disjunct sets of block number intervals
|
|
||||||
|
|
||||||
BlockRange = Interval[BlockNumber,UInt256]
|
|
||||||
## Block number interval
|
|
||||||
|
|
||||||
BlockItemQueue = SortedSet[BlockNumber,BlockItemRef]
|
|
||||||
## Block intervals sorted by least block number
|
|
||||||
|
|
||||||
BlockItemWalkRef = SortedSetWalkRef[BlockNumber,BlockItemRef]
|
|
||||||
## Fast traversal descriptor for `BlockItemQueue`
|
|
||||||
|
|
||||||
BlockItemRef* = ref object
|
|
||||||
## Public block items, OUTPUT
|
|
||||||
blocks*: BlockRange ## Block numbers ranvge covered
|
|
||||||
topHash*: Option[Hash256] ## Fetched by top hash rather than block
|
|
||||||
headers*: seq[BlockHeader] ## Block headers received
|
|
||||||
hashes*: seq[Hash256] ## Hashed from `headers[]` for convenience
|
|
||||||
bodies*: seq[BlockBody] ## Block bodies received
|
|
||||||
|
|
||||||
BlockQueueCtxRef* = ref object
|
|
||||||
## Globally shared data among `block` instances
|
|
||||||
backtrack: Option[Hash256] ## Find reverse block after re-org
|
|
||||||
unprocessed: BlockRangeSetRef ## Block ranges to fetch
|
|
||||||
staged: BlockItemQueue ## Blocks fetched but not stored yet
|
|
||||||
topAccepted: BlockNumber ## Up to this block number processed OK
|
|
||||||
|
|
||||||
BlockQueueWorkerRef* = ref object
|
|
||||||
## Local descriptor data extension
|
|
||||||
global: BlockQueueCtxRef ## Common data
|
|
||||||
bestNumber: Opt[BlockNumber] ## Largest block number reported
|
|
||||||
ctrl: BuddyCtrlRef ## Control and state settings
|
|
||||||
peer: Peer ## network peer
|
|
||||||
|
|
||||||
BlockQueueStats* = object
|
|
||||||
## Statistics
|
|
||||||
topAccepted*: BlockNumber
|
|
||||||
nextUnprocessed*: Opt[BlockNumber]
|
|
||||||
nextStaged*: Opt[BlockNumber]
|
|
||||||
nStagedQueue*: int
|
|
||||||
reOrg*: bool
|
|
||||||
|
|
||||||
const
|
|
||||||
extraTraceMessages = false or true
|
|
||||||
## Enabled additional logging noise
|
|
||||||
|
|
||||||
highBlockNumber = high(BlockNumber)
|
|
||||||
highBlockRange = BlockRange.new(highBlockNumber,highBlockNumber)
|
|
||||||
|
|
||||||
static:
|
|
||||||
doAssert stagedWorkItemsTrigger < maxStagedWorkItems
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private helpers
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc `+`(n: BlockNumber; delta: static[int]): BlockNumber =
|
|
||||||
## Syntactic sugar for expressions like `xxx.toBlockNumber + 1`
|
|
||||||
n + delta.toBlockNumber
|
|
||||||
|
|
||||||
proc `-`(n: BlockNumber; delta: static[int]): BlockNumber =
|
|
||||||
## Syntactic sugar for expressions like `xxx.toBlockNumber - 1`
|
|
||||||
n - delta.toBlockNumber
|
|
||||||
|
|
||||||
proc merge(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
|
|
||||||
## Syntactic sugar
|
|
||||||
ivSet.merge(wi.blocks)
|
|
||||||
|
|
||||||
proc reduce(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
|
|
||||||
## Syntactic sugar
|
|
||||||
ivSet.reduce(wi.blocks)
|
|
||||||
|
|
||||||
# ---------------
|
|
||||||
|
|
||||||
proc `$`(iv: BlockRange): string =
|
|
||||||
## Needed for macro generated DSL files like `snap.nim` because the
|
|
||||||
## `distinct` flavour of `NodeTag` is discarded there.
|
|
||||||
result = "[" & iv.minPt.toStr
|
|
||||||
if iv.minPt != iv.maxPt:
|
|
||||||
result &= "," & iv.maxPt.toStr
|
|
||||||
result &= "]"
|
|
||||||
|
|
||||||
proc `$`(n: Option[BlockRange]): string =
|
|
||||||
if n.isNone: "n/a" else: $n.get
|
|
||||||
|
|
||||||
proc `$`(n: Opt[BlockNumber]): string =
|
|
||||||
n.toStr
|
|
||||||
|
|
||||||
proc `$`(brs: BlockRangeSetRef): string =
|
|
||||||
"{" & toSeq(brs.increasing).mapIt($it).join(",") & "}"
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private helpers
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc nextUnprocessed(ctx: BlockQueueCtxRef): Opt[BlockNumber] =
|
|
||||||
## Pseudo getter
|
|
||||||
let rc = ctx.unprocessed.ge()
|
|
||||||
if rc.isOK:
|
|
||||||
result = Opt.some(rc.value.minPt)
|
|
||||||
|
|
||||||
proc nextStaged(ctx: BlockQueueCtxRef): Opt[BlockRange] =
|
|
||||||
## Pseudo getter
|
|
||||||
let rc = ctx.staged.ge(low(BlockNumber))
|
|
||||||
if rc.isOK:
|
|
||||||
result = Opt.some(rc.value.data.blocks)
|
|
||||||
|
|
||||||
template safeTransport(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
info: static[string];
|
|
||||||
code: untyped) =
|
|
||||||
try:
|
|
||||||
code
|
|
||||||
except TransportError as e:
|
|
||||||
error info & ", stop", error=($e.name), msg=e.msg
|
|
||||||
qd.ctrl.stopped = true
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc newWorkItem(qd: BlockQueueWorkerRef): Result[BlockItemRef,BlockQueueRC] =
|
|
||||||
## Fetch the next unprocessed block range and register it as work item.
|
|
||||||
##
|
|
||||||
## This function will grab a block range from the `unprocessed` range set,
|
|
||||||
## ove it and return it as a `BlockItemRef`. The returned range is registered
|
|
||||||
## in the `pending` list.
|
|
||||||
let rc = qd.global.unprocessed.ge()
|
|
||||||
if rc.isErr:
|
|
||||||
return err(NoMoreUnprocessed) # no more data for this peer
|
|
||||||
|
|
||||||
# Check whether there is somthing to do at all
|
|
||||||
if qd.bestNumber.isNone or
|
|
||||||
qd.bestNumber.unsafeGet < rc.value.minPt:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "no new work item", bestNumer=qd.bestNumber.toStr, range=rc.value
|
|
||||||
return err(NoMorePeerBlocks) # no more data for this peer
|
|
||||||
|
|
||||||
# Compute interval
|
|
||||||
let iv = BlockRange.new(
|
|
||||||
rc.value.minPt,
|
|
||||||
min(rc.value.maxPt,
|
|
||||||
min(rc.value.minPt + maxHeadersFetch - 1, qd.bestNumber.unsafeGet)))
|
|
||||||
|
|
||||||
discard qd.global.unprocessed.reduce(iv)
|
|
||||||
ok(BlockItemRef(blocks: iv))
|
|
||||||
|
|
||||||
|
|
||||||
proc stageItem(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
wi: BlockItemRef;
|
|
||||||
): Result[void,BlockQueueRC] =
|
|
||||||
## Add work item to the list of staged items
|
|
||||||
##
|
|
||||||
## Typically, the function returns `AllSmileOk` unless there is a queue
|
|
||||||
## oberflow (with return code`StagedQueueOverflow`) which needs to be handled
|
|
||||||
## in *pool mode* by running `blockQueueGrout()`.
|
|
||||||
var
|
|
||||||
error = AllSmileOk
|
|
||||||
let
|
|
||||||
peer = qd.peer
|
|
||||||
rc = qd.global.staged.insert(wi.blocks.minPt)
|
|
||||||
if rc.isOk:
|
|
||||||
rc.value.data = wi
|
|
||||||
|
|
||||||
# Return `true` if staged queue oberflows (unless backtracking.)
|
|
||||||
if stagedWorkItemsTrigger < qd.global.staged.len and
|
|
||||||
qd.global.backtrack.isNone and
|
|
||||||
wi.topHash.isNone:
|
|
||||||
debug "Staged queue too long", peer,
|
|
||||||
staged=qd.global.staged.len, max=stagedWorkItemsTrigger
|
|
||||||
error = StagedQueueOverflow
|
|
||||||
|
|
||||||
# The list size is limited. So cut if necessary and recycle back the block
|
|
||||||
# range of the discarded item (tough luck if the current work item is the
|
|
||||||
# one removed from top.)
|
|
||||||
while maxStagedWorkItems < qd.global.staged.len:
|
|
||||||
let topValue = qd.global.staged.le(highBlockNumber).value
|
|
||||||
discard qd.global.unprocessed.merge(topValue.data)
|
|
||||||
discard qd.global.staged.delete(topValue.key)
|
|
||||||
else:
|
|
||||||
# Ooops, duplicates should not exist (but anyway ...)
|
|
||||||
let wj = block:
|
|
||||||
let rc = qd.global.staged.eq(wi.blocks.minPt)
|
|
||||||
doAssert rc.isOk
|
|
||||||
# Store `wi` and return offending entry
|
|
||||||
let rcData = rc.value.data
|
|
||||||
rc.value.data = wi
|
|
||||||
rcData
|
|
||||||
|
|
||||||
# Update `staged` list and `unprocessed` ranges
|
|
||||||
block:
|
|
||||||
debug "Replacing dup item in staged list", peer,
|
|
||||||
range=($wi.blocks), discarded=($wj.blocks)
|
|
||||||
let rc = wi.blocks - wj.blocks
|
|
||||||
if rc.isOk:
|
|
||||||
discard qd.global.unprocessed.merge(rc.value)
|
|
||||||
|
|
||||||
if error != AllSmileOk:
|
|
||||||
return err(error)
|
|
||||||
ok()
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions, asynchroneous data network activity
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc fetchHeaders(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
wi: BlockItemRef;
|
|
||||||
): Future[bool]
|
|
||||||
{.async.} =
|
|
||||||
## Get the work item with the least interval and complete it. The function
|
|
||||||
## returns `true` if bodies were fetched and there were no inconsistencies.
|
|
||||||
if 0 < wi.hashes.len:
|
|
||||||
return true
|
|
||||||
|
|
||||||
let peer = qd.peer
|
|
||||||
var hdrReq: BlocksRequest
|
|
||||||
if wi.topHash.isNone:
|
|
||||||
hdrReq = BlocksRequest(
|
|
||||||
startBlock: HashOrNum(
|
|
||||||
isHash: false,
|
|
||||||
number: wi.blocks.minPt),
|
|
||||||
maxResults: wi.blocks.len.truncate(uint),
|
|
||||||
skip: 0,
|
|
||||||
reverse: false)
|
|
||||||
trace trEthSendSendingGetBlockHeaders, peer,
|
|
||||||
blocks=($wi.blocks)
|
|
||||||
|
|
||||||
else:
|
|
||||||
hdrReq = BlocksRequest(
|
|
||||||
startBlock: HashOrNum(
|
|
||||||
isHash: true,
|
|
||||||
hash: wi.topHash.get),
|
|
||||||
maxResults: maxHeadersFetch,
|
|
||||||
skip: 0,
|
|
||||||
reverse: true)
|
|
||||||
trace trEthSendSendingGetBlockHeaders & " reverse", peer,
|
|
||||||
topHash=hdrReq.startBlock.hash, reqLen=hdrReq.maxResults
|
|
||||||
|
|
||||||
# Fetch headers from peer
|
|
||||||
var hdrResp: Option[blockHeadersObj]
|
|
||||||
block:
|
|
||||||
let reqLen {.used.} = hdrReq.maxResults
|
|
||||||
qd.safeTransport("Error fetching block headers"):
|
|
||||||
hdrResp = await peer.getBlockHeaders(hdrReq)
|
|
||||||
# Beware of peer terminating the session
|
|
||||||
if qd.ctrl.stopped:
|
|
||||||
return false
|
|
||||||
|
|
||||||
if hdrResp.isNone:
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
|
|
||||||
return false
|
|
||||||
|
|
||||||
let hdrRespLen = hdrResp.get.headers.len
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
|
|
||||||
|
|
||||||
if hdrRespLen == 0:
|
|
||||||
qd.ctrl.stopped = true
|
|
||||||
return false
|
|
||||||
|
|
||||||
# Update block range for reverse search
|
|
||||||
if wi.topHash.isSome:
|
|
||||||
# Headers are in reversed order
|
|
||||||
wi.headers = hdrResp.get.headers.reversed
|
|
||||||
wi.blocks = BlockRange.new(
|
|
||||||
wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
|
|
||||||
discard qd.global.unprocessed.reduce(wi)
|
|
||||||
trace "Updated reverse header range", peer, range=($wi.blocks)
|
|
||||||
|
|
||||||
# Verify start block number
|
|
||||||
elif hdrResp.get.headers[0].blockNumber != wi.blocks.minPt:
|
|
||||||
trace "Header range starts with wrong block number", peer,
|
|
||||||
startBlock=hdrResp.get.headers[0].blockNumber,
|
|
||||||
requestedBlock=wi.blocks.minPt
|
|
||||||
qd.ctrl.zombie = true
|
|
||||||
return false
|
|
||||||
|
|
||||||
# Import into `wi.headers`
|
|
||||||
else:
|
|
||||||
wi.headers = system.move(hdrResp.get.headers)
|
|
||||||
|
|
||||||
# Calculate block header hashes and verify it against parent links. If
|
|
||||||
# necessary, cut off some offending block headers tail.
|
|
||||||
wi.hashes.setLen(wi.headers.len)
|
|
||||||
wi.hashes[0] = wi.headers[0].hash
|
|
||||||
for n in 1 ..< wi.headers.len:
|
|
||||||
if wi.headers[n-1].blockNumber + 1 != wi.headers[n].blockNumber:
|
|
||||||
trace "Non-consecutive block numbers in header list response", peer
|
|
||||||
qd.ctrl.zombie = true
|
|
||||||
return false
|
|
||||||
if wi.hashes[n-1] != wi.headers[n].parentHash:
|
|
||||||
# Oops, cul-de-sac after block chain re-org?
|
|
||||||
trace "Dangling parent link in header list response. Re-org?", peer
|
|
||||||
wi.headers.setLen(n)
|
|
||||||
wi.hashes.setLen(n)
|
|
||||||
break
|
|
||||||
wi.hashes[n] = wi.headers[n].hash
|
|
||||||
|
|
||||||
# Adjust range length if necessary
|
|
||||||
if wi.headers[^1].blockNumber < wi.blocks.maxPt:
|
|
||||||
let redRng = BlockRange.new(
|
|
||||||
wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
|
|
||||||
trace "Adjusting block range", peer, range=($wi.blocks), reduced=($redRng)
|
|
||||||
discard qd.global.unprocessed.merge(redRng.maxPt + 1, wi.blocks.maxPt)
|
|
||||||
wi.blocks = redRng
|
|
||||||
|
|
||||||
return true
|
|
||||||
|
|
||||||
|
|
||||||
proc fetchBodies(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
wi: BlockItemRef
|
|
||||||
): Future[bool]
|
|
||||||
{.async.} =
|
|
||||||
## Get the work item with the least interval and complete it. The function
|
|
||||||
## returns `true` if bodies were fetched and there were no inconsistencies.
|
|
||||||
let peer = qd.peer
|
|
||||||
|
|
||||||
# Complete group of bodies
|
|
||||||
qd.safeTransport("Error fetching block bodies"):
|
|
||||||
while wi.bodies.len < wi.hashes.len:
|
|
||||||
let
|
|
||||||
start = wi.bodies.len
|
|
||||||
reqLen = min(wi.hashes.len - wi.bodies.len, maxBodiesFetch)
|
|
||||||
top = start + reqLen
|
|
||||||
hashes = wi.hashes[start ..< top]
|
|
||||||
|
|
||||||
trace trEthSendSendingGetBlockBodies, peer, reqLen
|
|
||||||
|
|
||||||
# Append bodies from peer to `wi.bodies`
|
|
||||||
block:
|
|
||||||
let bdyResp = await peer.getBlockBodies(hashes)
|
|
||||||
# Beware of peer terminating the session
|
|
||||||
if qd.ctrl.stopped:
|
|
||||||
return false
|
|
||||||
|
|
||||||
if bdyResp.isNone:
|
|
||||||
trace trEthRecvReceivedBlockBodies, peer, reqLen, respose="n/a"
|
|
||||||
qd.ctrl.zombie = true
|
|
||||||
return false
|
|
||||||
|
|
||||||
let bdyRespLen = bdyResp.get.blocks.len
|
|
||||||
trace trEthRecvReceivedBlockBodies, peer, reqLen, bdyRespLen
|
|
||||||
|
|
||||||
if bdyRespLen == 0 or reqLen < bdyRespLen:
|
|
||||||
qd.ctrl.zombie = true
|
|
||||||
return false
|
|
||||||
|
|
||||||
wi.bodies.add bdyResp.get.blocks
|
|
||||||
|
|
||||||
return true
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions, constructor
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type BlockQueueCtxRef; ## Global data descriptor type
|
|
||||||
firstBlockNumber = 0.toBlockNumber; ## Of first block to fetch from network
|
|
||||||
): T =
|
|
||||||
## Global constructor, shared data
|
|
||||||
result = T(
|
|
||||||
unprocessed: BlockRangeSetRef.init())
|
|
||||||
result.staged.init()
|
|
||||||
result.topAccepted = max(firstBlockNumber,1.toBlockNumber) - 1
|
|
||||||
discard result.unprocessed.merge(result.topAccepted + 1, highBlockNumber)
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type BlockQueueWorkerRef; ## Global data descriptor type
|
|
||||||
ctx: BlockQueueCtxRef; ## Global data descriptor
|
|
||||||
ctrl: BuddyCtrlRef; ## Control and state settings
|
|
||||||
peer: Peer; ## For fetching data from network
|
|
||||||
): T =
|
|
||||||
## Buddy/local constructor
|
|
||||||
T(global: ctx,
|
|
||||||
peer: peer,
|
|
||||||
ctrl: ctrl)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions -- getter/setter
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc bestNumber*(qd: BlockQueueWorkerRef): Opt[BlockNumber] =
|
|
||||||
## Getter
|
|
||||||
qd.bestNumber
|
|
||||||
|
|
||||||
proc `bestNumber=`*(qd: BlockQueueWorkerRef; val: Opt[BlockNumber]) =
|
|
||||||
## Setter, needs to be set to something valid so that `blockQueueWorker()`
|
|
||||||
## does something useful.
|
|
||||||
qd.bestNumber = val
|
|
||||||
|
|
||||||
proc topAccepted*(qd: BlockQueueWorkerRef): BlockNumber =
|
|
||||||
## Getter
|
|
||||||
qd.global.topAccepted
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions -- synchronous
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc blockQueueFetchStaged*(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
): Result[BlockItemRef,BlockQueueRC]=
|
|
||||||
## Fetch the next item from the staged block queue. This item will be removed
|
|
||||||
## from the staged queue and must be recycled if it cannot be processed.
|
|
||||||
##
|
|
||||||
## On error, the function returns `EmptyQueue` if the queue was empty and
|
|
||||||
## `BlockNumberGap` if processing this item would result in a gap between the
|
|
||||||
## last accepted block number and the fitsr block number of the next queue
|
|
||||||
## item.
|
|
||||||
##
|
|
||||||
## This gap might appear if another function processes the in-beween block
|
|
||||||
## in paralell or if something went wrong, see `blockQueueGrout()`, below.
|
|
||||||
let rc = qd.global.staged.ge(low(BlockNumber))
|
|
||||||
if rc.isErr:
|
|
||||||
# No more items in the database
|
|
||||||
return err(EmptyQueue)
|
|
||||||
|
|
||||||
let
|
|
||||||
peer {.used.} = qd.peer
|
|
||||||
wi = rc.value.data
|
|
||||||
topAccepted = qd.global.topAccepted
|
|
||||||
startNumber = wi.headers[0].blockNumber
|
|
||||||
|
|
||||||
# Check whether this record of blocks can be stored, at all
|
|
||||||
if topAccepted + 1 < startNumber:
|
|
||||||
trace "Staged work item postponed", peer, topAccepted,
|
|
||||||
range=($wi.blocks), staged=qd.global.staged.len
|
|
||||||
return err(BlockNumberGap)
|
|
||||||
|
|
||||||
# Ok, store into the block chain database
|
|
||||||
trace "Staged work item", peer,
|
|
||||||
topAccepted, range=($wi.blocks)
|
|
||||||
|
|
||||||
# Remove from staged DB
|
|
||||||
discard qd.global.staged.delete(wi.blocks.minPt)
|
|
||||||
|
|
||||||
ok(wi)
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueAccept*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
|
|
||||||
## Mark this argument item `wi` to be the item with the topmost block number
|
|
||||||
## accepted. This statement comes tyipcally after the successful processing
|
|
||||||
## and storage of the work item fetched by `blockQueueFetchStaged()`.
|
|
||||||
qd.global.topAccepted = wi.blocks.maxPt
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueGrout*(qd: BlockQueueWorkerRef) =
|
|
||||||
## Fill the gap unprocessed and staged block numbers. If there is such a gap
|
|
||||||
## (which should not at all), the `blockQueueFetchStaged()` will always fail
|
|
||||||
## with a `true` error code because there is no next work item.
|
|
||||||
##
|
|
||||||
## To close the gap and avoid double processing, all other workers should
|
|
||||||
## have finished their tasks while this function is run. A way to achive that
|
|
||||||
## is to run this function in *pool mode* once.
|
|
||||||
# Mind the gap, fill in if necessary
|
|
||||||
let covered = min(
|
|
||||||
qd.global.nextUnprocessed.get(otherwise = highBlockNumber),
|
|
||||||
qd.global.nextStaged.get(otherwise = highBlockRange).minPt)
|
|
||||||
if qd.global.topAccepted + 1 < covered:
|
|
||||||
discard qd.global.unprocessed.merge(qd.global.topAccepted + 1, covered - 1)
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueRecycle*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
|
|
||||||
## Put back and destroy the `wi` argument item. The covered block range needs
|
|
||||||
## to be re-fetched from the network. This statement is typically used instead
|
|
||||||
## of `blockQueueAccept()` after a failure tpo process and store the work item
|
|
||||||
## fetched by `blockQueueFetchStaged()`.
|
|
||||||
discard qd.global.unprocessed.merge(wi.blocks)
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueRecycleStaged*(qd: BlockQueueWorkerRef) =
|
|
||||||
## Similar to `blockQueueRecycle()`, recycle all items from the staged queue.
|
|
||||||
# using fast traversal
|
|
||||||
let
|
|
||||||
walk = BlockItemWalkRef.init(qd.global.staged)
|
|
||||||
var
|
|
||||||
rc = walk.first()
|
|
||||||
while rc.isOk:
|
|
||||||
# Store back into `unprocessed` ranges set
|
|
||||||
discard qd.global.unprocessed.merge(rc.value.data)
|
|
||||||
rc = walk.next()
|
|
||||||
# optional clean up, see comments on the destroy() directive
|
|
||||||
walk.destroy()
|
|
||||||
qd.global.staged.clear()
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueBacktrackFrom*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
|
|
||||||
## Set backtrack mode starting with the blocks before the argument work
|
|
||||||
## item `wi`.
|
|
||||||
qd.global.backtrack = some(wi.headers[0].parentHash)
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueBacktrackOk*(qd: BlockQueueWorkerRef): bool =
|
|
||||||
## Returns `true` if the queue is in backtrack mode.
|
|
||||||
qd.global.backtrack.isSome
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueStats*(ctx: BlockQueueCtxRef; stats: var BlockQueueStats) =
|
|
||||||
## Collect global statistics
|
|
||||||
stats.topAccepted = ctx.topAccepted
|
|
||||||
stats.nextUnprocessed = ctx.nextUnprocessed
|
|
||||||
stats.nStagedQueue = ctx.staged.len
|
|
||||||
stats.reOrg = ctx.backtrack.isSome
|
|
||||||
stats.nextStaged =
|
|
||||||
if ctx.nextStaged.isSome: Opt.some(ctx.nextStaged.unsafeGet.minPt)
|
|
||||||
else: Opt.none(BlockNumber)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions -- asynchronous
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc blockQueueBacktrackWorker*(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
): Future[Result[void,BlockQueueRC]]
|
|
||||||
{.async.} =
|
|
||||||
## This function does some backtrack processing on the queue. Backtracking
|
|
||||||
## is single threaded due to the fact that the next block is identified by
|
|
||||||
## the hash of the parent header. So this function needs to run in *single
|
|
||||||
## mode*.
|
|
||||||
##
|
|
||||||
## If backtracking is enabled, this function fetches the next parent work
|
|
||||||
## item from the network and makes it available on the staged queue to be
|
|
||||||
## retrieved with `blockQueueFetchStaged()`. In that case, the function
|
|
||||||
## succeeds and `blockQueueBacktrackOk()` will return `false`.
|
|
||||||
##
|
|
||||||
## In all other cases, the function returns an error code.
|
|
||||||
var error = BacktrackDisabled
|
|
||||||
if qd.global.backtrack.isSome:
|
|
||||||
let
|
|
||||||
peer {.used.} = qd.peer
|
|
||||||
wi = BlockItemRef(
|
|
||||||
# This dummy interval can savely merged back without any effect
|
|
||||||
blocks: highBlockRange,
|
|
||||||
# Enable backtrack
|
|
||||||
topHash: some(qd.global.backtrack.unsafeGet))
|
|
||||||
|
|
||||||
# Fetch headers and bodies for the current work item
|
|
||||||
trace "Single mode worker, re-org backtracking", peer
|
|
||||||
if not await qd.fetchHeaders(wi):
|
|
||||||
error = FetchHeadersError
|
|
||||||
elif not await qd.fetchBodies(wi):
|
|
||||||
error = FetchBodiesError
|
|
||||||
else:
|
|
||||||
qd.global.backtrack = none(Hash256)
|
|
||||||
discard qd.stageItem(wi)
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
# This work item failed, nothing to do anymore.
|
|
||||||
discard qd.global.unprocessed.merge(wi)
|
|
||||||
|
|
||||||
return err(error)
|
|
||||||
|
|
||||||
|
|
||||||
proc blockQueueWorker*(
|
|
||||||
qd: BlockQueueWorkerRef;
|
|
||||||
): Future[Result[void,BlockQueueRC]]
|
|
||||||
{.async.} =
|
|
||||||
## Normal worker function used to stage another work item be retrieved by
|
|
||||||
## `blockQueueFetchStaged()`. This function may run in *multi mode*. Not
|
|
||||||
## until retrieving work items the queue will be synchronised in a way that
|
|
||||||
## after the next item can be retrieved the queue will be blocked by a
|
|
||||||
## *gap* until the item is commited by `blockQueueAccept()`.
|
|
||||||
##
|
|
||||||
## On error, with most error codes there is not much that can be done. The
|
|
||||||
## one remarcable error code is `StagedQueueOverflow` which pops up if there
|
|
||||||
## is a gap between unprocessed and staged block numbers. One of the actions
|
|
||||||
## to be considered here is to run `blockQueueGrout()` in *pool mode*.
|
|
||||||
## Otherwise, the `StagedQueueOverflow` can be treated as a success would be.
|
|
||||||
##
|
|
||||||
# Fetch work item
|
|
||||||
let wi = block:
|
|
||||||
let rc = qd.newWorkItem()
|
|
||||||
if rc.isErr:
|
|
||||||
# No way, end of capacity for this peer => re-calibrate
|
|
||||||
qd.bestNumber = Opt.none(BlockNumber)
|
|
||||||
return err(rc.error)
|
|
||||||
rc.value
|
|
||||||
|
|
||||||
# Fetch headers and bodies for the current work item
|
|
||||||
var error = AllSmileOk
|
|
||||||
if not await qd.fetchHeaders(wi):
|
|
||||||
error = FetchHeadersError
|
|
||||||
elif not await qd.fetchBodies(wi):
|
|
||||||
error = FetchBodiesError
|
|
||||||
else:
|
|
||||||
return qd.stageItem(wi)
|
|
||||||
|
|
||||||
# This work item failed
|
|
||||||
discard qd.global.unprocessed.merge(wi)
|
|
||||||
return err(error)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,62 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
||||||
# Licensed under either of
|
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
||||||
# http://opensource.org/licenses/MIT)
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[os, strutils],
|
|
||||||
chronicles,
|
|
||||||
eth/[common, p2p]
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "sync-ctrl"
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc getDataLine(
|
|
||||||
name: string;
|
|
||||||
lineNum: int;
|
|
||||||
): string {.gcsafe, raises: [IOError].} =
|
|
||||||
if name.fileExists:
|
|
||||||
let file = name.open
|
|
||||||
defer: file.close
|
|
||||||
let linesRead = file.readAll.splitLines
|
|
||||||
if lineNum < linesRead.len:
|
|
||||||
return linesRead[lineNum].strip
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc syncCtrlBlockNumberFromFile*(
|
|
||||||
fileName: Opt[string]; # Optional file name
|
|
||||||
lineNum = 0; # Read line from file
|
|
||||||
): Result[BlockNumber,void] =
|
|
||||||
## Returns a block number from the file name argument `fileName`. The first
|
|
||||||
## line of the file is parsed as a decimal encoded block number.
|
|
||||||
if fileName.isSome:
|
|
||||||
let file = fileName.get
|
|
||||||
try:
|
|
||||||
let data = file.getDataLine(lineNum)
|
|
||||||
if 0 < data.len:
|
|
||||||
let num = parse(data,UInt256)
|
|
||||||
return ok(num.toBlockNumber)
|
|
||||||
except CatchableError as e:
|
|
||||||
let
|
|
||||||
name {.used.} = $e.name
|
|
||||||
msg {.used.} = e.msg
|
|
||||||
debug "Exception while parsing block number", file, name, msg
|
|
||||||
err()
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,324 +0,0 @@
|
|||||||
# Nimbus - Fetch account and storage states from peers efficiently
|
|
||||||
#
|
|
||||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
||||||
# Licensed under either of
|
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
||||||
# http://opensource.org/licenses/MIT)
|
|
||||||
# at your option. This file may not be copied, modified, or distributed
|
|
||||||
# except according to those terms.
|
|
||||||
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[strformat, strutils],
|
|
||||||
chronos,
|
|
||||||
chronicles,
|
|
||||||
eth/[common, p2p],
|
|
||||||
stint,
|
|
||||||
../../utils/prettify,
|
|
||||||
../types,
|
|
||||||
./timer_helper
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "tick"
|
|
||||||
|
|
||||||
type
|
|
||||||
TickerSnapStatsUpdater* = proc: TickerSnapStats {.gcsafe, raises: [].}
|
|
||||||
## Snap sync state update function
|
|
||||||
|
|
||||||
TickerFullStatsUpdater* = proc: TickerFullStats {.gcsafe, raises: [].}
|
|
||||||
## Full sync state update function
|
|
||||||
|
|
||||||
SnapDescDetails = object
|
|
||||||
## Private state descriptor
|
|
||||||
snapCb: TickerSnapStatsUpdater
|
|
||||||
recovery: bool
|
|
||||||
lastRecov: bool
|
|
||||||
lastStats: TickerSnapStats
|
|
||||||
|
|
||||||
FullDescDetails = object
|
|
||||||
## Private state descriptor
|
|
||||||
fullCb: TickerFullStatsUpdater
|
|
||||||
lastStats: TickerFullStats
|
|
||||||
|
|
||||||
TickerSnapStats* = object
|
|
||||||
## Snap sync state (see `TickerSnapStatsUpdater`)
|
|
||||||
beaconBlock*: Opt[BlockNumber]
|
|
||||||
pivotBlock*: Opt[BlockNumber]
|
|
||||||
nAccounts*: (float,float) ## Mean and standard deviation
|
|
||||||
accountsFill*: (float,float,float) ## Mean, standard deviation, merged total
|
|
||||||
nAccountStats*: int ## #chunks
|
|
||||||
nSlotLists*: (float,float) ## Mean and standard deviation
|
|
||||||
nContracts*: (float,float) ## Mean and standard deviation
|
|
||||||
nStorageQueue*: Opt[int]
|
|
||||||
nContractQueue*: Opt[int]
|
|
||||||
nQueues*: int
|
|
||||||
|
|
||||||
TickerFullStats* = object
|
|
||||||
## Full sync state (see `TickerFullStatsUpdater`)
|
|
||||||
pivotBlock*: Opt[BlockNumber]
|
|
||||||
topPersistent*: BlockNumber
|
|
||||||
nextUnprocessed*: Opt[BlockNumber]
|
|
||||||
nextStaged*: Opt[BlockNumber]
|
|
||||||
nStagedQueue*: int
|
|
||||||
suspended*: bool
|
|
||||||
reOrg*: bool
|
|
||||||
|
|
||||||
TickerRef* = ref object
|
|
||||||
## Ticker descriptor object
|
|
||||||
nBuddies: int
|
|
||||||
logTicker: TimerCallback
|
|
||||||
started: Moment
|
|
||||||
visited: Moment
|
|
||||||
prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].}
|
|
||||||
case fullMode: bool
|
|
||||||
of false:
|
|
||||||
snap: SnapDescDetails
|
|
||||||
of true:
|
|
||||||
full: FullDescDetails
|
|
||||||
|
|
||||||
const
|
|
||||||
extraTraceMessages = false # or true
|
|
||||||
## Enabled additional logging noise
|
|
||||||
|
|
||||||
tickerStartDelay = chronos.milliseconds(100)
|
|
||||||
tickerLogInterval = chronos.seconds(1)
|
|
||||||
tickerLogSuppressMax = chronos.seconds(100)
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions: pretty printing
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc pc99(val: float): string =
|
|
||||||
if 0.99 <= val and val < 1.0: "99%"
|
|
||||||
elif 0.0 < val and val <= 0.01: "1%"
|
|
||||||
else: val.toPC(0)
|
|
||||||
|
|
||||||
proc toStr(a: Opt[int]): string =
|
|
||||||
if a.isNone: "n/a"
|
|
||||||
else: $a.unsafeGet
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions: printing ticker messages
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
when false:
|
|
||||||
template logTxt(info: static[string]): static[string] =
|
|
||||||
"Ticker " & info
|
|
||||||
|
|
||||||
template noFmtError(info: static[string]; code: untyped) =
|
|
||||||
try:
|
|
||||||
code
|
|
||||||
except ValueError as e:
|
|
||||||
raiseAssert "Inconveivable (" & info & "): " & e.msg
|
|
||||||
|
|
||||||
proc snapTicker(t: TickerRef) {.gcsafe.} =
|
|
||||||
let
|
|
||||||
data = t.snap.snapCb()
|
|
||||||
now = Moment.now()
|
|
||||||
|
|
||||||
if data != t.snap.lastStats or
|
|
||||||
t.snap.recovery != t.snap.lastRecov or
|
|
||||||
tickerLogSuppressMax < (now - t.visited):
|
|
||||||
var
|
|
||||||
nAcc, nSto, nCon: string
|
|
||||||
pv = "n/a"
|
|
||||||
let
|
|
||||||
nStoQ = data.nStorageQueue.toStr
|
|
||||||
nConQ = data.nContractQueue.toStr
|
|
||||||
bc = data.beaconBlock.toStr
|
|
||||||
recoveryDone = t.snap.lastRecov
|
|
||||||
accCov = data.accountsFill[0].pc99 &
|
|
||||||
"(" & data.accountsFill[1].pc99 & ")" &
|
|
||||||
"/" & data.accountsFill[2].pc99 &
|
|
||||||
"~" & data.nAccountStats.uint.toSI
|
|
||||||
nInst = t.nBuddies
|
|
||||||
|
|
||||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
|
||||||
up = (now - t.started).seconds.uint64.toSI
|
|
||||||
mem = getTotalMem().uint.toSI
|
|
||||||
|
|
||||||
t.snap.lastStats = data
|
|
||||||
t.visited = now
|
|
||||||
t.snap.lastRecov = t.snap.recovery
|
|
||||||
|
|
||||||
if data.pivotBlock.isSome:
|
|
||||||
pv = data.pivotBlock.toStr & "/" & $data.nQueues
|
|
||||||
|
|
||||||
noFmtError("runLogTicker"):
|
|
||||||
nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
|
|
||||||
&"({(data.nAccounts[1]+0.5).int64})")
|
|
||||||
nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
|
|
||||||
&"({(data.nSlotLists[1]+0.5).int64})")
|
|
||||||
nCon = (&"{(data.nContracts[0]+0.5).int64}" &
|
|
||||||
&"({(data.nContracts[1]+0.5).int64})")
|
|
||||||
|
|
||||||
if t.snap.recovery:
|
|
||||||
info "Snap sync ticker (recovery)",
|
|
||||||
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQ, nCon, nConQ, mem
|
|
||||||
elif recoveryDone:
|
|
||||||
info "Snap sync ticker (recovery done)",
|
|
||||||
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQ, nCon, nConQ, mem
|
|
||||||
else:
|
|
||||||
info "Snap sync ticker",
|
|
||||||
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQ, nCon, nConQ, mem
|
|
||||||
|
|
||||||
|
|
||||||
proc fullTicker(t: TickerRef) {.gcsafe.} =
|
|
||||||
let
|
|
||||||
data = t.full.fullCb()
|
|
||||||
now = Moment.now()
|
|
||||||
|
|
||||||
if data != t.full.lastStats or
|
|
||||||
tickerLogSuppressMax < (now - t.visited):
|
|
||||||
let
|
|
||||||
persistent = data.topPersistent.toStr
|
|
||||||
staged = data.nextStaged.toStr
|
|
||||||
unprocessed = data.nextUnprocessed.toStr
|
|
||||||
queued = data.nStagedQueue
|
|
||||||
reOrg = if data.reOrg: "t" else: "f"
|
|
||||||
pv = data.pivotBlock.toStr
|
|
||||||
|
|
||||||
nInst = t.nBuddies
|
|
||||||
|
|
||||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
|
||||||
up = (now - t.started).seconds.uint64.toSI
|
|
||||||
mem = getTotalMem().uint.toSI
|
|
||||||
|
|
||||||
t.full.lastStats = data
|
|
||||||
t.visited = now
|
|
||||||
|
|
||||||
if data.suspended:
|
|
||||||
info "Full sync ticker (suspended)", up, nInst, pv,
|
|
||||||
persistent, staged, unprocessed, queued, reOrg, mem
|
|
||||||
else:
|
|
||||||
info "Full sync ticker", up, nInst, pv,
|
|
||||||
persistent, staged, unprocessed, queued, reOrg, mem
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Private functions: ticking log messages
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
|
|
||||||
|
|
||||||
proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
|
||||||
t.prettyPrint(t)
|
|
||||||
t.setLogTicker(Moment.fromNow(tickerLogInterval))
|
|
||||||
|
|
||||||
proc setLogTicker(t: TickerRef; at: Moment) =
|
|
||||||
if t.logTicker.isNil:
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "was stopped", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
else:
|
|
||||||
t.logTicker = safeSetTimer(at, runLogTicker, t)
|
|
||||||
|
|
||||||
proc initImpl(t: TickerRef; cb: TickerSnapStatsUpdater) =
|
|
||||||
t.fullMode = false
|
|
||||||
t.prettyPrint = snapTicker
|
|
||||||
t.snap = SnapDescDetails(snapCb: cb)
|
|
||||||
|
|
||||||
proc initImpl(t: TickerRef; cb: TickerFullStatsUpdater) =
|
|
||||||
t.fullMode = true
|
|
||||||
t.prettyPrint = fullTicker
|
|
||||||
t.full = FullDescDetails(fullCb: cb)
|
|
||||||
|
|
||||||
proc startImpl(t: TickerRef) =
|
|
||||||
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay),runLogTicker,t)
|
|
||||||
if t.started == Moment.default:
|
|
||||||
t.started = Moment.now()
|
|
||||||
|
|
||||||
proc stopImpl(t: TickerRef) =
|
|
||||||
## Stop ticker unconditionally
|
|
||||||
t.logTicker = nil
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public constructor and start/stop functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type TickerRef;
|
|
||||||
cb: TickerSnapStatsUpdater|TickerFullStatsUpdater;
|
|
||||||
): T =
|
|
||||||
## Constructor
|
|
||||||
new result
|
|
||||||
result.initImpl(cb)
|
|
||||||
|
|
||||||
proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
|
|
||||||
## Re-initialise ticket
|
|
||||||
if not t.isNil:
|
|
||||||
t.visited.reset
|
|
||||||
if t.fullMode:
|
|
||||||
t.prettyPrint(t) # print final state for full sync
|
|
||||||
t.initImpl(cb)
|
|
||||||
|
|
||||||
proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
|
|
||||||
## Re-initialise ticket
|
|
||||||
if not t.isNil:
|
|
||||||
t.visited.reset
|
|
||||||
if not t.fullMode:
|
|
||||||
t.prettyPrint(t) # print final state for snap sync
|
|
||||||
t.initImpl(cb)
|
|
||||||
|
|
||||||
proc start*(t: TickerRef) =
|
|
||||||
## Re/start ticker unconditionally
|
|
||||||
if not t.isNil:
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "start", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
t.startImpl()
|
|
||||||
|
|
||||||
proc stop*(t: TickerRef) =
|
|
||||||
## Stop ticker unconditionally
|
|
||||||
if not t.isNil:
|
|
||||||
t.stopImpl()
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "stop", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc startBuddy*(t: TickerRef) =
|
|
||||||
## Increment buddies counter and start ticker unless running.
|
|
||||||
if not t.isNil:
|
|
||||||
if t.nBuddies <= 0:
|
|
||||||
t.nBuddies = 1
|
|
||||||
if t.fullMode or not t.snap.recovery:
|
|
||||||
t.startImpl()
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "start buddy", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
else:
|
|
||||||
t.nBuddies.inc
|
|
||||||
|
|
||||||
proc startRecovery*(t: TickerRef) =
|
|
||||||
## Ditto for recovery mode
|
|
||||||
if not t.isNil and not t.fullMode and not t.snap.recovery:
|
|
||||||
t.snap.recovery = true
|
|
||||||
if t.nBuddies <= 0:
|
|
||||||
t.startImpl()
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "start recovery", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
proc stopBuddy*(t: TickerRef) =
|
|
||||||
## Decrement buddies counter and stop ticker if there are no more registered
|
|
||||||
## buddies.
|
|
||||||
if not t.isNil:
|
|
||||||
t.nBuddies.dec
|
|
||||||
if t.nBuddies <= 0 and not t.fullMode and not t.snap.recovery:
|
|
||||||
t.nBuddies = 0
|
|
||||||
t.stopImpl()
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "stop (buddy)", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
proc stopRecovery*(t: TickerRef) =
|
|
||||||
## Ditto for recovery mode
|
|
||||||
if not t.isNil and not t.fullMode and t.snap.recovery:
|
|
||||||
t.snap.recovery = false
|
|
||||||
t.stopImpl()
|
|
||||||
when extraTraceMessages:
|
|
||||||
debug logTxt "stop (recovery)", fullMode=t.fullMode, nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# End
|
|
||||||
# ------------------------------------------------------------------------------
|
|
@ -1,45 +0,0 @@
|
|||||||
# Nimbus
|
|
||||||
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
|
||||||
# Licensed under either of
|
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
||||||
# http://opensource.org/licenses/MIT)
|
|
||||||
# at your option. This file may not be copied, modified, or
|
|
||||||
# distributed except according to those terms.
|
|
||||||
|
|
||||||
import
|
|
||||||
chronos
|
|
||||||
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
# Use `safeSetTimer` consistently, with a `ref T` argument if including one.
|
|
||||||
type
|
|
||||||
SafeCallbackFunc*[T] = proc (objectRef: ref T) {.gcsafe, raises: [].}
|
|
||||||
SafeCallbackFuncVoid* = proc () {.gcsafe, raises: [].}
|
|
||||||
|
|
||||||
proc safeSetTimer*[T](at: Moment, cb: SafeCallbackFunc[T],
|
|
||||||
objectRef: ref T = nil): TimerCallback =
|
|
||||||
## Like `setTimer` but takes a typed `ref T` argument, which is passed to the
|
|
||||||
## callback function correctly typed. Stores the `ref` in a closure to avoid
|
|
||||||
## garbage collection memory corruption issues that occur when the `setTimer`
|
|
||||||
## pointer argument is used.
|
|
||||||
proc chronosTimerSafeCb(udata: pointer) = cb(objectRef)
|
|
||||||
return setTimer(at, chronosTimerSafeCb)
|
|
||||||
|
|
||||||
proc safeSetTimer*[T](at: Moment, cb: SafeCallbackFuncVoid): TimerCallback =
|
|
||||||
## Like `setTimer` but takes no pointer argument. The callback function
|
|
||||||
## takes no arguments.
|
|
||||||
proc chronosTimerSafeCb(udata: pointer) = cb()
|
|
||||||
return setTimer(at, chronosTimerSafeCb)
|
|
||||||
|
|
||||||
proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer): TimerCallback
|
|
||||||
{.error: "Do not use setTimer with a `pointer` type argument".}
|
|
||||||
## `setTimer` with a non-nil pointer argument is dangerous because
|
|
||||||
## the pointed-to object is often freed or garbage collected before the
|
|
||||||
## timer callback runs. Call `setTimer` with a `ref` argument instead.
|
|
||||||
|
|
||||||
proc setTimer*(at: Moment, cb: CallbackFunc): TimerCallback =
|
|
||||||
chronos.setTimer(at, cb, nil)
|
|
||||||
|
|
||||||
# End
|
|
Loading…
x
Reference in New Issue
Block a user