Sync update to work with sepolia reorgs (#1168)
* Error return in `persistBlocks()` on initial `VmState` problem
  why: previously threw an exception
* Updated sync mode option
  why: using enum rather than bool => space for more
* Added sync mode `full`, re-factored legacy sync
  also: rebased
* Fix typo (crashes `persistBlocks()` otherwise)
  also: rebase to master
* Reduce log ticker noise by suppressing duplicate messages
* Clarify staged queue overflow handling
  why: backtrack/re-org mode in `stageItem()` should be detected via both the
  global indicator and the work item into which it might have moved.
  also: rebased
parent aa945f1ed9
commit 5d98f68c09
@@ -131,6 +131,11 @@ type
    V4
    V5

  SyncMode* {.pure.} = enum
    Default
    Full          ## Beware, experimental
    Snap          ## Beware, experimental

  NimbusConf* = object of RootObj
    ## Main Nimbus configuration object

@@ -156,10 +161,16 @@ type
      abbr : "p"
      name: "prune-mode" }: PruneMode

    snapSync* {.
      desc: "Enable experimental new sync algorithms"
      defaultValue: false
      name: "snap-sync" .}: bool
    syncMode* {.
      desc: "Specify particular blockchain sync mode."
      longDesc:
        "- default -- legacy sync mode\n" &
        "- full -- full blockchain archive\n" &
        "- snap -- experimental snap mode (development only)\n"
      defaultValue: SyncMode.Default
      defaultValueDesc: $SyncMode.Default
      abbr: "y"
      name: "sync-mode" .}: SyncMode

    importKey* {.
      desc: "Import unencrypted 32 bytes hex private key from a file"
@@ -28,7 +28,7 @@ import
  ./graphql/ethapi,
  ./p2p/[chain, clique/clique_desc, clique/clique_sealer],
  ./rpc/[common, debug, engine_api, jwt_auth, p2p, cors],
  ./sync/[fast, protocol, snap],
  ./sync/[fast, full, protocol, snap],
  ./utils/tx_pool

when defined(evmc_enabled):

@@ -122,8 +122,12 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
  # Add protocol capabilities based on protocol flags
  if ProtocolFlag.Eth in protocols:
    nimbus.ethNode.addCapability protocol.eth
    if conf.snapSync:
    case conf.syncMode:
    of SyncMode.Snap:
      nimbus.ethNode.addCapability protocol.snap
    of SyncMode.Full, SyncMode.Default:
      discard

  if ProtocolFlag.Les in protocols:
    nimbus.ethNode.addCapability les

@@ -136,8 +140,16 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
    nimbus.chainRef.verifyFrom = verifyFrom

  # Early-initialise "--snap-sync" before starting any network connections.
  if ProtocolFlag.Eth in protocols and conf.snapSync:
    SnapSyncCtx.new(nimbus.ethNode, conf.maxPeers).start
  if ProtocolFlag.Eth in protocols:
    let tickerOK =
      conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
    case conf.syncMode:
    of SyncMode.Full:
      FullSyncRef.init(nimbus.ethNode, conf.maxPeers, tickerOK).start
    of SyncMode.Snap:
      SnapSyncRef.init(nimbus.ethNode, conf.maxPeers).start
    of SyncMode.Default:
      discard

  # Connect directly to the static nodes
  let staticPeers = conf.getStaticPeers()

@@ -146,9 +158,15 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,

  # Start Eth node
  if conf.maxPeers > 0:
    var waitForPeers = true
    case conf.syncMode:
    of SyncMode.Snap:
      waitForPeers = false
    of SyncMode.Full, SyncMode.Default:
      discard
    nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
      enableDiscovery = conf.discovery != DiscoveryType.None,
      waitForPeers = not conf.snapSync)
      waitForPeers = waitForPeers)

proc localServices(nimbus: NimbusNode, conf: NimbusConf,
                   chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =

@@ -361,8 +379,11 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
  localServices(nimbus, conf, chainDB, protocols)

  if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
    if not conf.snapSync:
    case conf.syncMode:
    of SyncMode.Default:
      FastSyncCtx.new(nimbus.ethNode).start
    of SyncMode.Full, SyncMode.Snap:
      discard

  if nimbus.state == Starting:
    # it might have been set to "Stopping" with Ctrl+C
@@ -50,15 +50,20 @@ proc persistBlocksImpl(c: Chain; headers: openArray[BlockHeader];
  let transaction = c.db.db.beginTransaction()
  defer: transaction.dispose()

  trace "Persisting blocks",
    fromBlock = headers[0].blockNumber,
    toBlock = headers[^1].blockNumber

  var cliqueState = c.clique.cliqueSave
  defer: c.clique.cliqueRestore(cliqueState)

  # Note that `0 < headers.len`, assured when called from `persistBlocks()`
  var vmState = BaseVMState.new(headers[0], c.db)
  let vmState = BaseVMState()
  if not vmState.init(headers[0], c.db):
    debug "Cannot initialise VmState",
      fromBlock = headers[0].blockNumber,
      toBlock = headers[^1].blockNumber
    return ValidationResult.Error

  trace "Persisting blocks",
    fromBlock = headers[0].blockNumber,
    toBlock = headers[^1].blockNumber

  for i in 0 ..< headers.len:
    let

@@ -72,7 +77,6 @@ proc persistBlocksImpl(c: Chain; headers: openArray[BlockHeader];

    let
      validationResult = vmState.processBlock(c.clique, header, body)

    when not defined(release):
      if validationResult == ValidationResult.Error and
         body.transactions.calcTxRoot == header.txRoot:
@@ -117,11 +117,14 @@ proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult
    result = ValidationResult.Error
  except Defect as e:
    # Pass through
    raise (ref Defect)(msg: e.msg)
    raise e
  except Exception as e:
    # Notorious case where the `Chain` reference applied to `persistBlocks()`
    # makes the compiler trace a possible `Exception` (i.e. `ctx.chain` could
    # be uninitialised.)
    error "exception while storing persistent blocks",
      error = $e.name, msg = e.msg
    result = ValidationResult.Error
    raise (ref Defect)(msg: $e.name & ": " & e.msg)
  case result
  of ValidationResult.OK:
    ctx.finalizedBlock = wi.endIndex

@@ -430,20 +433,6 @@ proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
  trace "peer disconnected ", peer = p
  ctx.trustedPeers.excl(p)

proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
  var
    bestBlockDifficulty: DifficultyInt = 0.stuint(256)
    bestPeer: Peer = nil

  for peer in node.peers(eth):
    let peerEthState = peer.state(eth)
    if peerEthState.initialized:
      if peerEthState.bestDifficulty > bestBlockDifficulty:
        bestBlockDifficulty = peerEthState.bestDifficulty
        bestPeer = peer

  result = (bestPeer, bestBlockDifficulty)

proc new*(T: type FastSyncCtx; ethNode: EthereumNode): T
    {.gcsafe, raises:[Defect,CatchableError].} =
  FastSyncCtx(
@@ -0,0 +1,218 @@
|
|||
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
|
||||
#
|
||||
# Copyright (c) 2021 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
import
|
||||
std/hashes,
|
||||
chronicles,
|
||||
chronos,
|
||||
eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types],
|
||||
stew/keyed_queue,
|
||||
./protocol,
|
||||
./full/[full_desc, worker]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "full-sync"
|
||||
|
||||
type
|
||||
ActiveBuddies = ##\
|
||||
## List of active workers
|
||||
KeyedQueue[Peer,BuddyRef]
|
||||
|
||||
FullSyncRef* = ref object of CtxRef
|
||||
pool: PeerPool ## for starting the system
|
||||
buddies: ActiveBuddies ## LRU cache with worker descriptors
|
||||
tickerOk: bool ## Ticker logger
|
||||
singleRunLock: bool ## For worker initialisation
|
||||
monitorLock: bool ## For worker monitor
|
||||
activeMulti: int ## Activated runners
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc nsCtx(buddy: BuddyRef): FullSyncRef =
|
||||
buddy.ctx.FullSyncRef
|
||||
|
||||
proc hash(peer: Peer): Hash =
|
||||
## Needed for `buddies` table key comparison
|
||||
hash(peer.remote.id)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc workerLoop(buddy: BuddyRef) {.async.} =
|
||||
let
|
||||
ctx = buddy.nsCtx
|
||||
peer = buddy.peer
|
||||
trace "Starting peer worker", peer,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
|
||||
# Continue until stopped
|
||||
while not buddy.ctrl.stopped:
|
||||
if ctx.monitorLock:
|
||||
await sleepAsync(500.milliseconds)
|
||||
continue
|
||||
|
||||
# Rotate connection table so the most used entry is at the top/right
|
||||
# end. So zombies will implicitly be pushed left.
|
||||
discard ctx.buddies.lruFetch(peer)
|
||||
|
||||
# Invoke `runPool()` over all buddies if requested
|
||||
if ctx.poolMode:
|
||||
# Grab `monitorLock` (was `false` as checked above) and wait until clear
|
||||
# to run as the only activated instance.
|
||||
ctx.monitorLock = true
|
||||
while 0 < ctx.activeMulti:
|
||||
await sleepAsync(500.milliseconds)
|
||||
while ctx.singleRunLock:
|
||||
await sleepAsync(500.milliseconds)
|
||||
trace "Starting pool mode for repair & recovery"
|
||||
for w in ctx.buddies.nextValues:
|
||||
buddy.runPool()
|
||||
trace "Pool mode done"
|
||||
ctx.monitorLock = false
|
||||
continue
|
||||
|
||||
await sleepAsync(50.milliseconds)
|
||||
|
||||
# Multi mode
|
||||
if buddy.ctrl.multiOk:
|
||||
if not ctx.singleRunLock:
|
||||
ctx.activeMulti.inc
|
||||
# Continue doing something, work a bit
|
||||
await buddy.runMulti()
|
||||
ctx.activeMulti.dec
|
||||
continue
|
||||
|
||||
# Single mode as requested. The `multiOk` flag for this worker was just
|
||||
# found `false` in the previous clause.
|
||||
if not ctx.singleRunLock:
|
||||
# Lock single instance mode and wait for other workers to finish
|
||||
ctx.singleRunLock = true
|
||||
while 0 < ctx.activeMulti:
|
||||
await sleepAsync(500.milliseconds)
|
||||
# Run single instance and release afterwards
|
||||
await buddy.runSingle()
|
||||
ctx.singleRunLock = false
|
||||
|
||||
# End while
|
||||
|
||||
buddy.stop()
|
||||
|
||||
trace "Peer worker done", peer, ctrlState=buddy.ctrl.state,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
|
||||
|
||||
proc onPeerConnected(ctx: FullSyncRef; peer: Peer) =
|
||||
# Check for known entry (which should not exist.)
|
||||
if ctx.buddies.hasKey(peer):
|
||||
trace "Reconnecting zombie peer rejected", peer,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
return
|
||||
|
||||
# Initialise worker for this peer
|
||||
let buddy = BuddyRef(ctx: ctx, peer: peer)
|
||||
if not buddy.start():
|
||||
trace "Ignoring useless peer", peer,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
buddy.ctrl.zombie = true
|
||||
return
|
||||
|
||||
# Check for table overflow. An overflow might happen if there are zombies
|
||||
# in the table (though preventing them from re-connecting for a while.)
|
||||
if ctx.buddiesMax <= ctx.buddies.len:
|
||||
let leastPeer = ctx.buddies.shift.value.data
|
||||
if leastPeer.ctrl.zombie:
|
||||
trace "Dequeuing zombie peer", leastPeer,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
discard
|
||||
else:
|
||||
# This could happen if there are idle entries in the table, i.e.
|
||||
# somehow hanging runners.
|
||||
trace "Peer table full! Dequeuing least used entry", leastPeer,
|
||||
peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax
|
||||
leastPeer.stop()
|
||||
leastPeer.ctrl.zombie = true
|
||||
|
||||
# Add peer entry
|
||||
discard ctx.buddies.lruAppend(peer, buddy, ctx.buddiesMax)
|
||||
|
||||
# Run worker
|
||||
asyncSpawn buddy.workerLoop()
|
||||
|
||||
|
||||
proc onPeerDisconnected(ctx: FullSyncRef, peer: Peer) =
|
||||
let
|
||||
peers = ctx.pool.len
|
||||
maxWorkers = ctx.buddiesMax
|
||||
rc = ctx.buddies.eq(peer)
|
||||
if rc.isErr:
|
||||
debug "Disconnected from unregistered peer", peer, peers,
|
||||
workers=ctx.buddies.len, maxWorkers
|
||||
return
|
||||
if rc.value.ctrl.zombie:
|
||||
# Don't disconnect, let them fall out of the LRU cache. The effect is
# that reconnecting might be blocked for a while.
|
||||
trace "Disconnected zombie", peer, peers,
|
||||
workers=ctx.buddies.len, maxWorkers
|
||||
else:
|
||||
rc.value.ctrl.stopped = true # in case it is hanging somewhere
|
||||
ctx.buddies.del(peer)
|
||||
trace "Disconnected buddy", peer, peers,
|
||||
workers=ctx.buddies.len, maxWorkers
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc init*(
|
||||
T: type FullSyncRef;
|
||||
ethNode: EthereumNode;
|
||||
maxPeers: int;
|
||||
enableTicker: bool): T =
|
||||
## Constructor
|
||||
# Leave one extra slot so that it can hold a *zombie* even if all slots
|
||||
# are full. The effect is that a re-connect on the latest zombie will be
|
||||
# rejected as long as its worker descriptor is registered.
|
||||
let lruSize = max(1,maxPeers+1)
|
||||
result = T(
|
||||
buddiesMax: lruSize,
|
||||
chain: ethNode.chain,
|
||||
pool: ethNode.peerPool,
|
||||
tickerOk: enableTicker)
|
||||
result.buddies.init(lruSize)
|
||||
|
||||
proc start*(ctx: FullSyncRef) =
|
||||
## Set up syncing. This call should come early.
|
||||
var po = PeerObserver(
|
||||
onPeerConnected:
|
||||
proc(p: Peer) {.gcsafe.} =
|
||||
ctx.onPeerConnected(p),
|
||||
onPeerDisconnected:
|
||||
proc(p: Peer) {.gcsafe.} =
|
||||
ctx.onPeerDisconnected(p))
|
||||
|
||||
# Initialise sub-systems
|
||||
doAssert ctx.workerSetup(ctx.tickerOk)
|
||||
po.setProtocol eth
|
||||
ctx.pool.addObserver(ctx, po)
|
||||
|
||||
proc stop*(ctx: FullSyncRef) =
|
||||
## Stop syncing
|
||||
ctx.pool.delObserver(ctx)
|
||||
ctx.workerRelease()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@@ -0,0 +1,130 @@
|
|||
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
|
||||
#
|
||||
# Copyright (c) 2021 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
import
|
||||
eth/[common/eth_types, p2p]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
type
|
||||
BuddyRunState = enum
|
||||
## Combined state of two boolean values (`stopped`,`stopThisState`) as used
|
||||
## in the original source set up (should be double checked and simplified.)
|
||||
Running = 0 ## running, default state
|
||||
Stopped ## stopped or about to stop
|
||||
ZombieStop ## abandon/ignore (LRU tab overflow, odd packets)
|
||||
ZombieRun ## extra zombie state to potentially recover from
|
||||
|
||||
BuddyCtrl* = object
|
||||
## Control and state settings
|
||||
runState: BuddyRunState ## Access with getters
|
||||
multiPeer: bool ## Triggers `runSingle()` mode unless `true`
|
||||
|
||||
BuddyDataRef* = ref object of RootObj
|
||||
## Stub object, to be inherited in file `worker.nim`
|
||||
|
||||
BuddyRef* = ref object
|
||||
## Non-inheritable peer state tracking descriptor.
|
||||
ctx*: CtxRef ## Shared data back reference
|
||||
peer*: Peer ## Reference to eth p2pProtocol entry
|
||||
ctrl*: BuddyCtrl ## Control and state settings
|
||||
data*: BuddyDataRef ## Opaque object reference for sub-module
|
||||
|
||||
# -----
|
||||
|
||||
CtxDataRef* = ref object of RootObj
|
||||
## Stub object, to be inherited in file `worker.nim`
|
||||
|
||||
CtxRef* = ref object of RootObj
|
||||
## Shared state among all syncing peer workers (aka buddies.) This object
|
||||
## will be amended/inherited by the main module which controls the peer workers.
|
||||
buddiesMax*: int ## Max number of buddies (for LRU cache, read only)
|
||||
chain*: AbstractChainDB ## Block chain database (read only reference)
|
||||
poolMode*: bool ## Activate `runPool()` workers if set `true`
|
||||
data*: CtxDataRef ## Opaque object reference for sub-module
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc `$`*(buddy: BuddyRef): string =
|
||||
$buddy.peer & "$" & $buddy.ctrl.runState
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public getters, `BuddyRunState` execution control functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc multiOk*(ctrl: BuddyCtrl): bool =
|
||||
## Getter
|
||||
ctrl.multiPeer
|
||||
|
||||
proc state*(ctrl: BuddyCtrl): BuddyRunState =
|
||||
## Getter (logging only, details of `BuddyCtrl` are private)
|
||||
ctrl.runState
|
||||
|
||||
proc running*(ctrl: BuddyCtrl): bool =
|
||||
## Getter, `true` if `ctrl.state()` is `Running`
|
||||
ctrl.runState == Running
|
||||
|
||||
proc stopped*(ctrl: BuddyCtrl): bool =
|
||||
## Getter, `true` if `ctrl.state()` is not `Running`
|
||||
ctrl.runState in {Stopped, ZombieStop, ZombieRun}
|
||||
|
||||
proc zombie*(ctrl: BuddyCtrl): bool =
|
||||
## Getter, `true` if `ctrl.state()` is one of the zombie states
## (`ZombieStop` or `ZombieRun`)
|
||||
ctrl.runState in {ZombieStop, ZombieRun}
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public setters, `BuddyRunState` execution control functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc `multiOk=`*(ctrl: var BuddyCtrl; val: bool) =
|
||||
## Setter
|
||||
ctrl.multiPeer = val
|
||||
|
||||
proc `zombie=`*(ctrl: var BuddyCtrl; value: bool) =
|
||||
## Setter
|
||||
if value:
|
||||
case ctrl.runState:
|
||||
of Running:
|
||||
ctrl.runState = ZombieRun
|
||||
of Stopped:
|
||||
ctrl.runState = ZombieStop
|
||||
else:
|
||||
discard
|
||||
else:
|
||||
case ctrl.runState:
|
||||
of ZombieRun:
|
||||
ctrl.runState = Running
|
||||
of ZombieStop:
|
||||
ctrl.runState = Stopped
|
||||
else:
|
||||
discard
|
||||
|
||||
proc `stopped=`*(ctrl: var BuddyCtrl; value: bool) =
|
||||
## Setter
|
||||
if value:
|
||||
case ctrl.runState:
|
||||
of Running:
|
||||
ctrl.runState = Stopped
|
||||
else:
|
||||
discard
|
||||
else:
|
||||
case ctrl.runState:
|
||||
of Stopped:
|
||||
ctrl.runState = Running
|
||||
else:
|
||||
discard
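A small standalone usage sketch (illustrative only, not part of the module) of how the setters above drive the `BuddyRunState` transitions:

var ctrl: BuddyCtrl                       # default state is Running
doAssert ctrl.running
ctrl.stopped = true                       # Running -> Stopped
doAssert ctrl.stopped and not ctrl.zombie
ctrl.zombie = true                        # Stopped -> ZombieStop
doAssert ctrl.zombie and ctrl.stopped
ctrl.zombie = false                       # ZombieStop -> Stopped
doAssert ctrl.stopped and not ctrl.zombie
ctrl.stopped = false                      # Stopped -> Running
doAssert ctrl.running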
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@@ -0,0 +1,128 @@
|
|||
# Nimbus - Fetch account and storage states from peers efficiently
|
||||
#
|
||||
# Copyright (c) 2021 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
import
|
||||
chronos,
|
||||
chronicles,
|
||||
eth/[common/eth_types, p2p],
|
||||
stint,
|
||||
".."/[timer_helper, types]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "full-ticker"
|
||||
|
||||
type
|
||||
TickerStats* = object
|
||||
topPersistent*: BlockNumber
|
||||
nextUnprocessed*: Option[BlockNumber]
|
||||
nextStaged*: Option[BlockNumber]
|
||||
nStagedQueue*: int
|
||||
reOrg*: bool
|
||||
|
||||
TickerStatsUpdater* =
|
||||
proc: TickerStats {.gcsafe, raises: [Defect].}
|
||||
|
||||
Ticker* = ref object
|
||||
nBuddies: int
|
||||
lastStats: TickerStats
|
||||
lastTick: uint64
|
||||
statsCb: TickerStatsUpdater
|
||||
logTicker: TimerCallback
|
||||
tick: uint64 # more than 5*10^11y before wrap when ticking every sec
|
||||
|
||||
const
|
||||
tickerStartDelay = 100.milliseconds
|
||||
tickerLogInterval = 1.seconds
|
||||
tickerLogSuppressMax = 100
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: ticking log messages
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc pp(n: BlockNumber): string =
|
||||
"#" & $n
|
||||
|
||||
proc pp(n: Option[BlockNumber]): string =
|
||||
if n.isNone: "n/a" else: n.get.pp
|
||||
|
||||
proc setLogTicker(t: Ticker; at: Moment) {.gcsafe.}
|
||||
|
||||
proc runLogTicker(t: Ticker) {.gcsafe.} =
|
||||
let data = t.statsCb()
|
||||
|
||||
if data != t.lastStats or
|
||||
t.lastTick + tickerLogSuppressMax < t.tick:
|
||||
t.lastStats = data
|
||||
t.lastTick = t.tick
|
||||
let
|
||||
persistent = data.topPersistent.pp
|
||||
staged = data.nextStaged.pp
|
||||
unprocessed = data.nextUnprocessed.pp
|
||||
queued = data.nStagedQueue
|
||||
reOrg = if data.reOrg: "t" else: "f"
|
||||
|
||||
buddies = t.nBuddies
|
||||
tick = t.tick.toSI
|
||||
mem = getTotalMem().uint.toSI
|
||||
|
||||
info "Sync statistics", tick, buddies,
|
||||
persistent, unprocessed, staged, queued, reOrg, mem
|
||||
|
||||
t.tick.inc
|
||||
t.setLogTicker(Moment.fromNow(tickerLogInterval))
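A worked reading of the suppression guard above, with illustrative numbers:

# Suppose a line was logged at tick 7 and the stats have not changed since.
# Ticks 8..107 are suppressed because 7 + tickerLogSuppressMax = 107 is not
# less than the current tick; at tick 108 the guard holds again and the
# unchanged stats are re-logged.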
|
||||
|
||||
|
||||
proc setLogTicker(t: Ticker; at: Moment) =
|
||||
if not t.logTicker.isNil:
|
||||
t.logTicker = safeSetTimer(at, runLogTicker, t)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public constructor and start/stop functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc init*(T: type Ticker; cb: TickerStatsUpdater): T =
|
||||
## Constructor
|
||||
T(statsCb: cb)
|
||||
|
||||
proc start*(t: Ticker) =
|
||||
## Re/start ticker unconditionally
|
||||
#debug "Started ticker"
|
||||
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
|
||||
|
||||
proc stop*(t: Ticker) =
|
||||
## Stop ticker unconditionally
|
||||
t.logTicker = nil
|
||||
#debug "Stopped ticker"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc startBuddy*(t: Ticker) =
|
||||
## Increment buddies counter and start ticker unless running.
|
||||
if t.nBuddies <= 0:
|
||||
t.nBuddies = 1
|
||||
t.start()
|
||||
else:
|
||||
t.nBuddies.inc
|
||||
|
||||
proc stopBuddy*(t: Ticker) =
|
||||
## Decrement buddies counter and stop ticker if there are no more registered
|
||||
## buddies.
|
||||
t.nBuddies.dec
|
||||
if t.nBuddies <= 0:
|
||||
t.stop()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@@ -0,0 +1,921 @@
# nim-eth
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at
#     https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at
#     https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

## Fetch and store blocks
## ======================
##
## Worker items state diagram and sketch of sync algorithm:
## ::
##   set of unprocessed  | peer workers           | list of work items ready
##   block ranges        |                        | for persistent database
##   ==================================================================
##
##      +------------------------------------------------------------+
##      |                                                            |
##      |  +-------------------------------------------+             |
##      |  |                                           |             |
##      V  v                                           |             |
##   <unprocessed> ---+-----> <worker-0> -----+---> <staged> ---> block chain
##                    |                       |
##                    +-----> <worker-1> -----+
##                    |                       |
##                    +-----> <worker-2> -----+
##                    :                       :
##
## A work item is created from a range of block numbers extracted from the
## `<unprocessed>` set of block ranges.
##
## A work item consists of a
## * current state `<worker-#>` or `<staged>`
## * given range of consecutive block numbers `[from..to]`
## * sequence of block headers relating to `[from..to]` (to be completed)
## * sequence of block bodies relating to `[from..to]` (to be completed)
##
## Block ranges *may* be recycled back into the `<unprocessed>` set when a
## work item is destroyed. This is supposed to be an exceptional case.
## Typically, a `<staged>` work item is added to the persistent block chain
## database and destroyed without block range recycling.
##
## Beware of `<staged>` overflow
## -----------------------------
## When the `<staged>` queue gets too long in non-backtrack/re-org mode, this
## may be caused by a gap between the least `<unprocessed>` block number and
## the least `<staged>` block number. Then a mechanism is invoked where the
## `<unprocessed>` block range is updated.
##
## For backtrack/re-org the system runs in single instance mode, tracing
## backwards parent hash references. So updating `<unprocessed>` block numbers
## would have no effect. In that case, the records with the largest block
## numbers are deleted from the `<staged>` list.
##
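The `<unprocessed>` bookkeeping described above boils down to carving intervals out of a block-number interval set and merging them back when a work item is abandoned. A minimal, hypothetical sketch using the same `stew/interval_set` API as the module below (the `maxHeadersFetch` constant is assumed here as the fetch limit):

import stew/interval_set, eth/common/eth_types

const maxHeadersFetch = 192        # assumed limit, for illustration only

let unprocessed = IntervalSetRef[BlockNumber,UInt256].init()
discard unprocessed.merge(1.toBlockNumber, high(BlockNumber))  # everything pending

# carve a work item out of the least unprocessed interval ...
let rc = unprocessed.ge()
if rc.isOk:
  let iv = Interval[BlockNumber,UInt256].new(
    rc.value.minPt,
    min(rc.value.maxPt, rc.value.minPt + (maxHeadersFetch - 1).toBlockNumber))
  discard unprocessed.reduce(iv)   # interval is now being fetched by a worker
  # ... and recycle it should the work item be abandoned
  discard unprocessed.merge(iv)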
import
|
||||
std/[algorithm, hashes, options, random, sequtils, sets, strutils],
|
||||
chronicles,
|
||||
chronos,
|
||||
eth/[common/eth_types, p2p],
|
||||
stew/[byteutils, interval_set, sorted_set],
|
||||
../../utils,
|
||||
../protocol,
|
||||
"."/[full_desc, ticker]
|
||||
|
||||
{.push raises:[Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "full-sync"
|
||||
|
||||
const
|
||||
minPeersToStartSync = ##\
|
||||
## Wait for consensus of at least this number of peers before syncing.
|
||||
2
|
||||
|
||||
maxStagedWorkItems = ##\
|
||||
## Maximal items in the `staged` list.
|
||||
70
|
||||
|
||||
stagedWorkItemsTrigger = ##\
|
||||
## Turn on the global `poolMode` if there are more than this many items
|
||||
## staged.
|
||||
50
|
||||
|
||||
static:
|
||||
doAssert stagedWorkItemsTrigger < maxStagedWorkItems
|
||||
|
||||
type
|
||||
BlockRangeSetRef* = ##\
|
||||
## Disjunct sets of block number intervals
|
||||
IntervalSetRef[BlockNumber,UInt256]
|
||||
|
||||
BlockRange* = ##\
|
||||
## Block number interval
|
||||
Interval[BlockNumber,UInt256]
|
||||
|
||||
WorkItemQueue* = ##\
|
||||
## Block intervals sorted by least block number
|
||||
SortedSet[BlockNumber,WorkItemRef]
|
||||
|
||||
WorkItemWalkRef* = ##\
|
||||
## Fast traversal descriptor for `WorkItemQueue`
|
||||
SortedSetWalkRef[BlockNumber,WorkItemRef]
|
||||
|
||||
WorkItemRef* = ref object
|
||||
## Block worker item wrapper for downloading a block range
|
||||
blocks: BlockRange ## Block numbers to fetch
|
||||
topHash: Option[Hash256] ## Fetch by top hash rather than blocks
|
||||
headers: seq[BlockHeader] ## Block headers received
|
||||
hashes: seq[Hash256] ## Hashed from `headers[]` for convenience
|
||||
bodies: seq[BlockBody] ## Block bodies received
|
||||
|
||||
BuddyDataEx = ref object of BuddyDataRef
|
||||
## Local descriptor data extension
|
||||
bestNumber: Option[BlockNumber] ## Largest block number reported
|
||||
|
||||
CtxDataEx = ref object of CtxDataRef
|
||||
## Globally shared data extension
|
||||
backtrack: Option[Hash256] ## Find reverse block after re-org
|
||||
unprocessed: BlockRangeSetRef ## Block ranges to fetch
|
||||
staged: WorkItemQueue ## Blocks fetched but not stored yet
|
||||
untrusted: seq[Peer] ## Clean up list
|
||||
trusted: HashSet[Peer] ## Peers ready for delivery
|
||||
topPersistent: BlockNumber ## Up to this block number stored OK
|
||||
ticker: Ticker ## Logger ticker
|
||||
|
||||
let
|
||||
highBlockRange =
|
||||
BlockRange.new(high(BlockNumber),high(BlockNumber))
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getOrHigh(b: Option[BlockNumber]): BlockNumber =
|
||||
## Syntactic sugar
|
||||
if b.isSome: b.get else: high(BlockNumber)
|
||||
|
||||
proc getOrHigh(b: Option[BlockRange]): BlockRange =
|
||||
if b.isSome: b.get else: highBlockRange
|
||||
|
||||
proc hash(peer: Peer): Hash =
|
||||
## Mixin `HashSet[Peer]` handler
|
||||
hash(cast[pointer](peer))
|
||||
|
||||
proc `+`(n: BlockNumber; delta: static[int]): BlockNumber =
|
||||
## Syntactic sugar for expressions like `xxx.toBlockNumber + 1`
|
||||
n + delta.toBlockNumber
|
||||
|
||||
proc `-`(n: BlockNumber; delta: static[int]): BlockNumber =
|
||||
## Syntactic sugar for expressions like `xxx.toBlockNumber - 1`
|
||||
n - delta.toBlockNumber
|
||||
|
||||
proc merge(ivSet: BlockRangeSetRef; wi: WorkItemRef): Uint256 =
|
||||
## Syntactic sugar
|
||||
ivSet.merge(wi.blocks)
|
||||
|
||||
proc reduce(ivSet: BlockRangeSetRef; wi: WorkItemRef): Uint256 =
|
||||
## Syntactic sugar
|
||||
ivSet.reduce(wi.blocks)
|
||||
|
||||
|
||||
proc pp(n: BlockNumber): string =
|
||||
## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`)
|
||||
if n == high(BlockNumber): "high" else:"#" & $n
|
||||
|
||||
proc `$`(iv: BlockRange): string =
|
||||
## Needed for macro generated DSL files like `snap.nim` because the
|
||||
## `distinct` flavour of `NodeTag` is discarded there.
|
||||
result = "[" & iv.minPt.pp
|
||||
if iv.minPt != iv.maxPt:
|
||||
result &= "," & iv.maxPt.pp
|
||||
result &= "]"
|
||||
|
||||
proc `$`(n: Option[BlockRange]): string =
|
||||
if n.isNone: "n/a" else: $n.get
|
||||
|
||||
proc `$`(n: Option[BlockNumber]): string =
|
||||
if n.isNone: "n/a" else: n.get.pp
|
||||
|
||||
proc `$`(brs: BlockRangeSetRef): string =
|
||||
"{" & toSeq(brs.increasing).mapIt($it).join(",") & "}"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private getters
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc local(buddy: BuddyRef): BuddyDataEx =
|
||||
## Parameters local to this peer worker
|
||||
buddy.data.BuddyDataEx
|
||||
|
||||
proc pool(ctx: CtxRef): CtxDataEx =
|
||||
## Parameters shared between all peer workers
|
||||
ctx.data.CtxDataEx
|
||||
|
||||
proc pool(buddy: BuddyRef): CtxDataEx =
|
||||
## Ditto
|
||||
buddy.ctx.data.CtxDataEx
|
||||
|
||||
proc nextUnprocessed(pool: CtxDataEx): Option[BlockNumber] =
|
||||
## Pseudo getter
|
||||
let rc = pool.unprocessed.ge()
|
||||
if rc.isOK:
|
||||
result = some(rc.value.minPt)
|
||||
|
||||
proc nextStaged(pool: CtxDataEx): Option[BlockRange] =
|
||||
## Pseudo getter
|
||||
let rc = pool.staged.ge(low(BlockNumber))
|
||||
if rc.isOK:
|
||||
result = some(rc.value.data.blocks)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions affecting all shared data
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc globalReset(ctx: CtxRef; backBlocks = maxHeadersFetch): bool =
|
||||
## Globally flush `pending` and `staged` items, update the `unprocessed`
## ranges, and set `unprocessed` back to before the best block number.
|
||||
var topPersistent: BlockNumber
|
||||
try:
|
||||
let
|
||||
bestNumber = ctx.chain.getBestBlockHeader.blockNumber
|
||||
nBackBlocks = backBlocks.toBlockNumber
|
||||
# Initialise before best block number
|
||||
topPersistent =
|
||||
if nBackBlocks < bestNumber: bestNumber - nBackBlocks
|
||||
else: 0.toBlockNumber
|
||||
except CatchableError as e:
|
||||
error "Best block header problem", backBlocks, error=($e.name), msg=e.msg
|
||||
return false
|
||||
|
||||
ctx.pool.unprocessed.clear()
|
||||
ctx.pool.staged.clear()
|
||||
ctx.pool.trusted.clear()
|
||||
ctx.pool.topPersistent = topPersistent
|
||||
discard ctx.pool.unprocessed.merge(topPersistent + 1, high(BlockNumber))
|
||||
|
||||
true
|
||||
|
||||
proc tickerUpdater(ctx: CtxRef): TickerStatsUpdater =
|
||||
result = proc: TickerStats =
|
||||
let
|
||||
stagedRange = ctx.pool.nextStaged
|
||||
nextStaged = if stagedRange.isSome: some(stagedRange.get.minPt)
|
||||
else: none(BlockNumber)
|
||||
TickerStats(
|
||||
topPersistent: ctx.pool.topPersistent,
|
||||
nextStaged: nextStaged,
|
||||
nextUnprocessed: ctx.pool.nextUnprocessed,
|
||||
nStagedQueue: ctx.pool.staged.len,
|
||||
reOrg: ctx.pool.backtrack.isSome)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template safeTransport(buddy: BuddyRef; info: static[string]; code: untyped) =
|
||||
try:
|
||||
code
|
||||
except TransportError as e:
|
||||
error info & ", stop", error=($e.name), msg=e.msg
|
||||
buddy.ctrl.stopped = true
|
||||
|
||||
|
||||
proc getRandomTrustedPeer(buddy: BuddyRef): Result[Peer,void] =
|
||||
## Return a random entry from the `trusted` peer set different from this
## peer, if there are enough.
|
||||
##
|
||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `randomTrustedPeer()`
|
||||
let
|
||||
nPeers = buddy.pool.trusted.len
|
||||
offInx = if buddy.peer in buddy.pool.trusted: 2 else: 1
|
||||
if 0 < nPeers:
|
||||
var (walkInx, stopInx) = (0, rand(nPeers - offInx))
|
||||
for p in buddy.pool.trusted:
|
||||
if p == buddy.peer:
|
||||
continue
|
||||
if walkInx == stopInx:
|
||||
return ok(p)
|
||||
walkInx.inc
|
||||
err()
|
||||
|
||||
|
||||
proc newWorkItem(buddy: BuddyRef): Result[WorkItemRef,void] =
|
||||
## Fetch the next unprocessed block range and register it as work item.
|
||||
##
|
||||
## This function will grab a block range from the `unprocessed` range set,
## remove it, and return it as a `WorkItemRef`. The returned range is
## registered in the `pending` list.
|
||||
let
|
||||
peer = buddy.peer
|
||||
rc = buddy.pool.unprocessed.ge()
|
||||
if rc.isErr:
|
||||
return err() # no more data for this peer
|
||||
|
||||
# Check whether there is something to do at all
|
||||
if buddy.local.bestNumber.isNone or
|
||||
buddy.local.bestNumber.get < rc.value.minPt:
|
||||
return err() # no more data for this peer
|
||||
|
||||
# Compute interval
|
||||
let iv = BlockRange.new(
|
||||
rc.value.minPt,
|
||||
min(rc.value.maxPt,
|
||||
min(rc.value.minPt + maxHeadersFetch - 1,
|
||||
buddy.local.bestNumber.get)))
|
||||
|
||||
discard buddy.pool.unprocessed.reduce(iv)
|
||||
return ok(WorkItemRef(blocks: iv))
|
||||
|
||||
|
||||
proc recycleStaged(buddy: BuddyRef) =
|
||||
## Flush list of staged items and store the block ranges
|
||||
## back to the `unprocessed` ranges set
|
||||
##
|
||||
# using fast traversal
|
||||
let walk = WorkItemWalkRef.init(buddy.pool.staged)
|
||||
var rc = walk.first()
|
||||
while rc.isOk:
|
||||
# Store back into `unprocessed` ranges set
|
||||
discard buddy.pool.unprocessed.merge(rc.value.data)
|
||||
rc = walk.next()
|
||||
# optional clean up, see comments on the destroy() directive
|
||||
walk.destroy()
|
||||
buddy.pool.staged.clear()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private `Future` helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getBestNumber(buddy: BuddyRef): Future[Result[BlockNumber,void]]{.async.} =
|
||||
## Get best block number from best block hash.
|
||||
##
|
||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `getBestBlockNumber()`
|
||||
let
|
||||
peer = buddy.peer
|
||||
startHash = peer.state(eth).bestBlockHash
|
||||
reqLen = 1u
|
||||
hdrReq = BlocksRequest(
|
||||
startBlock: HashOrNum(
|
||||
isHash: true,
|
||||
hash: startHash),
|
||||
maxResults: reqLen,
|
||||
skip: 0,
|
||||
reverse: true)
|
||||
|
||||
trace trEthSendSendingGetBlockHeaders, peer,
|
||||
startBlock=startHash.data.toHex, reqLen
|
||||
|
||||
var hdrResp: Option[blockHeadersObj]
|
||||
buddy.safeTransport("Error fetching block header"):
|
||||
hdrResp = await peer.getBlockHeaders(hdrReq)
|
||||
if buddy.ctrl.stopped:
|
||||
return err()
|
||||
|
||||
if hdrResp.isNone:
|
||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, response="n/a"
|
||||
return err()
|
||||
|
||||
let hdrRespLen = hdrResp.get.headers.len
|
||||
if hdrRespLen == 1:
|
||||
let blockNumber = hdrResp.get.headers[0].blockNumber
|
||||
trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
|
||||
return ok(blockNumber)
|
||||
|
||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
|
||||
return err()
|
||||
|
||||
|
||||
proc agreesOnChain(buddy: BuddyRef; other: Peer): Future[bool] {.async.} =
|
||||
## Returns `true` if one of the peers `buddy.peer` or `other` acknowledges
|
||||
## existence of the best block of the other peer.
|
||||
##
|
||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `peersAgreeOnChain()`
|
||||
var
|
||||
peer = buddy.peer
|
||||
start = peer
|
||||
fetch = other
|
||||
# Make sure that `fetch` has not the smaller difficulty.
|
||||
if fetch.state(eth).bestDifficulty < start.state(eth).bestDifficulty:
|
||||
swap(fetch, start)
|
||||
|
||||
let
|
||||
startHash = start.state(eth).bestBlockHash
|
||||
hdrReq = BlocksRequest(
|
||||
startBlock: HashOrNum(
|
||||
isHash: true,
|
||||
hash: startHash),
|
||||
maxResults: 1,
|
||||
skip: 0,
|
||||
reverse: true)
|
||||
|
||||
trace trEthSendSendingGetBlockHeaders, peer, start, fetch,
|
||||
startBlock=startHash.data.toHex, hdrReqLen=1
|
||||
|
||||
var hdrResp: Option[blockHeadersObj]
|
||||
buddy.safeTransport("Error fetching block header"):
|
||||
hdrResp = await fetch.getBlockHeaders(hdrReq)
|
||||
if buddy.ctrl.stopped:
|
||||
return false
|
||||
|
||||
if hdrResp.isSome:
|
||||
let hdrRespLen = hdrResp.get.headers.len
|
||||
if 0 < hdrRespLen:
|
||||
let blockNumber = hdrResp.get.headers[0].blockNumber
|
||||
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
|
||||
hdrRespLen, blockNumber
|
||||
return true
|
||||
|
||||
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
|
||||
blockNumber="n/a"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions, worker sub-tasks
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc initaliseWorker(buddy: BuddyRef): Future[bool] {.async.} =
|
||||
## Initialise worker. This function must be run in single mode at the
|
||||
## beginning of running worker peer.
|
||||
##
|
||||
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()`
|
||||
##
|
||||
let peer = buddy.peer
|
||||
|
||||
# Delayed clean up batch list
|
||||
if 0 < buddy.pool.untrusted.len:
|
||||
trace "Removing untrusted peers", peer, count=buddy.pool.untrusted.len
|
||||
for p in buddy.pool.untrusted:
|
||||
buddy.pool.trusted.excl p
|
||||
buddy.pool.untrusted.setLen(0)
|
||||
|
||||
if buddy.local.bestNumber.isNone:
|
||||
let rc = await buddy.getBestNumber()
|
||||
# Beware of peer terminating the session right after communicating
|
||||
if rc.isErr or buddy.ctrl.stopped:
|
||||
return false
|
||||
if rc.value <= buddy.pool.topPersistent:
|
||||
buddy.ctrl.zombie = true
|
||||
trace "Useless peer, best number too low", peer,
|
||||
topPersistent=buddy.pool.topPersistent, bestNumber=rc.value
|
||||
buddy.local.bestNumber = some(rc.value)
|
||||
|
||||
if minPeersToStartSync <= buddy.pool.trusted.len:
|
||||
# We have enough trusted peers. Validate new peer against trusted
|
||||
let rc = buddy.getRandomTrustedPeer()
|
||||
if rc.isOK:
|
||||
if await buddy.agreesOnChain(rc.value):
|
||||
# Beware of peer terminating the session
|
||||
if not buddy.ctrl.stopped:
|
||||
buddy.pool.trusted.incl peer
|
||||
return true
|
||||
|
||||
# If there are no trusted peers yet, assume this very peer is trusted,
|
||||
# but do not finish initialisation until there are more peers.
|
||||
elif buddy.pool.trusted.len == 0:
|
||||
trace "Assume initial trusted peer", peer
|
||||
buddy.pool.trusted.incl peer
|
||||
|
||||
elif buddy.pool.trusted.len == 1 and buddy.peer in buddy.pool.trusted:
|
||||
# Ignore degenerate case, note that `trusted.len < minPeersToStartSync`
|
||||
discard
|
||||
|
||||
else:
|
||||
# At this point we have some "trusted" candidates, but they are not
|
||||
# "trusted" enough. We evaluate `peer` against all other candidates. If
|
||||
# one of the candidates disagrees, we swap it for `peer`. If all candidates
|
||||
# agree, we add `peer` to trusted set. The peers in the set will become
|
||||
# "fully trusted" (and sync will start) when the set is big enough
|
||||
var
|
||||
agreeScore = 0
|
||||
otherPeer: Peer
|
||||
for p in buddy.pool.trusted:
|
||||
if peer == p:
|
||||
inc agreeScore
|
||||
else:
|
||||
let agreedOk = await buddy.agreesOnChain(p)
|
||||
# Beware of peer terminating the session
|
||||
if buddy.ctrl.stopped:
|
||||
return false
|
||||
if agreedOk:
|
||||
inc agreeScore
|
||||
else:
|
||||
otherPeer = p
|
||||
|
||||
# Check for the number of peers that disagree
|
||||
case buddy.pool.trusted.len - agreeScore
|
||||
of 0:
|
||||
trace "Peer trusted by score", peer,
|
||||
trusted=buddy.pool.trusted.len
|
||||
buddy.pool.trusted.incl peer # best possible outcome
|
||||
of 1:
|
||||
trace "Other peer no longer trusted", peer,
|
||||
otherPeer, trusted=buddy.pool.trusted.len
|
||||
buddy.pool.trusted.excl otherPeer
|
||||
buddy.pool.trusted.incl peer
|
||||
else:
|
||||
trace "Peer not trusted", peer,
|
||||
trusted=buddy.pool.trusted.len
|
||||
discard
|
||||
|
||||
if minPeersToStartSync <= buddy.pool.trusted.len:
|
||||
return true
|
||||
|
||||
|
||||
proc fetchHeaders(buddy: BuddyRef; wi: WorkItemRef): Future[bool] {.async.} =
|
||||
## Get the work item with the least interval and complete it. The function
|
||||
## returns `true` if headers were fetched and there were no inconsistencies.
|
||||
let peer = buddy.peer
|
||||
|
||||
if 0 < wi.hashes.len:
|
||||
return true
|
||||
|
||||
var hdrReq: BlocksRequest
|
||||
if wi.topHash.isNone:
|
||||
hdrReq = BlocksRequest(
|
||||
startBlock: HashOrNum(
|
||||
isHash: false,
|
||||
number: wi.blocks.minPt),
|
||||
maxResults: wi.blocks.len.truncate(uint),
|
||||
skip: 0,
|
||||
reverse: false)
|
||||
trace trEthSendSendingGetBlockHeaders, peer,
|
||||
blocks=($wi.blocks)
|
||||
|
||||
else:
|
||||
hdrReq = BlocksRequest(
|
||||
startBlock: HashOrNum(
|
||||
isHash: true,
|
||||
hash: wi.topHash.get),
|
||||
maxResults: maxHeadersFetch,
|
||||
skip: 0,
|
||||
reverse: true)
|
||||
trace trEthSendSendingGetBlockHeaders & " reverse", peer,
|
||||
topHash=hdrReq.startBlock.hash, reqLen=hdrReq.maxResults
|
||||
|
||||
# Fetch headers from peer
|
||||
var hdrResp: Option[blockHeadersObj]
|
||||
block:
|
||||
let reqLen = hdrReq.maxResults
|
||||
buddy.safeTransport("Error fetching block headers"):
|
||||
hdrResp = await peer.getBlockHeaders(hdrReq)
|
||||
# Beware of peer terminating the session
|
||||
if buddy.ctrl.stopped:
|
||||
return false
|
||||
|
||||
if hdrResp.isNone:
|
||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, response="n/a"
|
||||
return false
|
||||
|
||||
let hdrRespLen = hdrResp.get.headers.len
|
||||
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
|
||||
|
||||
if hdrRespLen == 0:
|
||||
buddy.ctrl.stopped = true
|
||||
return false
|
||||
|
||||
# Update block range for reverse search
|
||||
if wi.topHash.isSome:
|
||||
# Headers are in reversed order
|
||||
wi.headers = hdrResp.get.headers.reversed
|
||||
wi.blocks = BlockRange.new(
|
||||
wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
|
||||
discard buddy.pool.unprocessed.reduce(wi)
|
||||
trace "Updated reverse header range", peer, range=($wi.blocks)
|
||||
|
||||
# Verify start block number
|
||||
elif hdrResp.get.headers[0].blockNumber != wi.blocks.minPt:
|
||||
trace "Header range starts with wrong block number", peer,
|
||||
startBlock=hdrResp.get.headers[0].blockNumber,
|
||||
requestedBlock=wi.blocks.minPt
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
# Import into `wi.headers`
|
||||
else:
|
||||
wi.headers.shallowCopy(hdrResp.get.headers)
|
||||
|
||||
# Calculate block header hashes and verify it against parent links. If
|
||||
# necessary, cut off some offending block headers tail.
|
||||
wi.hashes.setLen(wi.headers.len)
|
||||
wi.hashes[0] = wi.headers[0].hash
|
||||
for n in 1 ..< wi.headers.len:
|
||||
if wi.headers[n-1].blockNumber + 1 != wi.headers[n].blockNumber:
|
||||
trace "Non-consecutive block numbers in header list response", peer
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
if wi.hashes[n-1] != wi.headers[n].parentHash:
|
||||
# Oops, cul-de-sac after block chain re-org?
|
||||
trace "Dangling parent link in header list response. Re-org?", peer
|
||||
wi.headers.setLen(n)
|
||||
wi.hashes.setLen(n)
|
||||
break
|
||||
wi.hashes[n] = wi.headers[n].hash
|
||||
|
||||
# Adjust range length if necessary
|
||||
if wi.headers[^1].blockNumber < wi.blocks.maxPt:
|
||||
let redRng = BlockRange.new(
|
||||
wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
|
||||
trace "Adjusting block range", peer, range=($wi.blocks), reduced=($redRng)
|
||||
discard buddy.pool.unprocessed.merge(redRng.maxPt + 1, wi.blocks.maxPt)
|
||||
wi.blocks = redRng
|
||||
|
||||
return true
|
||||
|
||||
|
||||
proc fetchBodies(buddy: BuddyRef; wi: WorkItemRef): Future[bool] {.async.} =
|
||||
## Get the work item with the least interval and complete it. The function
|
||||
## returns `true` if bodies were fetched and there were no inconsistencies.
|
||||
let peer = buddy.peer
|
||||
|
||||
# Complete group of bodies
|
||||
buddy.safeTransport("Error fetching block bodies"):
|
||||
while wi.bodies.len < wi.hashes.len:
|
||||
let
|
||||
start = wi.bodies.len
|
||||
reqLen = min(wi.hashes.len - wi.bodies.len, maxBodiesFetch)
|
||||
top = start + reqLen
|
||||
hashes = wi.hashes[start ..< top]
|
||||
|
||||
trace trEthSendSendingGetBlockBodies, peer, reqLen
|
||||
|
||||
# Append bodies from peer to `wi.bodies`
|
||||
block:
|
||||
let bdyResp = await peer.getBlockBodies(hashes)
|
||||
# Beware of peer terminating the session
|
||||
if buddy.ctrl.stopped:
|
||||
return false
|
||||
|
||||
if bdyResp.isNone:
|
||||
trace trEthRecvReceivedBlockBodies, peer, reqLen, response="n/a"
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
let bdyRespLen = bdyResp.get.blocks.len
|
||||
trace trEthRecvReceivedBlockBodies, peer, reqLen, bdyRespLen
|
||||
|
||||
if bdyRespLen == 0 or reqLen < bdyRespLen:
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
wi.bodies.add bdyResp.get.blocks
|
||||
|
||||
return true
|
||||
|
||||
|
||||
proc stageItem(buddy: BuddyRef; wi: WorkItemRef) =
|
||||
## Add work item to the list of staged items
|
||||
let peer = buddy.peer
|
||||
|
||||
let rc = buddy.pool.staged.insert(wi.blocks.minPt)
|
||||
if rc.isOk:
|
||||
rc.value.data = wi
|
||||
|
||||
# Turn on pool mode if there are too many staged work items queued.
|
||||
# This must only be done when the added work item is not backtracking.
|
||||
if stagedWorkItemsTrigger < buddy.pool.staged.len and
|
||||
buddy.pool.backtrack.isNone and
|
||||
wi.topHash.isNone:
|
||||
buddy.ctx.poolMode = true
|
||||
|
||||
# The list size is limited. So cut if necessary and recycle back the block
|
||||
# range of the discarded item (tough luck if the current work item is the
|
||||
# one removed from top.)
|
||||
while maxStagedWorkItems < buddy.pool.staged.len:
|
||||
let topValue = buddy.pool.staged.le(high(BlockNumber)).value
|
||||
discard buddy.pool.unprocessed.merge(topValue.data)
|
||||
discard buddy.pool.staged.delete(topValue.key)
|
||||
return
|
||||
|
||||
# Ooops, duplicates should not exist (but anyway ...)
|
||||
let wj = block:
|
||||
let rc = buddy.pool.staged.eq(wi.blocks.minPt)
|
||||
doAssert rc.isOk
|
||||
# Store `wi` and return offending entry
|
||||
let rcData = rc.value.data
|
||||
rc.value.data = wi
|
||||
rcData
|
||||
|
||||
debug "Replacing dup item in staged list", peer,
|
||||
range=($wi.blocks), discarded=($wj.blocks)
|
||||
# Update `staged` list and `unprocessed` ranges
|
||||
block:
|
||||
let rc = wi.blocks - wj.blocks
|
||||
if rc.isOk:
|
||||
discard buddy.pool.unprocessed.merge(rc.value)
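The trigger at the top of `stageItem()` is what the commit message calls out: pool mode must not fire while re-org backtracking is active, whether that is signalled by the global `backtrack` indicator or by the work item's own `topHash`. A hypothetical helper (not part of the module) spelling out just that predicate:

proc wantsPoolMode(nStaged: int; globalBacktrack, itemBacktrack: bool): bool =
  ## `true` when the staged queue is long enough to warrant repair and no
  ## re-org backtracking is in progress on either level.
  stagedWorkItemsTrigger < nStaged and
    not globalBacktrack and
    not itemBacktrack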
|
||||
|
||||
|
||||
proc processStaged(buddy: BuddyRef): bool =
|
||||
## Fetch a work item from the `staged` queue and process it to be
|
||||
## stored on the persistent block chain.
|
||||
|
||||
let
|
||||
peer = buddy.peer
|
||||
chainDb = buddy.ctx.chain
|
||||
rc = buddy.pool.staged.ge(low(BlockNumber))
|
||||
if rc.isErr:
|
||||
# No more items in the database
|
||||
return false
|
||||
|
||||
let
|
||||
wi = rc.value.data
|
||||
topPersistent = buddy.pool.topPersistent
|
||||
startNumber = wi.headers[0].blockNumber
|
||||
stagedRecords = buddy.pool.staged.len
|
||||
|
||||
# Check whether this record of blocks can be stored, at all
|
||||
if topPersistent + 1 < startNumber:
|
||||
trace "Staged work item postponed", peer, topPersistent,
|
||||
range=($wi.blocks), stagedRecords
|
||||
return false
|
||||
|
||||
# Ok, store into the block chain database
|
||||
trace "Processing staged work item", peer,
|
||||
topPersistent, range=($wi.blocks)
|
||||
|
||||
# remove from staged DB
|
||||
discard buddy.pool.staged.delete(wi.blocks.minPt)
|
||||
|
||||
try:
|
||||
if chainDb.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK:
|
||||
buddy.pool.topPersistent = wi.blocks.maxPt
|
||||
return true
|
||||
except CatchableError as e:
|
||||
error "Storing persistent blocks failed", peer, range=($wi.blocks),
|
||||
error = $e.name, msg = e.msg
|
||||
except Defect as e:
|
||||
# Pass through
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Notorious case where the `Chain` reference applied to
# `persistBlocks()` makes the compiler trace a possible `Exception`
# (i.e. `ctx.chain` could be uninitialised.)
|
||||
error "Exception while storing persistent blocks", peer,
|
||||
range=($wi.blocks), error=($e.name), msg=e.msg
|
||||
raise (ref Defect)(msg: $e.name & ": " & e.msg)
|
||||
|
||||
# Something went wrong. Recycle work item (needs to be re-fetched, anyway)
|
||||
let
|
||||
parentHash = wi.headers[0].parentHash
|
||||
parentHoN = HashOrNum(isHash: true, hash: parentHash)
|
||||
try:
|
||||
# Check whether hash of the first block is consistent
|
||||
var parent: BlockHeader
|
||||
if chainDb.getBlockHeader(parentHoN, parent):
|
||||
# First block parent is ok, so there might be other problems. Re-fetch
|
||||
# the blocks from another peer.
|
||||
trace "Storing persistent blocks failed", peer,
|
||||
range=($wi.blocks)
|
||||
discard buddy.pool.unprocessed.merge(wi.blocks)
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
except CatchableError as e:
|
||||
error "Failed to access parent blocks", peer,
|
||||
blockNumber=wi.headers[0].blockNumber.pp, error=($e.name), msg=e.msg
|
||||
|
||||
# Parent block header problem, so we might be in the middle of a re-org.
|
||||
# Set single mode backtrack following the offending parent hash.
|
||||
buddy.pool.backtrack = some(parentHash)
|
||||
buddy.ctrl.multiOk = false
|
||||
|
||||
if wi.topHash.isNone:
|
||||
# Assuming that currently staged entries are on the wrong branch
|
||||
buddy.recycleStaged()
|
||||
notice "Starting chain re-org backtrack work item", peer,
|
||||
range=($wi.blocks)
|
||||
else:
|
||||
# Leave that block range in the staged list
|
||||
trace "Resuming chain re-org backtrack work item", peer,
|
||||
range=($wi.blocks)
|
||||
discard
|
||||
|
||||
return false
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public start/stop and admin functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc workerSetup*(ctx: CtxRef; tickerOK: bool): bool =
|
||||
## Global set up
|
||||
ctx.data = CtxDataEx(unprocessed: BlockRangeSetRef.init()) # `pool` extension
|
||||
ctx.pool.staged.init()
|
||||
if tickerOK:
|
||||
ctx.pool.ticker = Ticker.init(ctx.tickerUpdater)
|
||||
else:
|
||||
debug "Ticker is disabled"
|
||||
return ctx.globalReset(0)
|
||||
|
||||
proc workerRelease*(ctx: CtxRef) =
|
||||
## Global clean up
|
||||
if not ctx.pool.ticker.isNil:
|
||||
ctx.pool.ticker.stop()
|
||||
|
||||
proc start*(buddy: BuddyRef): bool =
|
||||
## Initialise worker peer
|
||||
if buddy.peer.supports(protocol.eth) and
|
||||
buddy.peer.state(protocol.eth).initialized:
|
||||
buddy.data = BuddyDataEx.new() # `local` extension
|
||||
if not buddy.pool.ticker.isNil:
|
||||
buddy.pool.ticker.startBuddy()
|
||||
return true
|
||||
|
||||
proc stop*(buddy: BuddyRef) =
|
||||
## Clean up this peer
|
||||
buddy.ctrl.stopped = true
|
||||
buddy.pool.untrusted.add buddy.peer
|
||||
if not buddy.pool.ticker.isNil:
|
||||
buddy.pool.ticker.stopBuddy()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc runSingle*(buddy: BuddyRef) {.async.} =
|
||||
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
|
||||
## is set `false` which is the default mode. This flag is updated by the
|
||||
## worker when deemed appropriate.
|
||||
## * For all workers, there can be only one `runSingle()` function active
|
||||
## simultaneously for all worker peers.
|
||||
## * There will be no `runMulti()` function active for the same worker peer
|
||||
## simultaneously
|
||||
## * There will be no `runPool()` iterator active simultaneously.
|
||||
##
|
||||
## Note that this function runs in `async` mode.
|
||||
##
|
||||
let peer = buddy.peer
|
||||
|
||||
if buddy.pool.backtrack.isSome:
|
||||
trace "Single run mode, re-org backtracking", peer
|
||||
let wi = WorkItemRef(
|
||||
# This dummy interval can safely be merged back without any effect
|
||||
blocks: highBlockRange,
|
||||
# Enable backtrack
|
||||
topHash: some(buddy.pool.backtrack.get))
|
||||
|
||||
# Fetch headers and bodies for the current work item
|
||||
if await buddy.fetchHeaders(wi):
|
||||
if await buddy.fetchBodies(wi):
|
||||
buddy.pool.backtrack = none(Hash256)
|
||||
buddy.stageItem(wi)
|
||||
|
||||
# Update pool and persistent database (may reset `multiOk`)
|
||||
buddy.ctrl.multiOk = true
|
||||
while buddy.processStaged():
|
||||
discard
|
||||
return
|
||||
|
||||
# This work item failed, nothing to do anymore.
|
||||
discard buddy.pool.unprocessed.merge(wi)
|
||||
buddy.ctrl.zombie = true
|
||||
|
||||
else:
|
||||
if buddy.local.bestNumber.isNone:
|
||||
# Only log for the first time, or so
|
||||
trace "Single run mode, initialisation", peer,
|
||||
trusted=buddy.pool.trusted.len
|
||||
discard
|
||||
|
||||
# Initialise/re-initialise this worker
|
||||
if await buddy.initaliseWorker():
|
||||
buddy.ctrl.multiOk = true
|
||||
elif not buddy.ctrl.stopped:
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
|
||||
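The re-org backtrack path is a handoff between `stageItem()` and `runSingle()`: when a staged work item turns out to sit on the wrong branch, `stageItem()` records the offending parent hash in `pool.backtrack` and clears `multiOk`, and the next `runSingle()` call consumes that hash as the `topHash` of a dummy work item. Condensed from the code above (surrounding procs elided; a sketch rather than a verbatim excerpt):

# In stageItem(): flag the re-org and force the peer back into single mode.
buddy.pool.backtrack = some(parentHash)       # offending parent hash
buddy.ctrl.multiOk = false                    # scheduler will run runSingle() next

# In runSingle(): turn the recorded hash into a backtrack work item.
if buddy.pool.backtrack.isSome:
  let wi = WorkItemRef(
    blocks:  highBlockRange,                  # dummy range, merges away harmlessly
    topHash: some(buddy.pool.backtrack.get))  # enables backtrack fetching
  # the usual fetchHeaders()/fetchBodies() pair then processes this item
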
proc runPool*(buddy: BuddyRef) =
  ## Once started, the function `runPool()` is called for all worker peers in
  ## a row (as the body of an iteration.) There will be no other worker peer
  ## functions activated simultaneously.
  ##
  ## This procedure is started if the global flag `buddy.ctx.poolMode` is set
  ## `true` (default is `false`.) It is the responsibility of the `runPool()`
  ## instance to reset the flag `buddy.ctx.poolMode`, typically at the first
  ## peer instance as the number of active instances is unknown to `runPool()`.
  ##
  ## Note that this function does not run in `async` mode.
  ##
  if buddy.ctx.poolMode:
    # Mind the gap, fill in if necessary
    let
      topPersistent = buddy.pool.topPersistent
      covered = min(
        buddy.pool.nextUnprocessed.getOrHigh,
        buddy.pool.nextStaged.getOrHigh.minPt)
    if topPersistent + 1 < covered:
      discard buddy.pool.unprocessed.merge(topPersistent + 1, covered - 1)
    buddy.ctx.poolMode = false

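The gap check above is plain interval arithmetic: everything between the highest persisted block and the lowest block that is still unprocessed or staged must be re-queued. A self-contained toy example with assumed numbers (`BlockNumber` is a `UInt256` alias in this code base, hence the stint arithmetic):

import stint

let
  topPersistent = 1000.u256                   # highest block already persisted
  nextUnproc    = 1200.u256                   # lowest block in `unprocessed`
  nextStaged    = 1050.u256                   # lowest block waiting in `staged`
  covered       = min(nextUnproc, nextStaged) # => 1050

# Blocks 1001..1049 sit in neither queue (e.g. recycled after a re-org),
# so the real code merges them back into `unprocessed` for re-fetching.
if topPersistent + 1.u256 < covered:
  echo "re-queue ", topPersistent + 1.u256, " .. ", covered - 1.u256
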
proc runMulti*(buddy: BuddyRef) {.async.} =
  ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
  ## `true` which is typically done after finishing `runSingle()`. This
  ## instance can be simultaneously active for all peer workers.
  ##
  # Fetch work item
  let rc = buddy.newWorkItem()
  if rc.isErr:
    # No way, end of capacity for this peer => re-calibrate
    buddy.ctrl.multiOk = false
    buddy.local.bestNumber = none(BlockNumber)
    return
  let wi = rc.value

  # Fetch headers and bodies for the current work item
  if await buddy.fetchHeaders(wi):
    if await buddy.fetchBodies(wi):
      buddy.stageItem(wi)

      # Update pool and persistent database
      while buddy.processStaged():
        discard
      return

  # This work item failed
  discard buddy.pool.unprocessed.merge(wi)

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

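For orientation, the three entry points are driven by two flags: a peer stays in `runSingle()` while its `ctrl.multiOk` is `false`, graduates to `runMulti()` once the flag is set, and a serialised `runPool()` pass over all peers is interleaved whenever the global `ctx.poolMode` is raised. A hedged, illustrative sketch of that dispatch (the real scheduler lives elsewhere in the sync code and also enforces the "only one `runSingle()` at a time" rule):

# Illustrative per-peer dispatch only; not the actual scheduler implementation.
proc runPeer(buddy: BuddyRef) {.async.} =
  while not buddy.ctrl.stopped:
    if buddy.ctx.poolMode:
      buddy.runPool()             # serialised pass, shown for this peer only
    if buddy.ctrl.multiOk:
      await buddy.runMulti()      # may be active for many peers at once
    else:
      await buddy.runSingle()     # at most one active instance system-wide
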
@ -24,7 +24,7 @@ logScope:
  topics = "snap-sync"

type
  SnapSyncCtx* = ref object of Worker
  SnapSyncRef* = ref object of Worker
    chain: AbstractChainDB
    buddies: KeyedQueue[Peer,WorkerBuddy] ## LRU cache with worker descriptors
    pool: PeerPool                        ## for starting the system
@ -33,8 +33,8 @@ type
# Private helpers
# ------------------------------------------------------------------------------

proc nsCtx(sp: WorkerBuddy): SnapSyncCtx =
  sp.ns.SnapSyncCtx
proc nsCtx(sp: WorkerBuddy): SnapSyncRef =
  sp.ns.SnapSyncRef

proc hash(peer: Peer): Hash =
  ## Needed for `buddies` table key comparison
@ -67,7 +67,7 @@ proc workerLoop(sp: WorkerBuddy) {.async.} =
      peers=ns.pool.len, workers=ns.buddies.len, maxWorkers=ns.buddiesMax


proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
proc onPeerConnected(ns: SnapSyncRef, peer: Peer) =
  let sp = WorkerBuddy.new(ns, peer)

  # Check for known entry (which should not exist.)
@ -104,7 +104,7 @@ proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
  asyncSpawn sp.workerLoop()


proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) =
proc onPeerDisconnected(ns: SnapSyncRef, peer: Peer) =
  let rc = ns.buddies.eq(peer)
  if rc.isErr:
    debug "Disconnected from unregistered peer", peer,
@ -124,7 +124,7 @@ proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) =
# Public functions
# ------------------------------------------------------------------------------

proc new*(T: type SnapSyncCtx; ethNode: EthereumNode; maxPeers: int): T =
proc init*(T: type SnapSyncRef; ethNode: EthereumNode; maxPeers: int): T =
  ## Constructor
  new result
  let size = max(1,maxPeers)
@ -133,7 +133,7 @@ proc new*(T: type SnapSyncCtx; ethNode: EthereumNode; maxPeers: int): T =
  result.buddiesMax = size
  result.pool = ethNode.peerPool

proc start*(ctx: SnapSyncCtx) =
proc start*(ctx: SnapSyncRef) =
  ## Set up syncing. This call should come early.
  var po = PeerObserver(
    onPeerConnected:
@ -148,7 +148,7 @@ proc start*(ctx: SnapSyncCtx) =
  po.setProtocol eth
  ctx.pool.addObserver(ctx, po)

proc stop*(ctx: SnapSyncCtx) =
proc stop*(ctx: SnapSyncRef) =
  ## Stop syncing
  ctx.pool.delObserver(ctx)
  ctx.workerRelease()

@ -15,8 +15,8 @@ import
  chronicles,
  eth/[common/eth_types, p2p],
  stint,
  ../../types,
  "."/[timer_helper, worker_desc]
  "../.."/[timer_helper, types],
  ./worker_desc

{.push raises: [Defect].}

@ -230,10 +230,12 @@ proc reinit*(self: BaseVMState;     ## Object descriptor
  ## This is a variant of the `reinit()` function above where the field
  ## `header.parentHash` is used to fetch the `parent` BlockHeader to be
  ## used in the `update()` variant, above.
  self.reinit(
    parent    = self.chainDB.getBlockHeader(header.parentHash),
    header    = header,
    pruneTrie = pruneTrie)
  var parent: BlockHeader
  if self.chainDB.getBlockHeader(header.parentHash, parent):
    return self.reinit(
      parent    = parent,
      header    = header,
      pruneTrie = pruneTrie)


proc init*(
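The change swaps the exception-throwing `getBlockHeader(hash)` lookup for the boolean `getBlockHeader(hash, header)` overload, so a dangling parent hash (typical right after a re-org) now surfaces as a `false` return from `reinit()` instead of an exception deep inside block persisting. A hypothetical caller-side sketch; the names `vmState` and `header`, and the explicit `pruneTrie` argument, are assumptions for illustration:

# Hedged sketch: react to a missing parent instead of catching an exception.
if not vmState.reinit(header = header, pruneTrie = true):
  # The parent of `header` is unknown locally, e.g. the chain re-organised;
  # report failure upwards so the sync layer can re-fetch that branch.
  return false
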
@ -302,6 +304,24 @@ proc new*(
    tracerFlags = tracerFlags,
    pruneTrie   = pruneTrie)

proc init*(
    vmState:     BaseVMState;
    header:      BlockHeader;           ## header with tx environment data fields
    chainDB:     BaseChainDB;           ## block chain database
    tracerFlags: set[TracerFlags] = {};
    pruneTrie:   bool = true): bool
    {.gcsafe, raises: [Defect,CatchableError].} =
  ## Variant of `new()` which does not throw an exception on a dangling
  ## `BlockHeader` parent hash reference.
  var parent: BlockHeader
  if chainDB.getBlockHeader(header.parentHash, parent):
    vmState.init(
      parent      = parent,
      header      = header,
      chainDB     = chainDB,
      tracerFlags = tracerFlags,
      pruneTrie   = pruneTrie)
    return true

proc setupTxContext*(vmState: BaseVMState, origin: EthAddress, gasPrice: GasInt, forkOverride=none(Fork)) =
  ## this proc will be called each time a new transaction

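The new `init()` overload gives the same non-throwing contract when building a `BaseVMState` from scratch: it returns `false` rather than raising when `header.parentHash` cannot be resolved, which is what lets `persistBlocks()` turn a missing parent into an error return (per the commit message). A hedged usage sketch; only the `init()` signature is taken from the diff, while `chainDB`, `header`, the log call and the error value are illustrative assumptions:

# Hedged sketch of constructing a VM state without exception handling for a
# dangling parent hash; `chainDB` and `header` are assumed to be at hand.
let vmState = BaseVMState()
if not vmState.init(header = header, chainDB = chainDB):
  debug "Cannot initialise VM state, parent header missing",
    parentHash = header.parentHash
  return ValidationResult.Error     # illustrative error value, not from this diff
# vmState is now ready to execute the block's transactions
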
@ -228,10 +228,12 @@ proc reinit*(self: BaseVMState;     ## Object descriptor
  ## This is a variant of the `reinit()` function above where the field
  ## `header.parentHash` is used to fetch the `parent` BlockHeader to be
  ## used in the `update()` variant, above.
  self.reinit(
    parent    = self.chainDB.getBlockHeader(header.parentHash),
    header    = header,
    pruneTrie = pruneTrie)
  var parent: BlockHeader
  if self.chainDB.getBlockHeader(header.parentHash, parent):
    return self.reinit(
      parent    = parent,
      header    = header,
      pruneTrie = pruneTrie)


proc init*(

@ -300,6 +302,25 @@ proc new*(
    tracerFlags = tracerFlags,
    pruneTrie   = pruneTrie)

proc init*(
    vmState:     BaseVMState;
    header:      BlockHeader;           ## header with tx environment data fields
    chainDB:     BaseChainDB;           ## block chain database
    tracerFlags: set[TracerFlags] = {};
    pruneTrie:   bool = true): bool
    {.gcsafe, raises: [Defect,CatchableError].} =
  ## Variant of `new()` which does not throw an exception on a dangling
  ## `BlockHeader` parent hash reference.
  var parent: BlockHeader
  if chainDB.getBlockHeader(header.parentHash, parent):
    vmState.init(
      parent      = parent,
      header      = header,
      chainDB     = chainDB,
      tracerFlags = tracerFlags,
      pruneTrie   = pruneTrie)
    return true

method coinbase*(vmState: BaseVMState): EthAddress {.base, gcsafe.} =
  vmState.minerAddress

@ -1 +1 @@
Subproject commit 9c3596d9de809a5933fd777cec1183c2cdf521ec
Subproject commit 598246620da5c41d0e92a8dd6aab0755381b21cd