diff --git a/nimbus/config.nim b/nimbus/config.nim index 1e5e50cc9..d9bf72df3 100644 --- a/nimbus/config.nim +++ b/nimbus/config.nim @@ -131,6 +131,11 @@ type V4 V5 + SyncMode* {.pure.} = enum + Default + Full ## Beware, experimental + Snap ## Beware, experimental + NimbusConf* = object of RootObj ## Main Nimbus configuration object @@ -156,10 +161,16 @@ type abbr : "p" name: "prune-mode" }: PruneMode - snapSync* {. - desc: "Enable experimental new sync algorithms" - defaultValue: false - name: "snap-sync" .}: bool + syncMode* {. + desc: "Specify particular blockchain sync mode." + longDesc: + "- default -- legacy sync mode\n" & + "- full -- full blockchain archive\n" & + "- snap -- experimental snap mode (development only)\n" + defaultValue: SyncMode.Default + defaultValueDesc: $SyncMode.Default + abbr: "y" + name: "sync-mode" .}: SyncMode importKey* {. desc: "Import unencrypted 32 bytes hex private key from a file" diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index 34e9e1ccf..8deca12e6 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -28,7 +28,7 @@ import ./graphql/ethapi, ./p2p/[chain, clique/clique_desc, clique/clique_sealer], ./rpc/[common, debug, engine_api, jwt_auth, p2p, cors], - ./sync/[fast, protocol, snap], + ./sync/[fast, full, protocol, snap], ./utils/tx_pool when defined(evmc_enabled): @@ -122,8 +122,12 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, # Add protocol capabilities based on protocol flags if ProtocolFlag.Eth in protocols: nimbus.ethNode.addCapability protocol.eth - if conf.snapSync: + case conf.syncMode: + of SyncMode.Snap: nimbus.ethNode.addCapability protocol.snap + of SyncMode.Full, SyncMode.Default: + discard + if ProtocolFlag.Les in protocols: nimbus.ethNode.addCapability les @@ -136,8 +140,16 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, nimbus.chainRef.verifyFrom = verifyFrom # Early-initialise "--snap-sync" before starting any network connections. - if ProtocolFlag.Eth in protocols and conf.snapSync: - SnapSyncCtx.new(nimbus.ethNode, conf.maxPeers).start + if ProtocolFlag.Eth in protocols: + let tickerOK = + conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE} + case conf.syncMode: + of SyncMode.Full: + FullSyncRef.init(nimbus.ethNode, conf.maxPeers, tickerOK).start + of SyncMode.Snap: + SnapSyncRef.init(nimbus.ethNode, conf.maxPeers).start + of SyncMode.Default: + discard # Connect directly to the static nodes let staticPeers = conf.getStaticPeers() @@ -146,9 +158,15 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, # Start Eth node if conf.maxPeers > 0: + var waitForPeers = true + case conf.syncMode: + of SyncMode.Snap: + waitForPeers = false + of SyncMode.Full, SyncMode.Default: + discard nimbus.networkLoop = nimbus.ethNode.connectToNetwork( enableDiscovery = conf.discovery != DiscoveryType.None, - waitForPeers = not conf.snapSync) + waitForPeers = waitForPeers) proc localServices(nimbus: NimbusNode, conf: NimbusConf, chainDB: BaseChainDB, protocols: set[ProtocolFlag]) = @@ -361,8 +379,11 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) = localServices(nimbus, conf, chainDB, protocols) if ProtocolFlag.Eth in protocols and conf.maxPeers > 0: - if not conf.snapSync: + case conf.syncMode: + of SyncMode.Default: FastSyncCtx.new(nimbus.ethNode).start + of SyncMode.Full, SyncMode.Snap: + discard if nimbus.state == Starting: # it might have been set to "Stopping" with Ctrl+C diff --git a/nimbus/p2p/chain/persist_blocks.nim b/nimbus/p2p/chain/persist_blocks.nim index e339f0d72..90583556d 100644 --- a/nimbus/p2p/chain/persist_blocks.nim +++ b/nimbus/p2p/chain/persist_blocks.nim @@ -50,15 +50,20 @@ proc persistBlocksImpl(c: Chain; headers: openArray[BlockHeader]; let transaction = c.db.db.beginTransaction() defer: transaction.dispose() - trace "Persisting blocks", - fromBlock = headers[0].blockNumber, - toBlock = headers[^1].blockNumber - var cliqueState = c.clique.cliqueSave defer: c.clique.cliqueRestore(cliqueState) # Note that `0 < headers.len`, assured when called from `persistBlocks()` - var vmState = BaseVMState.new(headers[0], c.db) + let vmState = BaseVMState() + if not vmState.init(headers[0], c.db): + debug "Cannot initialise VmState", + fromBlock = headers[0].blockNumber, + toBlock = headers[^1].blockNumber + return ValidationResult.Error + + trace "Persisting blocks", + fromBlock = headers[0].blockNumber, + toBlock = headers[^1].blockNumber for i in 0 ..< headers.len: let @@ -72,7 +77,6 @@ proc persistBlocksImpl(c: Chain; headers: openArray[BlockHeader]; let validationResult = vmState.processBlock(c.clique, header, body) - when not defined(release): if validationResult == ValidationResult.Error and body.transactions.calcTxRoot == header.txRoot: diff --git a/nimbus/sync/fast.nim b/nimbus/sync/fast.nim index f440c383a..3f91bde9a 100644 --- a/nimbus/sync/fast.nim +++ b/nimbus/sync/fast.nim @@ -117,11 +117,14 @@ proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult result = ValidationResult.Error except Defect as e: # Pass through - raise (ref Defect)(msg: e.msg) + raise e except Exception as e: + # Notorious case where the `Chain` reference applied to `persistBlocks()` + # has the compiler traced a possible `Exception` (i.e. `ctx.chain` could + # be uninitialised.) error "exception while storing persistent blocks", error = $e.name, msg = e.msg - result = ValidationResult.Error + raise (ref Defect)(msg: $e.name & ": " & e.msg) case result of ValidationResult.OK: ctx.finalizedBlock = wi.endIndex @@ -430,20 +433,6 @@ proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) = trace "peer disconnected ", peer = p ctx.trustedPeers.excl(p) -proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) = - var - bestBlockDifficulty: DifficultyInt = 0.stuint(256) - bestPeer: Peer = nil - - for peer in node.peers(eth): - let peerEthState = peer.state(eth) - if peerEthState.initialized: - if peerEthState.bestDifficulty > bestBlockDifficulty: - bestBlockDifficulty = peerEthState.bestDifficulty - bestPeer = peer - - result = (bestPeer, bestBlockDifficulty) - proc new*(T: type FastSyncCtx; ethNode: EthereumNode): T {.gcsafe, raises:[Defect,CatchableError].} = FastSyncCtx( diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim new file mode 100644 index 000000000..c6a0e88ca --- /dev/null +++ b/nimbus/sync/full.nim @@ -0,0 +1,218 @@ +# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods +# +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/hashes, + chronicles, + chronos, + eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types], + stew/keyed_queue, + ./protocol, + ./full/[full_desc, worker] + +{.push raises: [Defect].} + +logScope: + topics = "full-sync" + +type + ActiveBuddies = ##\ + ## List of active workers + KeyedQueue[Peer,BuddyRef] + + FullSyncRef* = ref object of CtxRef + pool: PeerPool ## for starting the system + buddies: ActiveBuddies ## LRU cache with worker descriptors + tickerOk: bool ## Ticker logger + singleRunLock: bool ## For worker initialisation + monitorLock: bool ## For worker monitor + activeMulti: int ## Activated runners + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc nsCtx(buddy: BuddyRef): FullSyncRef = + buddy.ctx.FullSyncRef + +proc hash(peer: Peer): Hash = + ## Needed for `buddies` table key comparison + hash(peer.remote.id) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc workerLoop(buddy: BuddyRef) {.async.} = + let + ctx = buddy.nsCtx + peer = buddy.peer + trace "Starting peer worker", peer, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + + # Continue until stopped + while not buddy.ctrl.stopped: + if ctx.monitorLock: + await sleepAsync(500.milliseconds) + continue + + # Rotate connection table so the most used entry is at the top/right + # end. So zombies will implicitely be pushed left. + discard ctx.buddies.lruFetch(peer) + + # Invoke `runPool()` over all buddies if requested + if ctx.poolMode: + # Grab `monitorLock` (was `false` as checked above) and wait until clear + # to run as the only activated instance. + ctx.monitorLock = true + while 0 < ctx.activeMulti: + await sleepAsync(500.milliseconds) + while ctx.singleRunLock: + await sleepAsync(500.milliseconds) + trace "Starting pool mode for repair & recovery" + for w in ctx.buddies.nextValues: + buddy.runPool() + trace "Pool mode done" + ctx.monitorLock = false + continue + + await sleepAsync(50.milliseconds) + + # Multi mode + if buddy.ctrl.multiOk: + if not ctx.singleRunLock: + ctx.activeMulti.inc + # Continue doing something, work a bit + await buddy.runMulti() + ctx.activeMulti.dec + continue + + # Single mode as requested. The `multiOk` flag for this worker was just + # found `false` in the pervious clause. + if not ctx.singleRunLock: + # Lock single instance mode and wait for other workers to finish + ctx.singleRunLock = true + while 0 < ctx.activeMulti: + await sleepAsync(500.milliseconds) + # Run single instance and release afterwards + await buddy.runSingle() + ctx.singleRunLock = false + + # End while + + buddy.stop() + + trace "Peer worker done", peer, ctrlState=buddy.ctrl.state, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + + +proc onPeerConnected(ctx: FullSyncRef; peer: Peer) = + # Check for known entry (which should not exist.) + if ctx.buddies.hasKey(peer): + trace "Reconnecting zombie peer rejected", peer, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + return + + # Initialise worker for this peer + let buddy = BuddyRef(ctx: ctx, peer: peer) + if not buddy.start(): + trace "Ignoring useless peer", peer, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + buddy.ctrl.zombie = true + return + + # Check for table overflow. An overflow might happen if there are zombies + # in the table (though preventing them from re-connecting for a while.) + if ctx.buddiesMax <= ctx.buddies.len: + let leastPeer = ctx.buddies.shift.value.data + if leastPeer.ctrl.zombie: + trace "Dequeuing zombie peer", leastPeer, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + discard + else: + # This could happen if there are idle entries in the table, i.e. + # somehow hanging runners. + trace "Peer table full! Dequeuing least used entry", leastPeer, + peers=ctx.pool.len, workers=ctx.buddies.len, maxWorkers=ctx.buddiesMax + leastPeer.stop() + leastPeer.ctrl.zombie = true + + # Add peer entry + discard ctx.buddies.lruAppend(peer, buddy, ctx.buddiesMax) + + # Run worker + asyncSpawn buddy.workerLoop() + + +proc onPeerDisconnected(ctx: FullSyncRef, peer: Peer) = + let + peers = ctx.pool.len + maxWorkers = ctx.buddiesMax + rc = ctx.buddies.eq(peer) + if rc.isErr: + debug "Disconnected from unregistered peer", peer, peers, + workers=ctx.buddies.len, maxWorkers + return + if rc.value.ctrl.zombie: + # Don't disconnect, leave them fall out of the LRU cache. The effect is, + # that reconnecting might be blocked, for a while. + trace "Disconnected zombie", peer, peers, + workers=ctx.buddies.len, maxWorkers + else: + rc.value.ctrl.stopped = true # in case it is hanging somewhere + ctx.buddies.del(peer) + trace "Disconnected buddy", peer, peers, + workers=ctx.buddies.len, maxWorkers + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc init*( + T: type FullSyncRef; + ethNode: EthereumNode; + maxPeers: int; + enableTicker: bool): T = + ## Constructor + # Leave one extra slot so that it can holds a *zombie* even if all slots + # are full. The effect is that a re-connect on the latest zombie will be + # rejected as long as its worker descriptor is registered. + let lruSize = max(1,maxPeers+1) + result = T( + buddiesMax: lruSize, + chain: ethNode.chain, + pool: ethNode.peerPool, + tickerOk: enableTicker) + result.buddies.init(lruSize) + +proc start*(ctx: FullSyncRef) = + ## Set up syncing. This call should come early. + var po = PeerObserver( + onPeerConnected: + proc(p: Peer) {.gcsafe.} = + ctx.onPeerConnected(p), + onPeerDisconnected: + proc(p: Peer) {.gcsafe.} = + ctx.onPeerDisconnected(p)) + + # Initialise sub-systems + doAssert ctx.workerSetup(ctx.tickerOk) + po.setProtocol eth + ctx.pool.addObserver(ctx, po) + +proc stop*(ctx: FullSyncRef) = + ## Stop syncing + ctx.pool.delObserver(ctx) + ctx.workerRelease() + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/full/full_desc.nim b/nimbus/sync/full/full_desc.nim new file mode 100644 index 000000000..8b7d2f6a0 --- /dev/null +++ b/nimbus/sync/full/full_desc.nim @@ -0,0 +1,130 @@ +# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods +# +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + eth/[common/eth_types, p2p] + +{.push raises: [Defect].} + +type + BuddyRunState = enum + ## Combined state of two boolean values (`stopped`,`stopThisState`) as used + ## in the original source set up (should be double checked and simplified.) + Running = 0 ## running, default state + Stopped ## stopped or about stopping + ZombieStop ## abandon/ignore (LRU tab overflow, odd packets) + ZombieRun ## extra zombie state to potentially recover from + + BuddyCtrl* = object + ## Control and state settings + runState: BuddyRunState ## Access with getters + multiPeer: bool ## Triggers `runSingle()` mode unless `true` + + BuddyDataRef* = ref object of RootObj + ## Stub object, to be inherited in file `worker.nim` + + BuddyRef* = ref object + ## Non-inheritable peer state tracking descriptor. + ctx*: CtxRef ## Shared data back reference + peer*: Peer ## Reference to eth p2pProtocol entry + ctrl*: BuddyCtrl ## Control and state settings + data*: BuddyDataRef ## Opaque object reference for sub-module + + # ----- + + CtxDataRef* = ref object of RootObj + ## Stub object, to be inherited in file `worker.nim` + + CtxRef* = ref object of RootObj + ## Shared state among all syncing peer workers (aka buddies.) This object + ## Will be amended/inherited main module which controls the peer workers. + buddiesMax*: int ## Max number of buddies (for LRU cache, read only) + chain*: AbstractChainDB ## Block chain database (read only reference) + poolMode*: bool ## Activate `runPool()` workers if set `true` + data*: CtxDataRef ## Opaque object reference for sub-module + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc `$`*(buddy: BuddyRef): string = + $buddy.peer & "$" & $buddy.ctrl.runState + +# ------------------------------------------------------------------------------ +# Public getters, `BuddyRunState` execution control functions +# ------------------------------------------------------------------------------ + +proc multiOk*(ctrl: BuddyCtrl): bool = + ## Getter + ctrl.multiPeer + +proc state*(ctrl: BuddyCtrl): BuddyRunState = + ## Getter (logging only, details of `BuddyCtrl` are private) + ctrl.runState + +proc running*(ctrl: BuddyCtrl): bool = + ## Getter, if `true` if `ctrl.state()` is `Running` + ctrl.runState == Running + +proc stopped*(ctrl: BuddyCtrl): bool = + ## Getter, if `true`, if `ctrl.state()` is not `Running` + ctrl.runState in {Stopped, ZombieStop, ZombieRun} + +proc zombie*(ctrl: BuddyCtrl): bool = + ## Getter, `true` if `ctrl.state()` is `Zombie` (i.e. not `running()` and + ## not `stopped()`) + ctrl.runState in {ZombieStop, ZombieRun} + +# ------------------------------------------------------------------------------ +# Public setters, `BuddyRunState` execution control functions +# ------------------------------------------------------------------------------ + +proc `multiOk=`*(ctrl: var BuddyCtrl; val: bool) = + ## Setter + ctrl.multiPeer = val + +proc `zombie=`*(ctrl: var BuddyCtrl; value: bool) = + ## Setter + if value: + case ctrl.runState: + of Running: + ctrl.runState = ZombieRun + of Stopped: + ctrl.runState = ZombieStop + else: + discard + else: + case ctrl.runState: + of ZombieRun: + ctrl.runState = Running + of ZombieStop: + ctrl.runState = Stopped + else: + discard + +proc `stopped=`*(ctrl: var BuddyCtrl; value: bool) = + ## Setter + if value: + case ctrl.runState: + of Running: + ctrl.runState = Stopped + else: + discard + else: + case ctrl.runState: + of Stopped: + ctrl.runState = Running + else: + discard + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/full/ticker.nim b/nimbus/sync/full/ticker.nim new file mode 100644 index 000000000..1cf8fe4ae --- /dev/null +++ b/nimbus/sync/full/ticker.nim @@ -0,0 +1,128 @@ +# Nimbus - Fetch account and storage states from peers efficiently +# +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + chronos, + chronicles, + eth/[common/eth_types, p2p], + stint, + ".."/[timer_helper, types] + +{.push raises: [Defect].} + +logScope: + topics = "full-ticker" + +type + TickerStats* = object + topPersistent*: BlockNumber + nextUnprocessed*: Option[BlockNumber] + nextStaged*: Option[BlockNumber] + nStagedQueue*: int + reOrg*: bool + + TickerStatsUpdater* = + proc: TickerStats {.gcsafe, raises: [Defect].} + + Ticker* = ref object + nBuddies: int + lastStats: TickerStats + lastTick: uint64 + statsCb: TickerStatsUpdater + logTicker: TimerCallback + tick: uint64 # more than 5*10^11y before wrap when ticking every sec + +const + tickerStartDelay = 100.milliseconds + tickerLogInterval = 1.seconds + tickerLogSuppressMax = 100 + +# ------------------------------------------------------------------------------ +# Private functions: ticking log messages +# ------------------------------------------------------------------------------ + +proc pp(n: BlockNumber): string = + "#" & $n + +proc pp(n: Option[BlockNumber]): string = + if n.isNone: "n/a" else: n.get.pp + +proc setLogTicker(t: Ticker; at: Moment) {.gcsafe.} + +proc runLogTicker(t: Ticker) {.gcsafe.} = + let data = t.statsCb() + + if data != t.lastStats or + t.lastTick + tickerLogSuppressMax < t.tick: + t.lastStats = data + t.lastTick = t.tick + let + persistent = data.topPersistent.pp + staged = data.nextStaged.pp + unprocessed = data.nextUnprocessed.pp + queued = data.nStagedQueue + reOrg = if data.reOrg: "t" else: "f" + + buddies = t.nBuddies + tick = t.tick.toSI + mem = getTotalMem().uint.toSI + + info "Sync statistics", tick, buddies, + persistent, unprocessed, staged, queued, reOrg, mem + + t.tick.inc + t.setLogTicker(Moment.fromNow(tickerLogInterval)) + + +proc setLogTicker(t: Ticker; at: Moment) = + if not t.logTicker.isNil: + t.logTicker = safeSetTimer(at, runLogTicker, t) + +# ------------------------------------------------------------------------------ +# Public constructor and start/stop functions +# ------------------------------------------------------------------------------ + +proc init*(T: type Ticker; cb: TickerStatsUpdater): T = + ## Constructor + T(statsCb: cb) + +proc start*(t: Ticker) = + ## Re/start ticker unconditionally + #debug "Started ticker" + t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t) + +proc stop*(t: Ticker) = + ## Stop ticker unconditionally + t.logTicker = nil + #debug "Stopped ticker" + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc startBuddy*(t: Ticker) = + ## Increment buddies counter and start ticker unless running. + if t.nBuddies <= 0: + t.nBuddies = 1 + t.start() + else: + t.nBuddies.inc + +proc stopBuddy*(t: Ticker) = + ## Decrement buddies counter and stop ticker if there are no more registered + ## buddies. + t.nBuddies.dec + if t.nBuddies <= 0: + t.stop() + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim new file mode 100644 index 000000000..ef94c9723 --- /dev/null +++ b/nimbus/sync/full/worker.nim @@ -0,0 +1,921 @@ +# nim-eth +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Fetch and store blocks +## ====================== +## +## Worker items state diagram and sketch of sync algorithm: +## :: +## set of unprocessed | peer workers | list of work items ready +## block ranges | | for persistent database +## ================================================================== +## +## +---------------------------------------------+ +## | | +## | +---------------------------------+ | +## | | | | +## V v | | +## ---+-----> -----+---> ---> block chain +## | | +## +-----> -----+ +## | | +## +-----> -----+ +## : : +## +## A work item is created from a range of block numbers extracted from the +## `` set of block ranges. +## +## A work item consists of a +## * current state `` or `` +## * given range of consecutive block numbers `[from..to]` +## * sequence of block headers relating to `[from..to]` (to be completed) +## * sequence of block buddies relating to `[from..to]` (to be completed) +## +## Block ranges *may* be recycled back into the `` set when a +## work item is destroyed. This is supposed to be an exceptional case. +## Typically, a `` work item is added to the persistent block chain +## database and destroyed without block range recycling. +## +## Beware of `` overflow +## ----------------------------- +## When the `` queue gets too long in non-backtrack/re-org mode, this +## may be caused by a gap between the least `` block number and +## the least `` block number. Then a mechanism is invoked where +## `` block range is updated. +## +## For backtrack/re-org the system runs in single instance mode tracing +## backvards parent hash references. So updating `` block numbers +## would have no effect. In that case, the record with the largest block +## numbers are deleted from the `` list. +## + +import + std/[algorithm, hashes, options, random, sequtils, sets, strutils], + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/[byteutils, interval_set, sorted_set], + ../../utils, + ../protocol, + "."/[full_desc, ticker] + +{.push raises:[Defect].} + +logScope: + topics = "full-sync" + +const + minPeersToStartSync = ##\ + ## Wait for consensus of at least this number of peers before syncing. + 2 + + maxStagedWorkItems = ##\ + ## Maximal items in the `staged` list. + 70 + + stagedWorkItemsTrigger = ##\ + ## Turn on the global `poolMode` if there are more than this many items + ## staged. + 50 + +static: + doAssert stagedWorkItemsTrigger < maxStagedWorkItems + +type + BlockRangeSetRef* = ##\ + ## Disjunct sets of block number intervals + IntervalSetRef[BlockNumber,UInt256] + + BlockRange* = ##\ + ## Block number interval + Interval[BlockNumber,UInt256] + + WorkItemQueue* = ##\ + ## Block intervals sorted by least block number + SortedSet[BlockNumber,WorkItemRef] + + WorkItemWalkRef* = ##\ + ## Fast traversal descriptor for `WorkItemQueue` + SortedSetWalkRef[BlockNumber,WorkItemRef] + + WorkItemRef* = ref object + ## Block worker item wrapper for downloading a block range + blocks: BlockRange ## Block numbers to fetch + topHash: Option[Hash256] ## Fetch by top hash rather than blocks + headers: seq[BlockHeader] ## Block headers received + hashes: seq[Hash256] ## Hashed from `headers[]` for convenience + bodies: seq[BlockBody] ## Block bodies received + + BuddyDataEx = ref object of BuddyDataRef + ## Local descriptor data extension + bestNumber: Option[BlockNumber] ## Largest block number reported + + CtxDataEx = ref object of CtxDataRef + ## Globally shared data extension + backtrack: Option[Hash256] ## Find reverse block after re-org + unprocessed: BlockRangeSetRef ## Block ranges to fetch + staged: WorkItemQueue ## Blocks fetched but not stored yet + untrusted: seq[Peer] ## Clean up list + trusted: HashSet[Peer] ## Peers ready for delivery + topPersistent: BlockNumber ## Up to this block number stored OK + ticker: Ticker ## Logger ticker + +let + highBlockRange = + BlockRange.new(high(BlockNumber),high(BlockNumber)) + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc getOrHigh(b: Option[BlockNumber]): BlockNumber = + ## Syntactic sugar + if b.isSome: b.get else: high(BlockNumber) + +proc getOrHigh(b: Option[BlockRange]): BlockRange = + if b.isSome: b.get else: highBlockRange + +proc hash(peer: Peer): Hash = + ## Mixin `HashSet[Peer]` handler + hash(cast[pointer](peer)) + +proc `+`(n: BlockNumber; delta: static[int]): BlockNumber = + ## Syntactic sugar for expressions like `xxx.toBlockNumber + 1` + n + delta.toBlockNumber + +proc `-`(n: BlockNumber; delta: static[int]): BlockNumber = + ## Syntactic sugar for expressions like `xxx.toBlockNumber - 1` + n - delta.toBlockNumber + +proc merge(ivSet: BlockRangeSetRef; wi: WorkItemRef): Uint256 = + ## Syntactic sugar + ivSet.merge(wi.blocks) + +proc reduce(ivSet: BlockRangeSetRef; wi: WorkItemRef): Uint256 = + ## Syntactic sugar + ivSet.reduce(wi.blocks) + + +proc pp(n: BlockNumber): string = + ## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`) + if n == high(BlockNumber): "high" else:"#" & $n + +proc `$`(iv: BlockRange): string = + ## Needed for macro generated DSL files like `snap.nim` because the + ## `distinct` flavour of `NodeTag` is discarded there. + result = "[" & iv.minPt.pp + if iv.minPt != iv.maxPt: + result &= "," & iv.maxPt.pp + result &= "]" + +proc `$`(n: Option[BlockRange]): string = + if n.isNone: "n/a" else: $n.get + +proc `$`(n: Option[BlockNumber]): string = + if n.isNone: "n/a" else: n.get.pp + +proc `$`(brs: BlockRangeSetRef): string = + "{" & toSeq(brs.increasing).mapIt($it).join(",") & "}" + +# ------------------------------------------------------------------------------ +# Private getters +# ------------------------------------------------------------------------------ + +proc local(buddy: BuddyRef): BuddyDataEx = + ## Parameters local to this peer worker + buddy.data.BuddyDataEx + +proc pool(ctx: CtxRef): CtxDataEx = + ## Parameters shared between all peer workers + ctx.data.CtxDataEx + +proc pool(buddy: BuddyRef): CtxDataEx = + ## Ditto + buddy.ctx.data.CtxDataEx + +proc nextUnprocessed(pool: CtxDataEx): Option[BlockNumber] = + ## Pseudo getter + let rc = pool.unprocessed.ge() + if rc.isOK: + result = some(rc.value.minPt) + +proc nextStaged(pool: CtxDataEx): Option[BlockRange] = + ## Pseudo getter + let rc = pool.staged.ge(low(BlockNumber)) + if rc.isOK: + result = some(rc.value.data.blocks) + +# ------------------------------------------------------------------------------ +# Private functions affecting all shared data +# ------------------------------------------------------------------------------ + +proc globalReset(ctx: CtxRef; backBlocks = maxHeadersFetch): bool = + ## Globally flush `pending` and `staged` items and update `unprocessed` + ## ranges and set the `unprocessed` back before the best block number/ + var topPersistent: BlockNumber + try: + let + bestNumber = ctx.chain.getBestBlockHeader.blockNumber + nBackBlocks = backBlocks.toBlockNumber + # Initialise before best block number + topPersistent = + if nBackBlocks < bestNumber: bestNumber - nBackBlocks + else: 0.toBlockNumber + except CatchableError as e: + error "Best block header problem", backBlocks, error=($e.name), msg=e.msg + return false + + ctx.pool.unprocessed.clear() + ctx.pool.staged.clear() + ctx.pool.trusted.clear() + ctx.pool.topPersistent = topPersistent + discard ctx.pool.unprocessed.merge(topPersistent + 1, high(BlockNumber)) + + true + +proc tickerUpdater(ctx: CtxRef): TickerStatsUpdater = + result = proc: TickerStats = + let + stagedRange = ctx.pool.nextStaged + nextStaged = if stagedRange.isSome: some(stagedRange.get.minPt) + else: none(BlockNumber) + TickerStats( + topPersistent: ctx.pool.topPersistent, + nextStaged: nextStaged, + nextUnprocessed: ctx.pool.nextUnprocessed, + nStagedQueue: ctx.pool.staged.len, + reOrg: ctx.pool.backtrack.isSome) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +template safeTransport(buddy: BuddyRef; info: static[string]; code: untyped) = + try: + code + except TransportError as e: + error info & ", stop", error=($e.name), msg=e.msg + buddy.ctrl.stopped = true + + +proc getRandomTrustedPeer(buddy: BuddyRef): Result[Peer,void] = + ## Return random entry from `trusted` peer different from this peer set if + ## there are enough + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `randomTrustedPeer()` + let + nPeers = buddy.pool.trusted.len + offInx = if buddy.peer in buddy.pool.trusted: 2 else: 1 + if 0 < nPeers: + var (walkInx, stopInx) = (0, rand(nPeers - offInx)) + for p in buddy.pool.trusted: + if p == buddy.peer: + continue + if walkInx == stopInx: + return ok(p) + walkInx.inc + err() + + +proc newWorkItem(buddy: BuddyRef): Result[WorkItemRef,void] = + ## Fetch the next unprocessed block range and register it as work item. + ## + ## This function will grab a block range from the `unprocessed` range set, + ## ove it and return it as a `WorkItemRef`. The returned range is registered + ## in the `pending` list. + let + peer = buddy.peer + rc = buddy.pool.unprocessed.ge() + if rc.isErr: + return err() # no more data for this peer + + # Check whether there is somthing to do at all + if buddy.local.bestNumber.isNone or + buddy.local.bestNumber.get < rc.value.minPt: + return err() # no more data for this peer + + # Compute interval + let iv = BlockRange.new( + rc.value.minPt, + min(rc.value.maxPt, + min(rc.value.minPt + maxHeadersFetch - 1, + buddy.local.bestNumber.get))) + + discard buddy.pool.unprocessed.reduce(iv) + return ok(WorkItemRef(blocks: iv)) + + +proc recycleStaged(buddy: BuddyRef) = + ## Flush list of staged items and store the block ranges + ## back to the `unprocessed` ranges set + ## + # using fast traversal + let walk = WorkItemWalkRef.init(buddy.pool.staged) + var rc = walk.first() + while rc.isOk: + # Store back into `unprocessed` ranges set + discard buddy.pool.unprocessed.merge(rc.value.data) + rc = walk.next() + # optional clean up, see comments on the destroy() directive + walk.destroy() + buddy.pool.staged.clear() + +# ------------------------------------------------------------------------------ +# Private `Future` helpers +# ------------------------------------------------------------------------------ + +proc getBestNumber(buddy: BuddyRef): Future[Result[BlockNumber,void]]{.async.} = + ## Get best block number from best block hash. + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `getBestBlockNumber()` + let + peer = buddy.peer + startHash = peer.state(eth).bestBlockHash + reqLen = 1u + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: true, + hash: startHash), + maxResults: reqLen, + skip: 0, + reverse: true) + + trace trEthSendSendingGetBlockHeaders, peer, + startBlock=startHash.data.toHex, reqLen + + var hdrResp: Option[blockHeadersObj] + buddy.safeTransport("Error fetching block header"): + hdrResp = await peer.getBlockHeaders(hdrReq) + if buddy.ctrl.stopped: + return err() + + if hdrResp.isNone: + trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + return err() + + let hdrRespLen = hdrResp.get.headers.len + if hdrRespLen == 1: + let blockNumber = hdrResp.get.headers[0].blockNumber + trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber + return ok(blockNumber) + + trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen + return err() + + +proc agreesOnChain(buddy: BuddyRef; other: Peer): Future[bool] {.async.} = + ## Returns `true` if one of the peers `buddy.peer` or `other` acknowledges + ## existence of the best block of the other peer. + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `peersAgreeOnChain()` + var + peer = buddy.peer + start = peer + fetch = other + # Make sure that `fetch` has not the smaller difficulty. + if fetch.state(eth).bestDifficulty < start.state(eth).bestDifficulty: + swap(fetch, start) + + let + startHash = start.state(eth).bestBlockHash + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: true, + hash: startHash), + maxResults: 1, + skip: 0, + reverse: true) + + trace trEthSendSendingGetBlockHeaders, peer, start, fetch, + startBlock=startHash.data.toHex, hdrReqLen=1 + + var hdrResp: Option[blockHeadersObj] + buddy.safeTransport("Error fetching block header"): + hdrResp = await fetch.getBlockHeaders(hdrReq) + if buddy.ctrl.stopped: + return false + + if hdrResp.isSome: + let hdrRespLen = hdrResp.get.headers.len + if 0 < hdrRespLen: + let blockNumber = hdrResp.get.headers[0].blockNumber + trace trEthRecvReceivedBlockHeaders, peer, start, fetch, + hdrRespLen, blockNumber + return true + + trace trEthRecvReceivedBlockHeaders, peer, start, fetch, + blockNumber="n/a" + +# ------------------------------------------------------------------------------ +# Private functions, worker sub-tasks +# ------------------------------------------------------------------------------ + +proc initaliseWorker(buddy: BuddyRef): Future[bool] {.async.} = + ## Initalise worker. This function must be run in single mode at the + ## beginning of running worker peer. + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()` + ## + let peer = buddy.peer + + # Delayed clean up batch list + if 0 < buddy.pool.untrusted.len: + trace "Removing untrused peers", peer, count=buddy.pool.untrusted.len + for p in buddy.pool.untrusted: + buddy.pool.trusted.excl p + buddy.pool.untrusted.setLen(0) + + if buddy.local.bestNumber.isNone: + let rc = await buddy.getBestNumber() + # Beware of peer terminating the session right after communicating + if rc.isErr or buddy.ctrl.stopped: + return false + if rc.value <= buddy.pool.topPersistent: + buddy.ctrl.zombie = true + trace "Useless peer, best number too low", peer, + topPersistent=buddy.pool.topPersistent, bestNumber=rc.value + buddy.local.bestNumber = some(rc.value) + + if minPeersToStartSync <= buddy.pool.trusted.len: + # We have enough trusted peers. Validate new peer against trusted + let rc = buddy.getRandomTrustedPeer() + if rc.isOK: + if await buddy.agreesOnChain(rc.value): + # Beware of peer terminating the session + if not buddy.ctrl.stopped: + buddy.pool.trusted.incl peer + return true + + # If there are no trusted peers yet, assume this very peer is trusted, + # but do not finish initialisation until there are more peers. + elif buddy.pool.trusted.len == 0: + trace "Assume initial trusted peer", peer + buddy.pool.trusted.incl peer + + elif buddy.pool.trusted.len == 1 and buddy.peer in buddy.pool.trusted: + # Ignore degenerate case, note that `trusted.len < minPeersToStartSync` + discard + + else: + # At this point we have some "trusted" candidates, but they are not + # "trusted" enough. We evaluate `peer` against all other candidates. If + # one of the candidates disagrees, we swap it for `peer`. If all candidates + # agree, we add `peer` to trusted set. The peers in the set will become + # "fully trusted" (and sync will start) when the set is big enough + var + agreeScore = 0 + otherPeer: Peer + for p in buddy.pool.trusted: + if peer == p: + inc agreeScore + else: + let agreedOk = await buddy.agreesOnChain(p) + # Beware of peer terminating the session + if buddy.ctrl.stopped: + return false + if agreedOk: + inc agreeScore + else: + otherPeer = p + + # Check for the number of peers that disagree + case buddy.pool.trusted.len - agreeScore + of 0: + trace "Peer trusted by score", peer, + trusted=buddy.pool.trusted.len + buddy.pool.trusted.incl peer # best possible outcome + of 1: + trace "Other peer no longer trusted", peer, + otherPeer, trusted=buddy.pool.trusted.len + buddy.pool.trusted.excl otherPeer + buddy.pool.trusted.incl peer + else: + trace "Peer not trusted", peer, + trusted=buddy.pool.trusted.len + discard + + if minPeersToStartSync <= buddy.pool.trusted.len: + return true + + +proc fetchHeaders(buddy: BuddyRef; wi: WorkItemRef): Future[bool] {.async.} = + ## Get the work item with the least interval and complete it. The function + ## returns `true` if bodies were fetched and there were no inconsistencies. + let peer = buddy.peer + + if 0 < wi.hashes.len: + return true + + var hdrReq: BlocksRequest + if wi.topHash.isNone: + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: false, + number: wi.blocks.minPt), + maxResults: wi.blocks.len.truncate(uint), + skip: 0, + reverse: false) + trace trEthSendSendingGetBlockHeaders, peer, + blocks=($wi.blocks) + + else: + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: true, + hash: wi.topHash.get), + maxResults: maxHeadersFetch, + skip: 0, + reverse: true) + trace trEthSendSendingGetBlockHeaders & " reverse", peer, + topHash=hdrReq.startBlock.hash, reqLen=hdrReq.maxResults + + # Fetch headers from peer + var hdrResp: Option[blockHeadersObj] + block: + let reqLen = hdrReq.maxResults + buddy.safeTransport("Error fetching block headers"): + hdrResp = await peer.getBlockHeaders(hdrReq) + # Beware of peer terminating the session + if buddy.ctrl.stopped: + return false + + if hdrResp.isNone: + trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + return false + + let hdrRespLen = hdrResp.get.headers.len + trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen + + if hdrRespLen == 0: + buddy.ctrl.stopped = true + return false + + # Update block range for reverse search + if wi.topHash.isSome: + # Headers are in reversed order + wi.headers = hdrResp.get.headers.reversed + wi.blocks = BlockRange.new( + wi.headers[0].blockNumber, wi.headers[^1].blockNumber) + discard buddy.pool.unprocessed.reduce(wi) + trace "Updated reverse header range", peer, range=($wi.blocks) + + # Verify start block number + elif hdrResp.get.headers[0].blockNumber != wi.blocks.minPt: + trace "Header range starts with wrong block number", peer, + startBlock=hdrResp.get.headers[0].blockNumber, + requestedBlock=wi.blocks.minPt + buddy.ctrl.zombie = true + return false + + # Import into `wi.headers` + else: + wi.headers.shallowCopy(hdrResp.get.headers) + + # Calculate block header hashes and verify it against parent links. If + # necessary, cut off some offending block headers tail. + wi.hashes.setLen(wi.headers.len) + wi.hashes[0] = wi.headers[0].hash + for n in 1 ..< wi.headers.len: + if wi.headers[n-1].blockNumber + 1 != wi.headers[n].blockNumber: + trace "Non-consecutive block numbers in header list response", peer + buddy.ctrl.zombie = true + return false + if wi.hashes[n-1] != wi.headers[n].parentHash: + # Oops, cul-de-sac after block chain re-org? + trace "Dangling parent link in header list response. Re-org?", peer + wi.headers.setLen(n) + wi.hashes.setLen(n) + break + wi.hashes[n] = wi.headers[n].hash + + # Adjust range length if necessary + if wi.headers[^1].blockNumber < wi.blocks.maxPt: + let redRng = BlockRange.new( + wi.headers[0].blockNumber, wi.headers[^1].blockNumber) + trace "Adjusting block range", peer, range=($wi.blocks), reduced=($redRng) + discard buddy.pool.unprocessed.merge(redRng.maxPt + 1, wi.blocks.maxPt) + wi.blocks = redRng + + return true + + +proc fetchBodies(buddy: BuddyRef; wi: WorkItemRef): Future[bool] {.async.} = + ## Get the work item with the least interval and complete it. The function + ## returns `true` if bodies were fetched and there were no inconsistencies. + let peer = buddy.peer + + # Complete group of bodies + buddy.safeTransport("Error fetching block bodies"): + while wi.bodies.len < wi.hashes.len: + let + start = wi.bodies.len + reqLen = min(wi.hashes.len - wi.bodies.len, maxBodiesFetch) + top = start + reqLen + hashes = wi.hashes[start ..< top] + + trace trEthSendSendingGetBlockBodies, peer, reqLen + + # Append bodies from peer to `wi.bodies` + block: + let bdyResp = await peer.getBlockBodies(hashes) + # Beware of peer terminating the session + if buddy.ctrl.stopped: + return false + + if bdyResp.isNone: + trace trEthRecvReceivedBlockBodies, peer, reqLen, respose="n/a" + buddy.ctrl.zombie = true + return false + + let bdyRespLen = bdyResp.get.blocks.len + trace trEthRecvReceivedBlockBodies, peer, reqLen, bdyRespLen + + if bdyRespLen == 0 or reqLen < bdyRespLen: + buddy.ctrl.zombie = true + return false + + wi.bodies.add bdyResp.get.blocks + + return true + + +proc stageItem(buddy: BuddyRef; wi: WorkItemRef) = + ## Add work item to the list of staged items + let peer = buddy.peer + + let rc = buddy.pool.staged.insert(wi.blocks.minPt) + if rc.isOk: + rc.value.data = wi + + # Turn on pool mode if there are too may staged work items queued. + # This must only be done when the added work item is not backtracking. + if stagedWorkItemsTrigger < buddy.pool.staged.len and + buddy.pool.backtrack.isNone and + wi.topHash.isNone: + buddy.ctx.poolMode = true + + # The list size is limited. So cut if necessary and recycle back the block + # range of the discarded item (tough luck if the current work item is the + # one removed from top.) + while maxStagedWorkItems < buddy.pool.staged.len: + let topValue = buddy.pool.staged.le(high(BlockNumber)).value + discard buddy.pool.unprocessed.merge(topValue.data) + discard buddy.pool.staged.delete(topValue.key) + return + + # Ooops, duplicates should not exist (but anyway ...) + let wj = block: + let rc = buddy.pool.staged.eq(wi.blocks.minPt) + doAssert rc.isOk + # Store `wi` and return offending entry + let rcData = rc.value.data + rc.value.data = wi + rcData + + debug "Replacing dup item in staged list", peer, + range=($wi.blocks), discarded=($wj.blocks) + # Update `staged` list and `unprocessed` ranges + block: + let rc = wi.blocks - wj.blocks + if rc.isOk: + discard buddy.pool.unprocessed.merge(rc.value) + + +proc processStaged(buddy: BuddyRef): bool = + ## Fetch a work item from the `staged` queue an process it to be + ## stored on the persistent block chain. + + let + peer = buddy.peer + chainDb = buddy.ctx.chain + rc = buddy.pool.staged.ge(low(BlockNumber)) + if rc.isErr: + # No more items in the database + return false + + let + wi = rc.value.data + topPersistent = buddy.pool.topPersistent + startNumber = wi.headers[0].blockNumber + stagedRecords = buddy.pool.staged.len + + # Check whether this record of blocks can be stored, at all + if topPersistent + 1 < startNumber: + trace "Staged work item postponed", peer, topPersistent, + range=($wi.blocks), stagedRecords + return false + + # Ok, store into the block chain database + trace "Processing staged work item", peer, + topPersistent, range=($wi.blocks) + + # remove from staged DB + discard buddy.pool.staged.delete(wi.blocks.minPt) + + try: + if chainDb.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK: + buddy.pool.topPersistent = wi.blocks.maxPt + return true + except CatchableError as e: + error "Storing persistent blocks failed", peer, range=($wi.blocks), + error = $e.name, msg = e.msg + except Defect as e: + # Pass through + raise e + except Exception as e: + # Notorious case where the `Chain` reference applied to + # `persistBlocks()` has the compiler traced a possible `Exception` + # (i.e. `ctx.chain` could be uninitialised.) + error "Exception while storing persistent blocks", peer, + range=($wi.blocks), error=($e.name), msg=e.msg + raise (ref Defect)(msg: $e.name & ": " & e.msg) + + # Something went wrong. Recycle work item (needs to be re-fetched, anyway) + let + parentHash = wi.headers[0].parentHash + parentHoN = HashOrNum(isHash: true, hash: parentHash) + try: + # Check whether hash of the first block is consistent + var parent: BlockHeader + if chainDb.getBlockHeader(parentHoN, parent): + # First block parent is ok, so there might be other problems. Re-fetch + # the blocks from another peer. + trace "Storing persistent blocks failed", peer, + range=($wi.blocks) + discard buddy.pool.unprocessed.merge(wi.blocks) + buddy.ctrl.zombie = true + return false + except CatchableError as e: + error "Failed to access parent blocks", peer, + blockNumber=wi.headers[0].blockNumber.pp, error=($e.name), msg=e.msg + + # Parent block header problem, so we might be in the middle of a re-org. + # Set single mode backtrack following the offending parent hash. + buddy.pool.backtrack = some(parentHash) + buddy.ctrl.multiOk = false + + if wi.topHash.isNone: + # Assuming that currently staged entries are on the wrong branch + buddy.recycleStaged() + notice "Starting chain re-org backtrack work item", peer, + range=($wi.blocks) + else: + # Leave that block range in the staged list + trace "Resuming chain re-org backtrack work item", peer, + range=($wi.blocks) + discard + + return false + +# ------------------------------------------------------------------------------ +# Public start/stop and admin functions +# ------------------------------------------------------------------------------ + +proc workerSetup*(ctx: CtxRef; tickerOK: bool): bool = + ## Global set up + ctx.data = CtxDataEx(unprocessed: BlockRangeSetRef.init()) # `pool` extension + ctx.pool.staged.init() + if tickerOK: + ctx.pool.ticker = Ticker.init(ctx.tickerUpdater) + else: + debug "Ticker is disabled" + return ctx.globalReset(0) + +proc workerRelease*(ctx: CtxRef) = + ## Global clean up + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stop() + +proc start*(buddy: BuddyRef): bool = + ## Initialise worker peer + if buddy.peer.supports(protocol.eth) and + buddy.peer.state(protocol.eth).initialized: + buddy.data = BuddyDataEx.new() # `local` extension + if not buddy.pool.ticker.isNil: + buddy.pool.ticker.startBuddy() + return true + +proc stop*(buddy: BuddyRef) = + ## Clean up this peer + buddy.ctrl.stopped = true + buddy.pool.untrusted.add buddy.peer + if not buddy.pool.ticker.isNil: + buddy.pool.ticker.stopBuddy() + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc runSingle*(buddy: BuddyRef) {.async.} = + ## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk` + ## is set `false` which is the default mode. This flag is updated by the + ## worker when deemed appropriate. + ## * For all workers, there can be only one `runSingle()` function active + ## simultaneously for all worker peers. + ## * There will be no `runMulti()` function active for the same worker peer + ## simultaneously + ## * There will be no `runPool()` iterator active simultaneously. + ## + ## Note that this function runs in `async` mode. + ## + let peer = buddy.peer + + if buddy.pool.backtrack.isSome: + trace "Single run mode, re-org backtracking", peer + let wi = WorkItemRef( + # This dummy interval can savely merged back without any effect + blocks: highBlockRange, + # Enable backtrack + topHash: some(buddy.pool.backtrack.get)) + + # Fetch headers and bodies for the current work item + if await buddy.fetchHeaders(wi): + if await buddy.fetchBodies(wi): + buddy.pool.backtrack = none(Hash256) + buddy.stageItem(wi) + + # Update pool and persistent database (may reset `multiOk`) + buddy.ctrl.multiOk = true + while buddy.processStaged(): + discard + return + + # This work item failed, nothing to do anymore. + discard buddy.pool.unprocessed.merge(wi) + buddy.ctrl.zombie = true + + else: + if buddy.local.bestNumber.isNone: + # Only log for the first time, or so + trace "Single run mode, initialisation", peer, + trusted=buddy.pool.trusted.len + discard + + # Initialise/re-initialise this worker + if await buddy.initaliseWorker(): + buddy.ctrl.multiOk = true + elif not buddy.ctrl.stopped: + await sleepAsync(2.seconds) + + +proc runPool*(buddy: BuddyRef) = + ## Ocne started, the function `runPool()` is called for all worker peers in + ## a row (as the body of an iteration.) There will be no other worker peer + ## functions activated simultaneously. + ## + ## This procedure is started if the global flag `buddy.ctx.poolMode` is set + ## `true` (default is `false`.) It is the responsibility of the `runPool()` + ## instance to reset the flag `buddy.ctx.poolMode`, typically at the first + ## peer instance as the number of active instances is unknown to `runPool()`. + ## + ## Note that this function does not run in `async` mode. + ## + if buddy.ctx.poolMode: + # Mind the gap, fill in if necessary + let + topPersistent = buddy.pool.topPersistent + covered = min( + buddy.pool.nextUnprocessed.getOrHigh, + buddy.pool.nextStaged.getOrHigh.minPt) + if topPersistent + 1 < covered: + discard buddy.pool.unprocessed.merge(topPersistent + 1, covered - 1) + buddy.ctx.poolMode = false + + +proc runMulti*(buddy: BuddyRef) {.async.} = + ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set + ## `true` which is typically done after finishing `runSingle()`. This + ## instance can be simultaneously active for all peer workers. + ## + # Fetch work item + let rc = buddy.newWorkItem() + if rc.isErr: + # No way, end of capacity for this peer => re-calibrate + buddy.ctrl.multiOk = false + buddy.local.bestNumber = none(BlockNumber) + return + let wi = rc.value + + # Fetch headers and bodies for the current work item + if await buddy.fetchHeaders(wi): + if await buddy.fetchBodies(wi): + buddy.stageItem(wi) + + # Update pool and persistent database + while buddy.processStaged(): + discard + return + + # This work item failed + discard buddy.pool.unprocessed.merge(wi) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index c1d901943..9be594522 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -24,7 +24,7 @@ logScope: topics = "snap-sync" type - SnapSyncCtx* = ref object of Worker + SnapSyncRef* = ref object of Worker chain: AbstractChainDB buddies: KeyedQueue[Peer,WorkerBuddy] ## LRU cache with worker descriptors pool: PeerPool ## for starting the system @@ -33,8 +33,8 @@ type # Private helpers # ------------------------------------------------------------------------------ -proc nsCtx(sp: WorkerBuddy): SnapSyncCtx = - sp.ns.SnapSyncCtx +proc nsCtx(sp: WorkerBuddy): SnapSyncRef = + sp.ns.SnapSyncRef proc hash(peer: Peer): Hash = ## Needed for `buddies` table key comparison @@ -67,7 +67,7 @@ proc workerLoop(sp: WorkerBuddy) {.async.} = peers=ns.pool.len, workers=ns.buddies.len, maxWorkers=ns.buddiesMax -proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) = +proc onPeerConnected(ns: SnapSyncRef, peer: Peer) = let sp = WorkerBuddy.new(ns, peer) # Check for known entry (which should not exist.) @@ -104,7 +104,7 @@ proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) = asyncSpawn sp.workerLoop() -proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) = +proc onPeerDisconnected(ns: SnapSyncRef, peer: Peer) = let rc = ns.buddies.eq(peer) if rc.isErr: debug "Disconnected from unregistered peer", peer, @@ -124,7 +124,7 @@ proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) = # Public functions # ------------------------------------------------------------------------------ -proc new*(T: type SnapSyncCtx; ethNode: EthereumNode; maxPeers: int): T = +proc init*(T: type SnapSyncRef; ethNode: EthereumNode; maxPeers: int): T = ## Constructor new result let size = max(1,maxPeers) @@ -133,7 +133,7 @@ proc new*(T: type SnapSyncCtx; ethNode: EthereumNode; maxPeers: int): T = result.buddiesMax = size result.pool = ethNode.peerPool -proc start*(ctx: SnapSyncCtx) = +proc start*(ctx: SnapSyncRef) = ## Set up syncing. This call should come early. var po = PeerObserver( onPeerConnected: @@ -148,7 +148,7 @@ proc start*(ctx: SnapSyncCtx) = po.setProtocol eth ctx.pool.addObserver(ctx, po) -proc stop*(ctx: SnapSyncCtx) = +proc stop*(ctx: SnapSyncRef) = ## Stop syncing ctx.pool.delObserver(ctx) ctx.workerRelease() diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 7c3172a2a..bc7aaccae 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -15,8 +15,8 @@ import chronicles, eth/[common/eth_types, p2p], stint, - ../../types, - "."/[timer_helper, worker_desc] + "../.."/[timer_helper, types], + ./worker_desc {.push raises: [Defect].} diff --git a/nimbus/sync/snap/worker/timer_helper.nim b/nimbus/sync/timer_helper.nim similarity index 100% rename from nimbus/sync/snap/worker/timer_helper.nim rename to nimbus/sync/timer_helper.nim diff --git a/nimbus/vm/state.nim b/nimbus/vm/state.nim index b5ea2edee..7231db93e 100644 --- a/nimbus/vm/state.nim +++ b/nimbus/vm/state.nim @@ -230,10 +230,12 @@ proc reinit*(self: BaseVMState; ## Object descriptor ## This is a variant of the `reinit()` function above where the field ## `header.parentHash`, is used to fetch the `parent` BlockHeader to be ## used in the `update()` variant, above. - self.reinit( - parent = self.chainDB.getBlockHeader(header.parentHash), - header = header, - pruneTrie = pruneTrie) + var parent: BlockHeader + if self.chainDB.getBlockHeader(header.parentHash, parent): + return self.reinit( + parent = parent, + header = header, + pruneTrie = pruneTrie) proc init*( @@ -302,6 +304,24 @@ proc new*( tracerFlags = tracerFlags, pruneTrie = pruneTrie) +proc init*( + vmState: BaseVMState; + header: BlockHeader; ## header with tx environment data fields + chainDB: BaseChainDB; ## block chain database + tracerFlags: set[TracerFlags] = {}; + pruneTrie: bool = true): bool + {.gcsafe, raises: [Defect,CatchableError].} = + ## Variant of `new()` which does not throw an exception on a dangling + ## `BlockHeader` parent hash reference. + var parent: BlockHeader + if chainDB.getBlockHeader(header.parentHash, parent): + vmState.init( + parent = parent, + header = header, + chainDB = chainDB, + tracerFlags = tracerFlags, + pruneTrie = pruneTrie) + return true proc setupTxContext*(vmState: BaseVMState, origin: EthAddress, gasPrice: GasInt, forkOverride=none(Fork)) = ## this proc will be called each time a new transaction diff --git a/nimbus/vm2/state.nim b/nimbus/vm2/state.nim index 1a409fc97..55d221067 100644 --- a/nimbus/vm2/state.nim +++ b/nimbus/vm2/state.nim @@ -228,10 +228,12 @@ proc reinit*(self: BaseVMState; ## Object descriptor ## This is a variant of the `reinit()` function above where the field ## `header.parentHash`, is used to fetch the `parent` BlockHeader to be ## used in the `update()` variant, above. - self.reinit( - parent = self.chainDB.getBlockHeader(header.parentHash), - header = header, - pruneTrie = pruneTrie) + var parent: BlockHeader + if self.chainDB.getBlockHeader(header.parentHash, parent): + return self.reinit( + parent = parent, + header = header, + pruneTrie = pruneTrie) proc init*( @@ -300,6 +302,25 @@ proc new*( tracerFlags = tracerFlags, pruneTrie = pruneTrie) +proc init*( + vmState: BaseVMState; + header: BlockHeader; ## header with tx environment data fields + chainDB: BaseChainDB; ## block chain database + tracerFlags: set[TracerFlags] = {}; + pruneTrie: bool = true): bool + {.gcsafe, raises: [Defect,CatchableError].} = + ## Variant of `new()` which does not throw an exception on a dangling + ## `BlockHeader` parent hash reference. + var parent: BlockHeader + if chainDB.getBlockHeader(header.parentHash, parent): + vmState.init( + parent = parent, + header = header, + chainDB = chainDB, + tracerFlags = tracerFlags, + pruneTrie = pruneTrie) + return true + method coinbase*(vmState: BaseVMState): EthAddress {.base, gcsafe.} = vmState.minerAddress diff --git a/vendor/nim-stew b/vendor/nim-stew index 9c3596d9d..598246620 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit 9c3596d9de809a5933fd777cec1183c2cdf521ec +Subproject commit 598246620da5c41d0e92a8dd6aab0755381b21cd