Prepare snap server client test scenario (#1483)

* Enable `snap/1` accounts range service

* Allow changing the garbage collector to `boehm` via a Makefile option.

why:
  There is still an unsolved memory corruption problem that might be
  related to the standard `gc`. It seemingly goes away if the `gc` is
  changed to `boehm`.

  Specifying another `gc` on the make level simplifies debugging and
  development.
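
  For illustration, a minimal sketch of using the new switch (assuming
  the usual `nimbus` build target, which is not part of this diff); the
  flag simply adds `--mm:boehm` to the Nim compiler options:

    # hypothetical invocation; build target name assumed
    make BOEHM_GC=1 nimbus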

* Code cosmetics

details:
* updated exception annotations
* extracted `worker_desc.nim` from `full/worker.nim`
* etc.

* Implement option to state a sync modifier file

why:
  This allows specifying extra sync-type specific options which might
  change over time. The file is regularly checked for updates.

* Implement a threshold when to suspend full syncing

why:
  For a test scenario, a full sync peer may work as a local snap server.
  There is no need to download the full block chain.

details:
  The file containing the pivot specs is specified by the
  `--sync-ctrl-file` option. It is regularly parsed for updates.
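
  A minimal usage sketch (file path, block number, and the exact
  `--sync-mode` value spelling are assumed for illustration and not part
  of this change):

    # illustration only: suspend full sync once the persistent head
    # passes block 1500000 (hypothetical threshold and path)
    echo 1500000 > /tmp/full-sync.ctrl
    ./build/nimbus --sync-mode=full --sync-ctrl-file=/tmp/full-sync.ctrl

  The first line of the file is parsed as a decimal block number and the
  file is re-read periodically, so the threshold can be changed while
  the node is running.
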
Jordan Hrycaj 2023-03-02 09:57:58 +00:00 committed by GitHub
parent 06512409ea
commit f20f20f962
22 changed files with 382 additions and 107 deletions

View File

@ -112,7 +112,11 @@ USE_MIRACL := 0
ENABLE_EVMC := 0
# "-d:release" cannot be added to config.nims
NIM_PARAMS += -d:release
ifeq ($(BOEHM_GC),1)
NIM_PARAMS += --mm:boehm
endif
T8N_PARAMS := -d:chronicles_default_output_device=stderr

View File

@ -220,6 +220,11 @@ engine is compiled. The variables are listed with decreasing priority (in
case of doubt, the lower prioritised variable is ignored when the higher one is
available.)
* BOEHM_GC=1<br>
Change garbage collector to `boehm`. This might help debugging in certain
cases where the `gc` is involved in a memory corruption or is camouflaging
the corruption.
* ENABLE_CHUNKED_RLPX=0<br>
Disable legacy chunked RLPx messages which are enabled by default for
synchronising against `Nethermind` nodes

View File

@ -178,6 +178,14 @@ type
abbr: "y"
name: "sync-mode" .}: SyncMode
syncCtrlFile* {.
desc: "Specify a file that is regularly checked for updates. It " &
"contains extra information specific to the type of sync " &
"process. This option is primaily intended only for sync " &
"testing and debugging."
abbr: "z"
name: "sync-ctrl-file" }: Option[string]
importKey* {.
desc: "Import unencrypted 32 bytes hex private key from a file"
defaultValue: ""
@ -771,6 +779,11 @@ proc makeConfig*(cmdLine = commandLineParams()): NimbusConf
result.dataDir.string != defaultDataDir():
result.keyStore = OutDir(result.dataDir.string / "keystore")
# For consistency
if result.syncCtrlFile.isSome and result.syncCtrlFile.unsafeGet == "":
error "Argument missing", option="sync-ctrl-file"
quit QuitFailure
when isMainModule:
# for testing purpose
discard makeConfig()

View File

@ -159,21 +159,25 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.ethNode.addSnapHandlerCapability(
nimbus.ethNode.peerPool,
nimbus.chainRef)
# Cannot do without minimal `eth` capability
if ProtocolFlag.Eth notin protocols:
nimbus.ethNode.addEthHandlerCapability(
nimbus.ethNode.peerPool,
nimbus.chainRef)
# Early-initialise "--snap-sync" before starting any network connections.
block:
let tickerOK =
conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
# Minimal capability needed for sync only
if ProtocolFlag.Eth notin protocols:
nimbus.ethNode.addEthHandlerCapability(
nimbus.ethNode.peerPool,
nimbus.chainRef)
let
noRecovery = conf.syncMode in {SyncMode.SnapCtx}
exCtrlFile = if conf.syncCtrlFile.isNone: none(string)
else: some(conf.syncCtrlFile.get.string)
tickerOK = conf.logLevel in {
LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
case conf.syncMode:
of SyncMode.Full:
nimbus.fullSyncRef = FullSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
tickerOK)
tickerOK, exCtrlFile)
of SyncMode.Snap, SyncMode.SnapCtx:
# Minimal capability needed for sync only
if ProtocolFlag.Snap notin protocols:
@ -181,7 +185,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.ethNode.peerPool)
nimbus.snapSyncRef = SnapSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
nimbus.dbBackend, tickerOK, noRecovery = (conf.syncMode==SyncMode.Snap))
nimbus.dbBackend, tickerOK, noRecovery=noRecovery, exCtrlFile)
of SyncMode.Default:
nimbus.legaSyncRef = LegacySyncRef.new(
nimbus.ethNode, nimbus.chainRef)

View File

@ -8,14 +8,15 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
eth/[common, p2p],
chronicles,
chronos,
stew/[interval_set, sorted_set],
"."/[full/worker, sync_desc, sync_sched, protocol]
{.push raises: [].}
./full/[worker, worker_desc],
"."/[sync_desc, sync_sched, protocol]
logScope:
topics = "full-sync"
@ -110,11 +111,14 @@ proc init*(
chain: ChainRef;
rng: ref HmacDrbgContext;
maxPeers: int;
enableTicker = false): T =
enableTicker = false;
exCtrlFile = none(string);
): T =
new result
result.initSync(ethNode, chain, maxPeers, enableTicker)
result.initSync(ethNode, chain, maxPeers, enableTicker, exCtrlFile)
result.ctx.pool.rng = rng
proc start*(ctx: FullSyncRef) =
doAssert ctx.startSync()

View File

@ -27,10 +27,11 @@ type
nextUnprocessed*: Option[BlockNumber]
nextStaged*: Option[BlockNumber]
nStagedQueue*: int
suspended*: bool
reOrg*: bool
TickerStatsUpdater* =
proc: TickerStats {.gcsafe, raises: [Defect].}
proc: TickerStats {.gcsafe, raises: [].}
TickerRef* = ref object
nBuddies: int
@ -75,8 +76,12 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
tick = t.tick.toSI
mem = getTotalMem().uint.toSI
info "Sync statistics", tick, buddies,
persistent, unprocessed, staged, queued, reOrg, mem
if data.suspended:
info "Sync statistics (suspended)", tick, buddies,
persistent, unprocessed, staged, queued, reOrg, mem
else:
info "Sync statistics", tick, buddies,
persistent, unprocessed, staged, queued, reOrg, mem
t.tick.inc
t.setLogTicker(Moment.fromNow(tickerLogInterval))

View File

@ -8,48 +8,20 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
std/[options],
chronicles,
chronos,
eth/[common, p2p],
".."/[protocol, sync_desc],
../misc/[best_pivot, block_queue],
./ticker
{.push raises:[].}
../misc/[best_pivot, block_queue, sync_ctrl],
"."/[ticker, worker_desc]
logScope:
topics = "full-buddy"
type
PivotState = enum
PivotStateInitial, ## Initial state
FirstPivotSeen, ## Starting, first pivot seen
FirstPivotAccepted, ## Accepted, waiting for second
FirstPivotUseRegardless ## Force pivot if available
PivotRunMode ## SNAFU after some magic
BuddyData* = object
## Local descriptor data extension
pivot: BestPivotWorkerRef ## Local pivot worker descriptor
bQueue: BlockQueueWorkerRef ## Block queue worker
CtxData* = object
## Globally shared data extension
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
pivot: BestPivotCtxRef ## Global pivot descriptor
pivotState: PivotState ## For initial pivot control
pivotStamp: Moment ## `PivotState` driven timing control
bCtx: BlockQueueCtxRef ## Global block queue descriptor
ticker: TickerRef ## Logger ticker
FullBuddyRef* = BuddyRef[CtxData,BuddyData]
## Extended worker peer descriptor
FullCtxRef* = CtxRef[CtxData]
## Extended global descriptor
const
extraTraceMessages = false # or true
## Enables additional logging noise
@ -100,11 +72,15 @@ proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater =
var stats: BlockQueueStats
ctx.pool.bCtx.blockQueueStats(stats)
let suspended =
0 < ctx.pool.suspendAt and ctx.pool.suspendAt < stats.topAccepted
TickerStats(
topPersistent: stats.topAccepted,
nextStaged: stats.nextStaged,
nextUnprocessed: stats.nextUnprocessed,
nStagedQueue: stats.nStagedQueue,
suspended: suspended,
reOrg: stats.reOrg)
@ -185,6 +161,10 @@ proc setup*(ctx: FullCtxRef; tickerOK: bool): bool =
ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater)
else:
debug "Ticker is disabled"
if ctx.exCtrlFile.isSome:
warn "Full sync accepts suspension request block number",
syncCtrlFile=ctx.exCtrlFile.get
true
proc release*(ctx: FullCtxRef) =
@ -404,15 +384,26 @@ proc runMulti*(buddy: FullBuddyRef) {.async.} =
## `true` which is typically done after finishing `runSingle()`. This
## instance can be simultaneously active for all peer workers.
##
# Fetch work item
let
ctx {.used.} = buddy.ctx
ctx = buddy.ctx
bq = buddy.only.bQueue
rc = await bq.blockQueueWorker()
if ctx.exCtrlFile.isSome:
let rc = ctx.exCtrlFile.syncCtrlBlockNumberFromFile
if rc.isOk:
ctx.pool.suspendAt = rc.value
if 0 < ctx.pool.suspendAt:
if ctx.pool.suspendAt < buddy.only.bQueue.topAccepted:
# Sleep for a while, then leave
await sleepAsync(10.seconds)
return
# Fetch work item
let rc = await bq.blockQueueWorker()
if rc.isErr:
if rc.error == StagedQueueOverflow:
# Mind the gap: Turn on pool mode if there are too many staged items.
buddy.ctx.poolMode = true
ctx.poolMode = true
else:
return

View File

@ -0,0 +1,49 @@
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
eth/p2p,
chronos,
../sync_desc,
../misc/[best_pivot, block_queue],
./ticker
type
PivotState* = enum
PivotStateInitial, ## Initial state
FirstPivotSeen, ## Starting, first pivot seen
FirstPivotAccepted, ## Accepted, waiting for second
FirstPivotUseRegardless ## Force pivot if available
PivotRunMode ## SNAFU after some magic
BuddyData* = object
## Local descriptor data extension
pivot*: BestPivotWorkerRef ## Local pivot worker descriptor
bQueue*: BlockQueueWorkerRef ## Block queue worker
CtxData* = object
## Globally shared data extension
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
pivot*: BestPivotCtxRef ## Global pivot descriptor
pivotState*: PivotState ## For initial pivot control
pivotStamp*: Moment ## `PivotState` driven timing control
bCtx*: BlockQueueCtxRef ## Global block queue descriptor
suspendAt*: BlockNumber ## Suspend if persistent head is larger
ticker*: TickerRef ## Logger ticker
FullBuddyRef* = BuddyRef[CtxData,BuddyData]
## Extended worker peer descriptor
FullCtxRef* = CtxRef[CtxData]
## Extended global descriptor
# End

View File

@ -8,6 +8,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
std/[tables, times, hashes, sets],
chronicles, chronos,
@ -18,7 +20,8 @@ import
../protocol/trace_config, # gossip noise control
../../core/[chain, tx_pool, tx_pool/tx_item]
{.push raises: [].}
logScope:
topics = "eth-wire"
type
HashToTime = TableRef[Hash256, Time]
@ -317,7 +320,7 @@ proc onPeerConnected(ctx: EthWireRef, peer: Peer) =
asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer])
proc onPeerDisconnected(ctx: EthWireRef, peer: Peer) =
debug "ethwire: remove peer from knownByPeer",
debug "remove peer from knownByPeer",
peer
ctx.knownByPeer.del(peer)
@ -346,6 +349,7 @@ proc new*(_: type EthWireRef,
chain: chain,
txPool: txPool,
peerPool: peerPool,
enableTxPool: Enabled,
lastCleanup: getTime())
if txPool.isNil:
ctx.enableTxPool = NotAvailable
@ -372,13 +376,20 @@ proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler,
)
# ------------------------------------------------------------------------------
# Public functions: eth wire protocol handlers
# Public getters/setters
# ------------------------------------------------------------------------------
proc txPoolEnabled*(ctx: EthWireRef; ena: bool) =
proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) =
if ctx.enableTxPool != NotAvailable:
ctx.enableTxPool = if ena: Enabled else: Suspended
proc txPoolEnabled*(ctx: EthWireRef): bool =
ctx.enableTxPool == Enabled
# ------------------------------------------------------------------------------
# Public functions: eth wire protocol handlers
# ------------------------------------------------------------------------------
method getStatus*(ctx: EthWireRef): EthState
{.gcsafe, raises: [RlpError,EVMError].} =
let

View File

@ -11,15 +11,19 @@
{.push raises: [].}
import
std/sequtils,
chronicles,
eth/p2p,
../snap/worker/db/hexary_range,
stew/interval_set,
../../db/db_chain,
../../core/chain,
../snap/range_desc,
../snap/worker/db/[hexary_desc, hexary_range, snapdb_desc, snapdb_accounts],
../protocol,
../protocol/snap/snap_types,
../../core/chain
../protocol/snap/snap_types
logScope:
topics = "wire-protocol"
topics = "snap-wire"
type
SnapWireRef* = ref object of SnapWireBase
@ -30,10 +34,15 @@ const
proofNodeSizeMax = 532
## Branch node with all branches `high(UInt256)` within RLP list
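## (presumably 16 child hashes at 33 RLP bytes each, an empty value slot,
## plus a 3 byte list header: 16*33 + 1 + 3 = 532)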
proc proofNodesSizeMax*(n: int): int {.gcsafe.}
# ------------------------------------------------------------------------------
# Private functions: helper functions
# Private functions: helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"handlers.snap." & info
proc notImplemented(name: string) =
debug "snapWire: hHandler method not implemented", meth=name
@ -41,6 +50,69 @@ proc append(writer: var RlpWriter; t: SnapProof; node: Blob) =
## RLP mixin, encoding
writer.snapAppend node
# ------------------------------------------------------------------------------
# Private functions: fetch leaf range
# ------------------------------------------------------------------------------
proc fetchLeafRange(
ctx: SnapWireRef; # Handler descriptor
db: HexaryGetFn; # Database abstraction
root: Hash256; # State root
iv: NodeTagRange; # Proofed range of leaf paths
replySizeMax: int; # Updated size counter for the raw list
): Result[RangeProof,void]
{.gcsafe, raises: [CatchableError].} =
let
rootKey = root.to(NodeKey)
estimatedProofSize = proofNodesSizeMax(10) # some expected upper limit
if replySizeMax <= estimatedProofSize:
trace logTxt "fetchLeafRange(): data size too small", iv, replySizeMax
return err() # package size too small
# Assemble result. Note that the size limit is the size of the leaf nodes
# on wire. So the `sizeMax` is the argument size `replySizeMax` with some
# space removed to accommodate the proof nodes.
let
sizeMax = replySizeMax - estimatedProofSize
rc = db.hexaryRangeLeafsProof(rootKey, iv, sizeMax)
if rc.isErr:
error logTxt "fetchLeafRange(): database problem",
iv, replySizeMax, error=rc.error
return err() # database error
let sizeOnWire = rc.value.leafsSize + rc.value.proofSize
if sizeOnWire <= replySizeMax:
return ok(rc.value)
# Strip parts of leafs result and amend remainder by adding proof nodes
var
leafs = rc.value.leafs
leafsTop = leafs.len - 1
tailSize = 0
tailItems = 0
reduceBy = sizeOnWire - replySizeMax
while tailSize <= reduceBy and tailItems < leafsTop:
# Estimate the size on wire needed for the tail item
const extraSize = (sizeof RangeLeaf()) - (sizeof newSeq[Blob](0))
tailSize += leafs[leafsTop - tailItems].data.len + extraSize
tailItems.inc
if leafsTop <= tailItems:
trace logTxt "fetchLeafRange(): stripping leaf list failed",
iv, replySizeMax, leafsTop, tailItems
return err() # package size too small
leafs.setLen(leafsTop - tailItems - 1) # chop off one more for slack
let
leafProof = db.hexaryRangeLeafsProof(rootKey, iv.minPt, leafs)
strippedSizeOnWire = leafProof.leafsSize + leafProof.proofSize
if strippedSizeOnWire <= replySizeMax:
return ok(leafProof)
trace logTxt "fetchLeafRange(): data size problem",
iv, replySizeMax, leafsTop, tailItems, strippedSizeOnWire
err()
# ------------------------------------------------------------------------------
# Private functions: peer observer
# ------------------------------------------------------------------------------
@ -109,8 +181,19 @@ method getAccountRange*(
limit: Hash256;
replySizeMax: uint64;
): (seq[SnapAccount], seq[SnapProof])
{.gcsafe.} =
notImplemented("getAccountRange")
{.gcsafe, raises: [CatchableError].} =
## Fetch accounts list from database
let
db = SnapDbRef.init(ctx.chain.com.db.db).getAccountFn
iv = NodeTagRange.new(origin.to(NodeTag), limit.to(NodeTag))
sizeMax = min(replySizeMax, high(int).uint64).int
trace logTxt "getAccountRange(): request data range", iv, replySizeMax
let rc = ctx.fetchLeafRange(db, root, iv, sizeMax)
if rc.isOk:
return (rc.value.leafs.mapIt(it.to(SnapAccount)), rc.value.proof)
method getStorageRanges*(
ctx: SnapWireRef;

View File

@ -484,6 +484,10 @@ proc `bestNumber=`*(qd: BlockQueueWorkerRef; val: Option[BlockNumber]) =
## does something useful.
qd.bestNumber = val
proc topAccepted*(qd: BlockQueueWorkerRef): BlockNumber =
## Getter
qd.global.topAccepted
# ------------------------------------------------------------------------------
# Public functions -- synchronous
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,56 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
std/[os, strutils],
chronicles,
eth/[common, p2p]
logScope:
topics = "sync-ctrl"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getDataLine(name: string): string {.gcsafe, raises: [IOError].} =
if name.fileExists:
let file = name.open
defer: file.close
return (file.readAll.splitLines)[0].strip
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc syncCtrlBlockNumberFromFile*(
fileName: Option[string];
): Result[BlockNumber,void] =
## Returns a block number from the file name argument `fileName`. The first
## line of the file is parsed as a decimal encoded block number.
if fileName.isSome:
let file = fileName.get
try:
let data = file.getDataLine
if 0 < data.len:
let num = parse(data,UInt256)
return ok(num.toBlockNumber)
except CatchableError as e:
let
name {.used.} = $e.name
msg {.used.} = e.msg
debug "Exception while parsing block number", file, name, msg
err()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -1,8 +1,23 @@
# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
{.push raises: [].}
import
chronicles,
eth/[common, p2p, p2p/private/p2p_types],
../../types
logScope:
topics = "eth-wire"
type
NewBlockHashesAnnounce* = object
hash*: Hash256
@ -34,31 +49,37 @@ const
proc notImplemented(name: string) =
debug "Method not implemented", meth = name
method getStatus*(ctx: EthWireBase): EthState {.base.} =
method getStatus*(ctx: EthWireBase): EthState
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("getStatus")
method getReceipts*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[seq[Receipt]] {.base.} =
method getReceipts*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[seq[Receipt]]
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("getReceipts")
method getPooledTxs*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[Transaction] {.base.} =
notImplemented("getPooledTxs")
method getBlockBodies*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[BlockBody] {.base.} =
method getBlockBodies*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[BlockBody] {.base, gcsafe, raises: [CatchableError].} =
notImplemented("getBlockBodies")
method getBlockHeaders*(ctx: EthWireBase, req: BlocksRequest): seq[BlockHeader] {.base.} =
method getBlockHeaders*(ctx: EthWireBase, req: BlocksRequest): seq[BlockHeader]
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("getBlockHeaders")
method handleNewBlock*(ctx: EthWireBase, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.base.} =
method handleNewBlock*(ctx: EthWireBase, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt)
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("handleNewBlock")
method handleAnnouncedTxs*(ctx: EthWireBase, peer: Peer, txs: openArray[Transaction]) {.base.} =
method handleAnnouncedTxs*(ctx: EthWireBase, peer: Peer, txs: openArray[Transaction])
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("handleAnnouncedTxs")
method handleAnnouncedTxsHashes*(ctx: EthWireBase, peer: Peer, txHashes: openArray[Hash256]) {.base.} =
notImplemented("handleAnnouncedTxsHashes")
method handleNewBlockHashes*(ctx: EthWireBase, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.base.} =
method handleNewBlockHashes*(ctx: EthWireBase, peer: Peer, hashes: openArray[NewBlockHashesAnnounce])
{.base, gcsafe, raises: [CatchableError].} =
notImplemented("handleNewBlockHashes")
when defined(legacy_eth66_enabled):

View File

@ -8,13 +8,15 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
chronicles,
eth/common,
stew/endians2,
../../../constants
{.push raises: [].}
logScope:
topics = "snap-wire"
type
SnapAccount* = object
@ -149,7 +151,7 @@ method getAccountRange*(
limit: Hash256;
replySizeMax: uint64;
): (seq[SnapAccount], seq[SnapProof])
{.base.} =
{.base, raises: [CatchableError].} =
notImplemented("getAccountRange")
method getStorageRanges*(

View File

@ -115,9 +115,10 @@ proc init*(
dbBackend: ChainDb;
enableTicker = false;
noRecovery = false;
exCtrlFile = none(string);
): T =
new result
result.initSync(ethNode, chain, maxPeers, enableTicker)
result.initSync(ethNode, chain, maxPeers, enableTicker, exCtrlFile)
result.ctx.chain = chain # explicitly override
result.ctx.pool.rng = rng
result.ctx.pool.dbBackend = dbBackend

View File

@ -103,7 +103,7 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
ctx.pool.coveredAccounts = NodeTagRangeSet.init()
noExceptionOops("worker.setup()"):
ctx.ethWireCtx.txPoolEnabled false
ctx.ethWireCtx.txPoolEnabled = false
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
ctx.pool.snapDb =
if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
@ -134,7 +134,7 @@ proc release*(ctx: SnapCtxRef) =
ctx.pool.ticker.stop()
ctx.pool.ticker = nil
noExceptionOops("worker.release()"):
ctx.ethWireCtx.txPoolEnabled true
ctx.ethWireCtx.txPoolEnabled = true
ctx.chain.com.syncReqNewHead = nil
proc start*(buddy: SnapBuddyRef): bool =

View File

@ -50,6 +50,21 @@ proc rlpPairSize(aLen: int; bRlpLen: int): int =
else:
high(int)
proc nonLeafPathNodes(
baseTag: NodeTag; # Left boundary
rootKey: NodeKey|RepairKey; # State root
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): HashSet[Blob]
{.gcsafe, raises: [CatchableError]} =
## Helper for `updateProof()`
baseTag
.hexaryPath(rootKey, db)
.path
.mapIt(it.node)
.filterIt(it.kind != Leaf)
.mapIt(it.convertTo(Blob))
.toHashSet
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -83,7 +98,7 @@ template collectLeafs(
rc = typeof(rc).err(rx.error)
break body
rx.value
rightKey = xPath.getPartialPath.convertTo(NodeKey)
rightKey = getPartialPath(xPath).convertTo(NodeKey)
rightTag = rightKey.to(NodeTag)
# Prevents from semi-endless looping
@ -121,23 +136,13 @@ template updateProof(
): auto =
## Complement leafs list by adding proof nodes. This directive is provided as
## `template` for avoiding varying exception annotations.
var proof = baseTag.hexaryPath(rootKey, db)
.path
.mapIt(it.node)
.filterIt(it.kind != Leaf)
.mapIt(it.convertTo(Blob))
.toHashSet
var proof = nonLeafPathNodes(baseTag, rootKey, db)
if 0 < leafList.len:
proof.incl leafList[^1].key.to(NodeTag).hexaryPath(rootKey, db)
.path
.mapIt(it.node)
.filterIt(it.kind != Leaf)
.mapIt(it.convertTo(Blob))
.toHashSet
proof.incl nonLeafPathNodes(leafList[^1].key.to(NodeTag), rootKey, db)
var rp = RangeProof(
leafs: leafList,
proof: proof.toSeq.mapIt(SnapProof(data: it)))
proof: mapIt(toSeq(proof), SnapProof(data: it)))
if 0 < nSizeUsed:
rp.leafsSize = hexaryRangeRlpSize nSizeUsed
@ -180,6 +185,16 @@ proc hexaryRangeLeafsProof*(
# Public helpers
# ------------------------------------------------------------------------------
proc to*(
rl: RangeLeaf;
T: type SnapAccount;
): T
{.gcsafe, raises: [RlpError]} =
## Convert the generic `RangeLeaf` argument to payload type.
T(accHash: rl.key.to(Hash256),
accBody: rl.data.decode(Account))
proc hexaryRangeRlpSize*(blobLen: int): int =
## Returns the size of RLP encoded <blob> of argument length `blobLen`.
if blobLen < 56:

View File

@ -8,6 +8,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
std/[math, sets, sequtils],
chronicles,
@ -22,8 +24,6 @@ import
storage_queue_helper],
./ticker
{.push raises: [].}
logScope:
topics = "snap-pivot"

View File

@ -41,7 +41,7 @@ type
nQueues*: int
TickerStatsUpdater* =
proc: SnapTickerStats {.gcsafe, raises: [Defect].}
proc: SnapTickerStats {.gcsafe, raises: [].}
TickerRef* = ref object
## Account fetching state that is shared among all peers.

View File

@ -14,6 +14,7 @@
## Public descriptors
import
#std/options,
eth/[common, p2p],
../core/chain,
../db/db_chain,
@ -34,24 +35,25 @@ type
BuddyCtrlRef* = ref object
## Control and state settings
runState: BuddyRunState ## Access with getters
multiOk*: bool ## Triggers `runSingle()` mode unless `true`
runState: BuddyRunState ## Access with getters
multiOk*: bool ## Triggers `runSingle()` mode unless `true`
BuddyRef*[S,W] = ref object
## Worker peer state descriptor.
ctx*: CtxRef[S] ## Shared data descriptor back reference
peer*: Peer ## Reference to eth p2pProtocol entry
ctrl*: BuddyCtrlRef ## Control and state settings
only*: W ## Worker peer specific data
ctx*: CtxRef[S] ## Shared data descriptor back reference
peer*: Peer ## Reference to eth p2pProtocol entry
ctrl*: BuddyCtrlRef ## Control and state settings
only*: W ## Worker peer specific data
CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.)
buddiesMax*: int ## Max number of buddies
ethWireCtx*: EthWireRef ## Eth protocol wire context (if available)
chain*: ChainRef ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job
pool*: S ## Shared context for all worker peers
buddiesMax*: int ## Max number of buddies
ethWireCtx*: EthWireRef ## Eth protocol wire context (if available)
chain*: ChainRef ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job
exCtrlFile*: Option[string] ## Extra instructions file (if any)
pool*: S ## Shared context for all worker peers
# ------------------------------------------------------------------------------
# Public functions

View File

@ -330,7 +330,9 @@ proc initSync*[S,W](
node: EthereumNode;
chain: ChainRef,
slots: int;
noisy = false) =
noisy = false;
exCtrlFile = none(string);
) =
## Constructor
# Leave one extra slot so that it can hold a *zombie* even if all slots
# are full. The effect is that a re-connect on the latest zombie will be
@ -338,6 +340,7 @@ proc initSync*[S,W](
dsc.ctx = CtxRef[S](
ethWireCtx: cast[EthWireRef](node.protocolState protocol.eth),
buddiesMax: max(1, slots + 1),
exCtrlFile: exCtrlFile,
chain: chain)
dsc.pool = node.peerPool
dsc.tickerOk = noisy

View File

@ -22,6 +22,8 @@ const
rc &= ", legacy-eth/66"
when defined(chunked_rlpx_enabled):
rc &= ", chunked-rlpx"
when defined(boehmgc):
rc &= ", boehm/gc"
rc &= " enabled"
rc