Block header download beacon to era1 (#2601)
* Block header download starting at Beacon down to Era1 details: The header download implementation is intended to be completed to a full sync facility. Downloaded block headers are stored in a `CoreDb` table. Later on they should be fetched, complemented by a block body, executed/imported, and deleted from the table. The Era1 repository may be partial or missing. Era1 headers are neither downloaded nor stored on the `CoreDb` table. Headers are downloaded top down (largest block number first) using the hash of the block header by one peer. Other peers fetch headers opportunistically using block numbers Observed download times for 14m `MainNet` headers varies between 30min and 1h (Era1 size truncated to 66m blocks.), full download 52min (anectdotal.) The number of peers downloading concurrently is crucial here. * Activate `flare` by command line option * Fix copyright year
This commit is contained in:
parent
cb94dd0c5b
commit
71e466d173
|
@ -131,7 +131,7 @@ type
|
|||
|
||||
SyncMode* {.pure.} = enum
|
||||
Default
|
||||
#Snap ## Beware, experimental
|
||||
Flare ## Beware, experimental
|
||||
|
||||
NimbusConf* = object of RootObj
|
||||
## Main Nimbus configuration object
|
||||
|
@ -175,8 +175,8 @@ type
|
|||
syncMode* {.
|
||||
desc: "Specify particular blockchain sync mode."
|
||||
longDesc:
|
||||
"- default -- beacon sync mode\n" &
|
||||
# "- snap -- experimental snap mode (development only)\n" &
|
||||
"- default -- beacon sync mode\n" &
|
||||
"- flare -- re-facored beacon like mode, experimental\n" &
|
||||
""
|
||||
defaultValue: SyncMode.Default
|
||||
defaultValueDesc: $SyncMode.Default
|
||||
|
|
|
@ -28,6 +28,8 @@ type
|
|||
skeletonBlockHashToNumber = 11
|
||||
skeletonHeader = 12
|
||||
skeletonBody = 13
|
||||
flareState = 14
|
||||
flareHeader = 15
|
||||
|
||||
DbKey* = object
|
||||
# The first byte stores the key type. The rest are key-specific values
|
||||
|
@ -110,6 +112,17 @@ func hashIndexKey*(hash: Hash256, index: uint16): HashIndexKey =
|
|||
result[32] = byte(index and 0xFF)
|
||||
result[33] = byte((index shl 8) and 0xFF)
|
||||
|
||||
func flareStateKey*(u: uint8): DbKey =
|
||||
result.data[0] = byte ord(flareState)
|
||||
result.data[1] = u
|
||||
result.dataEndPos = 1
|
||||
|
||||
func flareHeaderKey*(u: BlockNumber): DbKey =
|
||||
result.data[0] = byte ord(flareHeader)
|
||||
doAssert sizeof(u) <= 32
|
||||
copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
|
||||
result.dataEndPos = uint8 sizeof(u)
|
||||
|
||||
template toOpenArray*(k: DbKey): openArray[byte] =
|
||||
k.data.toOpenArray(0, int(k.dataEndPos))
|
||||
|
||||
|
|
|
@ -143,6 +143,10 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
|
|||
nimbus.beaconSyncRef = BeaconSyncRef.init(
|
||||
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
||||
)
|
||||
of SyncMode.Flare:
|
||||
nimbus.flareSyncRef = FlareSyncRef.init(
|
||||
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
|
||||
conf.era1Dir.string)
|
||||
|
||||
# Connect directly to the static nodes
|
||||
let staticPeers = conf.getStaticPeers()
|
||||
|
@ -163,6 +167,8 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
|
|||
# waitForPeers = false
|
||||
of SyncMode.Default:
|
||||
discard
|
||||
of SyncMode.Flare:
|
||||
discard
|
||||
nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
|
||||
enableDiscovery = conf.discovery != DiscoveryType.None,
|
||||
waitForPeers = waitForPeers)
|
||||
|
@ -254,6 +260,8 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
|
|||
case conf.syncMode:
|
||||
of SyncMode.Default:
|
||||
nimbus.beaconSyncRef.start
|
||||
of SyncMode.Flare:
|
||||
nimbus.flareSyncRef.start
|
||||
#of SyncMode.Snap:
|
||||
# nimbus.snapSyncRef.start
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import
|
|||
./core/chain,
|
||||
./core/tx_pool,
|
||||
./sync/peers,
|
||||
./sync/beacon,
|
||||
./sync/[beacon, flare],
|
||||
# ./sync/snap, # -- todo
|
||||
./beacon/beacon_engine,
|
||||
./common,
|
||||
|
@ -30,6 +30,7 @@ export
|
|||
tx_pool,
|
||||
peers,
|
||||
beacon,
|
||||
flare,
|
||||
#snap,
|
||||
full,
|
||||
beacon_engine,
|
||||
|
@ -52,6 +53,7 @@ type
|
|||
peerManager*: PeerManagerRef
|
||||
# snapSyncRef*: SnapSyncRef # -- todo
|
||||
beaconSyncRef*: BeaconSyncRef
|
||||
flareSyncRef*: FlareSyncRef
|
||||
beaconEngine*: BeaconEngineRef
|
||||
metricsServer*: MetricsHttpServerRef
|
||||
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
pkg/[chronicles, chronos, eth/p2p, results],
|
||||
pkg/stew/[interval_set, sorted_set],
|
||||
./flare/[worker, worker_desc],
|
||||
"."/[sync_desc, sync_sched, protocol]
|
||||
|
||||
logScope:
|
||||
topics = "beacon2-sync"
|
||||
|
||||
type
|
||||
FlareSyncRef* = RunnerSyncRef[FlareCtxData,FlareBuddyData]
|
||||
|
||||
const
|
||||
extraTraceMessages = false # or true
|
||||
## Enable additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private logging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template traceMsg(f, info: static[string]; args: varargs[untyped]) =
|
||||
trace "Flare scheduler " & f & "() " & info, args
|
||||
|
||||
template traceMsgCtx(f, info: static[string]; c: FlareCtxRef) =
|
||||
when extraTraceMessages:
|
||||
block:
|
||||
let
|
||||
poolMode {.inject.} = c.poolMode
|
||||
daemon {.inject.} = c.daemon
|
||||
f.traceMsg info, poolMode, daemon
|
||||
|
||||
template traceMsgBuddy(f, info: static[string]; b: FlareBuddyRef) =
|
||||
when extraTraceMessages:
|
||||
block:
|
||||
let
|
||||
peer {.inject.} = b.peer
|
||||
runState {.inject.} = b.ctrl.state
|
||||
multiOk {.inject.} = b.ctrl.multiOk
|
||||
poolMode {.inject.} = b.ctx.poolMode
|
||||
daemon {.inject.} = b.ctx.daemon
|
||||
f.traceMsg info, peer, runState, multiOk, poolMode, daemon
|
||||
|
||||
|
||||
template tracerFrameCtx(f: static[string]; c: FlareCtxRef; code: untyped) =
|
||||
f.traceMsgCtx "begin", c
|
||||
code
|
||||
f.traceMsgCtx "end", c
|
||||
|
||||
template tracerFrameBuddy(f: static[string]; b: FlareBuddyRef; code: untyped) =
|
||||
f.traceMsgBuddy "begin", b
|
||||
code
|
||||
f.traceMsgBuddy "end", b
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Virtual methods/interface, `mixin` functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc runSetup(ctx: FlareCtxRef): bool =
|
||||
tracerFrameCtx("runSetup", ctx):
|
||||
result = worker.setup(ctx)
|
||||
|
||||
proc runRelease(ctx: FlareCtxRef) =
|
||||
tracerFrameCtx("runRelease", ctx):
|
||||
worker.release(ctx)
|
||||
|
||||
proc runDaemon(ctx: FlareCtxRef) {.async.} =
|
||||
tracerFrameCtx("runDaemon", ctx):
|
||||
await worker.runDaemon(ctx)
|
||||
|
||||
proc runStart(buddy: FlareBuddyRef): bool =
|
||||
tracerFrameBuddy("runStart", buddy):
|
||||
result = worker.start(buddy)
|
||||
|
||||
proc runStop(buddy: FlareBuddyRef) =
|
||||
tracerFrameBuddy("runStop", buddy):
|
||||
worker.stop(buddy)
|
||||
|
||||
proc runPool(buddy: FlareBuddyRef; last: bool; laps: int): bool =
|
||||
tracerFrameBuddy("runPool", buddy):
|
||||
result = worker.runPool(buddy, last, laps)
|
||||
|
||||
proc runSingle(buddy: FlareBuddyRef) {.async.} =
|
||||
tracerFrameBuddy("runSingle", buddy):
|
||||
await worker.runSingle(buddy)
|
||||
|
||||
proc runMulti(buddy: FlareBuddyRef) {.async.} =
|
||||
tracerFrameBuddy("runMulti", buddy):
|
||||
await worker.runMulti(buddy)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc init*(
|
||||
T: type FlareSyncRef;
|
||||
ethNode: EthereumNode;
|
||||
chain: ForkedChainRef;
|
||||
rng: ref HmacDrbgContext;
|
||||
maxPeers: int;
|
||||
era1Dir: string;
|
||||
): T =
|
||||
new result
|
||||
result.initSync(ethNode, chain, maxPeers)
|
||||
result.ctx.pool.rng = rng
|
||||
result.ctx.pool.e1Dir = era1Dir
|
||||
|
||||
proc start*(ctx: FlareSyncRef) =
|
||||
## Beacon Sync always begin with stop mode
|
||||
doAssert ctx.startSync() # Initialize subsystems
|
||||
|
||||
proc stop*(ctx: FlareSyncRef) =
|
||||
ctx.stopSync()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1 @@
|
|||
*.html
|
|
@ -0,0 +1,129 @@
|
|||
Syncing
|
||||
=======
|
||||
|
||||
Syncing blocks is performed in two partially overlapping phases
|
||||
|
||||
* loading the header chains into separate database tables
|
||||
* removing headers from the headers chain, fetching the rest of the
|
||||
block the header belongs to and executing it
|
||||
|
||||
Header chains
|
||||
-------------
|
||||
|
||||
The header chains are the triple of
|
||||
|
||||
* a consecutively linked chain of headers starting starting at Genesis
|
||||
* followed by a sequence of missing headers
|
||||
* followed by a consecutively linked chain of headers ending up at a
|
||||
finalised block header received from the consensus layer
|
||||
|
||||
A sequence *@[h(1),h(2),..]* of block headers is called a consecutively
|
||||
linked chain if
|
||||
|
||||
* block numbers join without gaps, i.e. *h(n).number+1 == h(n+1).number*
|
||||
* parent hashes match, i.e. *h(n).hash == h(n+1).parentHash*
|
||||
|
||||
General header chains layout diagram
|
||||
|
||||
G B L F (1)
|
||||
o----------------o---------------------o----------------o--->
|
||||
| <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
||||
|
||||
Here, the single upper letter symbols *G*, *B*, *L*, *F* denote block numbers.
|
||||
For convenience, these letters are also identified with its associated block
|
||||
header or the full block. Saying *"the header G"* is short for *"the header
|
||||
with block number G"*.
|
||||
|
||||
Meaning of *G*, *B*, *L*, *F*:
|
||||
|
||||
* *G* -- Genesis block number *#0*
|
||||
* *B* -- base, maximal block number of linked chain starting at *G*
|
||||
* *L* -- least, minimal block number of linked chain ending at *F* with *B <= L*
|
||||
* *F* -- final, some finalised block
|
||||
|
||||
This definition implies *G <= B <= L <= F* and the header chains can uniquely
|
||||
be described by the triple of block numbers *(B,L,F)*.
|
||||
|
||||
Storage of header chains:
|
||||
-------------------------
|
||||
|
||||
Some block numbers from the set *{w|G<=w<=B}* may correspond to finalised
|
||||
blocks which may be stored anywhere. If some block numbers do not correspond
|
||||
to finalised blocks, then the headers must reside in the *flareHeader*
|
||||
database table. Of course, due to being finalised such block numbers constitute
|
||||
a sub-chain starting at *G*.
|
||||
|
||||
The block numbers from the set *{w|L<=w<=F}* must reside in the *flareHeader*
|
||||
database table. They do not correspond to finalised blocks.
|
||||
|
||||
Header chains initialisation:
|
||||
-----------------------------
|
||||
|
||||
Minimal layout on a pristine system
|
||||
|
||||
G (2)
|
||||
B
|
||||
L
|
||||
F
|
||||
o--->
|
||||
|
||||
When first initialised, the header chains are set to *(G,G,G)*.
|
||||
|
||||
Updating header chains:
|
||||
-----------------------
|
||||
|
||||
A header chain with an non empty open interval *(B,L)* can be updated only by
|
||||
increasing *B* or decreasing *L* by adding headers so that the linked chain
|
||||
condition is not violated.
|
||||
|
||||
Only when the open interval *(B,L)* vanishes the right end *F* can be increased
|
||||
by *Z* say. Then
|
||||
|
||||
* *B==L* beacuse interval *(B,L)* is empty
|
||||
* *B==F* because *B* is maximal
|
||||
|
||||
and the header chains *(F,F,F)* (depicted in *(3)*) can be set to *(B,Z,Z)*
|
||||
(as depicted in *(4)*.)
|
||||
|
||||
Layout before updating of *F*
|
||||
|
||||
B (3)
|
||||
L
|
||||
G F Z
|
||||
o----------------o---------------------o---->
|
||||
| <-- linked --> |
|
||||
|
||||
New layout with *Z*
|
||||
|
||||
L' (4)
|
||||
G B F'
|
||||
o----------------o---------------------o---->
|
||||
| <-- linked --> | <-- unprocessed --> |
|
||||
|
||||
with *L'=Z* and *F'=Z*.
|
||||
|
||||
Note that diagram *(3)* is a generalisation of *(2)*.
|
||||
|
||||
|
||||
Era1 repository support:
|
||||
------------------------
|
||||
|
||||
For the initial blocks, *Era1* repositories are supported for *MainNet*
|
||||
and *Sepolia*. They might be truncated at the top, condition is that provide
|
||||
consecutive blocks (like the headr chains) and start at *Genesis*.
|
||||
|
||||
In that case, the position *B* will be immediately set to the largest available
|
||||
*Era1* block number without storing any headers.
|
||||
|
||||
Complete header chain:
|
||||
----------------------
|
||||
|
||||
The header chain is *relatively complete* if it satisfies clause *(3)* above
|
||||
for *G < B*. It is *fully complete* if *F==Z*. It should be obvious that the
|
||||
latter condition is temporary only on a live system (as *Z* is permanently
|
||||
updated.)
|
||||
|
||||
If a *relatively complete* header chain is reached for the first time, the
|
||||
execution layer can start running an importer in the background compiling
|
||||
or executing blocks (starting from block number *#1*.) So the ledger database
|
||||
state will be updated incrementally.
|
|
@ -0,0 +1,3 @@
|
|||
* Update/resolve code fragments which are tagged FIXME
|
||||
|
||||
* Revisit timeouts when fetching header data from the network
|
|
@ -0,0 +1,168 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/[chronicles, chronos],
|
||||
pkg/eth/[common, p2p],
|
||||
pkg/stew/[interval_set, sorted_set],
|
||||
../../common,
|
||||
./worker/[db, staged, start_stop, unproc, update],
|
||||
./worker_desc
|
||||
|
||||
logScope:
|
||||
topics = "flare"
|
||||
|
||||
const extraTraceMessages = false or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public start/stop and admin functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc setup*(ctx: FlareCtxRef): bool =
|
||||
## Global set up
|
||||
debug "RUNSETUP"
|
||||
ctx.setupRpcMagic()
|
||||
|
||||
# Setup `Era1` access (if any) before setting up layout.
|
||||
discard ctx.dbInitEra1()
|
||||
|
||||
# Load initial state from database if there is any
|
||||
ctx.dbLoadLinkedHChainsLayout()
|
||||
|
||||
# Debugging stuff, might be an empty template
|
||||
ctx.setupTicker()
|
||||
|
||||
# Enable background daemon
|
||||
ctx.daemon = true
|
||||
true
|
||||
|
||||
proc release*(ctx: FlareCtxRef) =
|
||||
## Global clean up
|
||||
debug "RUNRELEASE"
|
||||
ctx.destroyRpcMagic()
|
||||
ctx.destroyTicker()
|
||||
|
||||
|
||||
proc start*(buddy: FlareBuddyRef): bool =
|
||||
## Initialise worker peer
|
||||
if buddy.startBuddy():
|
||||
buddy.ctrl.multiOk = true
|
||||
debug "RUNSTART", peer=buddy.peer, multiOk=buddy.ctrl.multiOk
|
||||
return true
|
||||
debug "RUNSTART failed", peer=buddy.peer
|
||||
|
||||
proc stop*(buddy: FlareBuddyRef) =
|
||||
## Clean up this peer
|
||||
debug "RUNSTOP", peer=buddy.peer
|
||||
buddy.stopBuddy()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc runDaemon*(ctx: FlareCtxRef) {.async.} =
|
||||
## Global background job that will be re-started as long as the variable
|
||||
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
||||
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
||||
## as `true` not before there is some activity on the `runPool()`,
|
||||
## `runSingle()`, or `runMulti()` functions.
|
||||
##
|
||||
const info = "RUNDAEMON"
|
||||
debug info
|
||||
|
||||
# Check for a possible layout change of the `HeaderChainsSync` state
|
||||
if ctx.updateLinkedHChainsLayout():
|
||||
debug info & ": headers chain layout was updated"
|
||||
|
||||
ctx.updateMetrics()
|
||||
await sleepAsync daemonWaitInterval
|
||||
|
||||
|
||||
proc runSingle*(buddy: FlareBuddyRef) {.async.} =
|
||||
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
|
||||
## is set `false` which is the default mode. This flag is updated by the
|
||||
## worker when deemed appropriate.
|
||||
## * For all workers, there can be only one `runSingle()` function active
|
||||
## simultaneously for all worker peers.
|
||||
## * There will be no `runMulti()` function active for the same worker peer
|
||||
## simultaneously
|
||||
## * There will be no `runPool()` iterator active simultaneously.
|
||||
##
|
||||
## Note that this function runs in `async` mode.
|
||||
##
|
||||
raiseAssert "RUNSINGLE should not be used: peer=" & $buddy.peer
|
||||
|
||||
|
||||
proc runPool*(buddy: FlareBuddyRef; last: bool; laps: int): bool =
|
||||
## Once started, the function `runPool()` is called for all worker peers in
|
||||
## sequence as the body of an iteration as long as the function returns
|
||||
## `false`. There will be no other worker peer functions activated
|
||||
## simultaneously.
|
||||
##
|
||||
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
|
||||
## `true` (default is `false`.) It will be automatically reset before the
|
||||
## the loop starts. Re-setting it again results in repeating the loop. The
|
||||
## argument `laps` (starting with `0`) indicated the currend lap of the
|
||||
## repeated loops.
|
||||
##
|
||||
## The argument `last` is set `true` if the last entry is reached.
|
||||
##
|
||||
## Note that this function does not run in `async` mode.
|
||||
##
|
||||
const info = "RUNPOOL"
|
||||
when extraTraceMessages:
|
||||
debug info, peer=buddy.peer, laps
|
||||
buddy.ctx.stagedReorg info # reorg
|
||||
true # stop
|
||||
|
||||
|
||||
proc runMulti*(buddy: FlareBuddyRef) {.async.} =
|
||||
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
|
||||
## `true` which is typically done after finishing `runSingle()`. This
|
||||
## instance can be simultaneously active for all peer workers.
|
||||
##
|
||||
const info = "RUNMULTI"
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
|
||||
if ctx.unprocTotal() == 0 and ctx.stagedChunks() == 0:
|
||||
# Save cpu cycles waiting for something to do
|
||||
when extraTraceMessages:
|
||||
debug info & ": idle wasting time", peer
|
||||
await sleepAsync runMultiIdleWaitInterval
|
||||
return
|
||||
|
||||
if not ctx.flipCoin():
|
||||
# Come back next time
|
||||
when extraTraceMessages:
|
||||
debug info & ": running later", peer
|
||||
return
|
||||
|
||||
# * get unprocessed range from pool
|
||||
# * fetch headers for this range (as much as one can get)
|
||||
# * verify that a block is sound, i.e. contiguous, chained by parent hashes
|
||||
# * return remaining range to unprocessed range in the pool
|
||||
# * store this range on the staged queue on the pool
|
||||
if await buddy.stagedCollect info:
|
||||
# * increase the top/right interval of the trused range `[L,F]`
|
||||
# * save updated state and headers
|
||||
discard buddy.ctx.stagedProcess info
|
||||
|
||||
else:
|
||||
when extraTraceMessages:
|
||||
debug info & ": failed, done", peer
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,260 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/chronicles,
|
||||
pkg/eth/[common, rlp],
|
||||
pkg/stew/[interval_set, sorted_set],
|
||||
pkg/results,
|
||||
../../../db/[era1_db, storage_types],
|
||||
../../../common,
|
||||
../../sync_desc,
|
||||
../worker_desc,
|
||||
"."/[staged, unproc]
|
||||
|
||||
logScope:
|
||||
topics = "flare db"
|
||||
|
||||
const
|
||||
extraTraceMessages = false or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
LhcStateKey = 1.flareStateKey
|
||||
|
||||
type
|
||||
SavedDbStateSpecs = tuple
|
||||
number: BlockNumber
|
||||
hash: Hash256
|
||||
parent: Hash256
|
||||
|
||||
Era1Specs = tuple
|
||||
e1db: Era1DbRef
|
||||
maxNum: BlockNumber
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template hasKey(e1db: Era1DbRef; bn: BlockNumber): bool =
|
||||
e1db.getEthBlock(bn).isOk
|
||||
|
||||
proc newEra1Desc(networkID: NetworkID; era1Dir: string): Opt[Era1Specs] =
|
||||
const info = "newEra1Desc"
|
||||
var specs: Era1Specs
|
||||
|
||||
case networkID:
|
||||
of MainNet:
|
||||
specs.e1db = Era1DbRef.init(era1Dir, "mainnet").valueOr:
|
||||
when extraTraceMessages:
|
||||
trace info & ": no Era1 available", networkID, era1Dir
|
||||
return err()
|
||||
specs.maxNum = 15_537_393'u64 # Mainnet, copied from `nimbus_import`
|
||||
|
||||
of SepoliaNet:
|
||||
specs.e1db = Era1DbRef.init(era1Dir, "sepolia").valueOr:
|
||||
when extraTraceMessages:
|
||||
trace info & ": no Era1 available", networkID, era1Dir
|
||||
return err()
|
||||
specs.maxNum = 1_450_408'u64 # Sepolia
|
||||
|
||||
else:
|
||||
when extraTraceMessages:
|
||||
trace info & ": Era1 unsupported", networkID
|
||||
return err()
|
||||
|
||||
# At least block 1 should be supported
|
||||
if not specs.e1db.hasKey 1u64:
|
||||
specs.e1db.dispose()
|
||||
notice info & ": Era1 repo disfunctional", networkID, blockNumber=1
|
||||
return err()
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": Era1 supported",
|
||||
networkID, lastEra1Block=specs.maxNum.bnStr
|
||||
ok(specs)
|
||||
|
||||
|
||||
proc fetchLinkedHChainsLayout(ctx: FlareCtxRef): Opt[LinkedHChainsLayout] =
|
||||
let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr:
|
||||
return err()
|
||||
try:
|
||||
result = ok(rlp.decode(data, LinkedHChainsLayout))
|
||||
except RlpError:
|
||||
return err()
|
||||
|
||||
# --------------
|
||||
|
||||
proc fetchEra1State(ctx: FlareCtxRef): Opt[SavedDbStateSpecs] =
|
||||
var val: SavedDbStateSpecs
|
||||
val.number = ctx.pool.e1AvailMax
|
||||
if 0 < val.number:
|
||||
let header = ctx.pool.e1db.getEthBlock(val.number).value.header
|
||||
val.parent = header.parentHash
|
||||
val.hash = rlp.encode(header).keccakHash
|
||||
return ok(val)
|
||||
err()
|
||||
|
||||
proc fetchSavedState(ctx: FlareCtxRef): Opt[SavedDbStateSpecs] =
|
||||
let
|
||||
db = ctx.db
|
||||
e1Max = ctx.pool.e1AvailMax
|
||||
|
||||
var val: SavedDbStateSpecs
|
||||
val.number = db.getSavedStateBlockNumber()
|
||||
|
||||
if e1Max == 0 or e1Max < val.number:
|
||||
if db.getBlockHash(val.number, val.hash):
|
||||
var header: BlockHeader
|
||||
if db.getBlockHeader(val.hash, header):
|
||||
val.parent = header.parentHash
|
||||
return ok(val)
|
||||
return err()
|
||||
|
||||
ctx.fetchEra1State()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc dbStoreLinkedHChainsLayout*(ctx: FlareCtxRef): bool =
|
||||
## Save chain layout to persistent db
|
||||
const info = "dbStoreLinkedHChainsLayout"
|
||||
if ctx.layout == ctx.lhc.lastLayout:
|
||||
when extraTraceMessages:
|
||||
trace info & ": no layout change"
|
||||
return false
|
||||
|
||||
let data = rlp.encode(ctx.layout)
|
||||
ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr:
|
||||
raiseAssert info & " put() failed: " & $$error
|
||||
|
||||
# While executing blocks there are frequent save cycles. Otherwise, an
|
||||
# extra save request might help to pick up an interrupted sync session.
|
||||
if ctx.db.getSavedStateBlockNumber() == 0:
|
||||
ctx.db.persistent(0).isOkOr:
|
||||
when extraTraceMessages:
|
||||
trace info & ": failed to save layout pesistently", error=($$error)
|
||||
return false
|
||||
when extraTraceMessages:
|
||||
trace info & ": layout saved pesistently"
|
||||
true
|
||||
|
||||
|
||||
proc dbLoadLinkedHChainsLayout*(ctx: FlareCtxRef) =
|
||||
## Restore chain layout from persistent db
|
||||
const info = "dbLoadLinkedHChainsLayout"
|
||||
ctx.stagedInit()
|
||||
ctx.unprocInit()
|
||||
|
||||
let rc = ctx.fetchLinkedHChainsLayout()
|
||||
if rc.isOk:
|
||||
ctx.lhc.layout = rc.value
|
||||
let (uMin,uMax) = (rc.value.base+1, rc.value.least-1)
|
||||
if uMin <= uMax:
|
||||
# Add interval of unprocessed block range `(B,L)` from README
|
||||
ctx.unprocMerge(uMin, uMax)
|
||||
when extraTraceMessages:
|
||||
trace info & ": restored layout"
|
||||
else:
|
||||
let val = ctx.fetchSavedState().expect "saved states"
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
base: val.number,
|
||||
baseHash: val.hash,
|
||||
least: val.number,
|
||||
leastParent: val.parent,
|
||||
final: val.number,
|
||||
finalHash: val.hash)
|
||||
when extraTraceMessages:
|
||||
trace info & ": new layout"
|
||||
|
||||
ctx.lhc.lastLayout = ctx.layout
|
||||
|
||||
|
||||
proc dbInitEra1*(ctx: FlareCtxRef): bool =
|
||||
## Initialise Era1 repo.
|
||||
const info = "dbInitEra1"
|
||||
var specs = ctx.chain.com.networkId.newEra1Desc(ctx.pool.e1Dir).valueOr:
|
||||
return false
|
||||
|
||||
ctx.pool.e1db = specs.e1db
|
||||
|
||||
# Verify that last available block number is available
|
||||
if specs.e1db.hasKey specs.maxNum:
|
||||
ctx.pool.e1AvailMax = specs.maxNum
|
||||
when extraTraceMessages:
|
||||
trace info, lastEra1Block=specs.maxNum.bnStr
|
||||
return true
|
||||
|
||||
# This is a truncated repo. Use bisect for finding the top number assuming
|
||||
# that block numbers availability is contiguous.
|
||||
#
|
||||
# BlockNumber(1) is the least supported block number (was checked
|
||||
# in function `newEra1Desc()`)
|
||||
var
|
||||
minNum = BlockNumber(1)
|
||||
middle = (specs.maxNum + minNum) div 2
|
||||
delta = specs.maxNum - minNum
|
||||
while 1 < delta:
|
||||
if specs.e1db.hasKey middle:
|
||||
minNum = middle
|
||||
else:
|
||||
specs.maxNum = middle
|
||||
middle = (specs.maxNum + minNum) div 2
|
||||
delta = specs.maxNum - minNum
|
||||
|
||||
ctx.pool.e1AvailMax = minNum
|
||||
when extraTraceMessages:
|
||||
trace info, e1AvailMax=minNum.bnStr
|
||||
true
|
||||
|
||||
# ------------------
|
||||
|
||||
proc dbStashHeaders*(
|
||||
ctx: FlareCtxRef;
|
||||
first: BlockNumber;
|
||||
rlpBlobs: openArray[Blob];
|
||||
) =
|
||||
## Temporarily store header chain to persistent db (oblivious of the chain
|
||||
## layout.) Note that headres should not be stashed if they are available
|
||||
## on the `Era1` repo, i.e. if the corresponding block number is at most
|
||||
## `ctx.pool.e1AvailMax`.
|
||||
##
|
||||
const info = "dbStashHeaders"
|
||||
let kvt = ctx.db.ctx.getKvt()
|
||||
for n,data in rlpBlobs:
|
||||
let key = flareHeaderKey(first + n.uint)
|
||||
kvt.put(key.toOpenArray, data).isOkOr:
|
||||
raiseAssert info & ": put() failed: " & $$error
|
||||
when extraTraceMessages:
|
||||
trace info & ": headers stashed", first=first.bnStr, nHeaders=rlpBlobs.len
|
||||
|
||||
proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] =
|
||||
## Retrieve some stashed header.
|
||||
if num <= ctx.pool.e1AvailMax:
|
||||
return ok(ctx.pool.e1db.getEthBlock(num).value.header)
|
||||
let
|
||||
key = flareHeaderKey(num)
|
||||
rc = ctx.db.ctx.getKvt().get(key.toOpenArray)
|
||||
if rc.isOk:
|
||||
try:
|
||||
return ok(rlp.decode(rc.value, BlockHeader))
|
||||
except RlpError:
|
||||
discard
|
||||
err()
|
||||
|
||||
proc dbPeekParentHash*(ctx: FlareCtxRef; num: BlockNumber): Opt[Hash256] =
|
||||
## Retrieve some stashed parent hash.
|
||||
ok (? ctx.dbPeekHeader num).parentHash
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,378 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
std/strutils,
|
||||
pkg/[chronicles, chronos],
|
||||
pkg/eth/[common, p2p],
|
||||
pkg/stew/[interval_set, sorted_set],
|
||||
../../../common,
|
||||
../worker_desc,
|
||||
./staged/[headers, linked_hchain],
|
||||
./unproc
|
||||
|
||||
logScope:
|
||||
topics = "flare staged"
|
||||
|
||||
const
|
||||
extraTraceMessages = false # or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
verifyStagedQueueOk = not defined(release) or true
|
||||
## Debugging mode
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private debugging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
when verifyStagedQueueOk:
|
||||
proc verifyStagedQueue(
|
||||
ctx: FlareCtxRef;
|
||||
info: static[string];
|
||||
multiMode = true;
|
||||
) =
|
||||
## Verify stated queue, check that recorded ranges are no unprocessed,
|
||||
## and return the total sise if headers covered.
|
||||
##
|
||||
# Walk queue items
|
||||
let walk = LinkedHChainQueueWalk.init(ctx.lhc.staged)
|
||||
defer: walk.destroy()
|
||||
|
||||
var
|
||||
stTotal = 0u
|
||||
rc = walk.first()
|
||||
prv = BlockNumber(0)
|
||||
while rc.isOk:
|
||||
let
|
||||
key = rc.value.key
|
||||
nHeaders = rc.value.data.headers.len.uint
|
||||
minPt = key - nHeaders + 1
|
||||
unproc = ctx.unprocCovered(minPt, key)
|
||||
if 0 < unproc:
|
||||
raiseAssert info & ": unprocessed staged chain " &
|
||||
key.bnStr & " overlap=" & $unproc
|
||||
if minPt <= prv:
|
||||
raiseAssert info & ": overlapping staged chain " &
|
||||
key.bnStr & " prvKey=" & prv.bnStr & " overlap=" & $(prv - minPt + 1)
|
||||
stTotal += nHeaders
|
||||
prv = key
|
||||
rc = walk.next()
|
||||
|
||||
# Check `staged[] <= L`
|
||||
if ctx.layout.least <= prv:
|
||||
raiseAssert info & ": staged top mismatch " &
|
||||
" L=" & $ctx.layout.least.bnStr & " stagedTop=" & prv.bnStr
|
||||
|
||||
# Check `unprocessed{} <= L`
|
||||
let uTop = ctx.unprocTop()
|
||||
if ctx.layout.least <= uTop:
|
||||
raiseAssert info & ": unproc top mismatch " &
|
||||
" L=" & $ctx.layout.least.bnStr & " unprocTop=" & uTop.bnStr
|
||||
|
||||
# Check `staged[] + unprocessed{} == (B,L)`
|
||||
if not multiMode:
|
||||
let
|
||||
uTotal = ctx.unprocTotal()
|
||||
both = stTotal + uTotal
|
||||
unfilled = if ctx.layout.least <= ctx.layout.base + 1: 0u
|
||||
else: ctx.layout.least - ctx.layout.base - 1
|
||||
when extraTraceMessages:
|
||||
trace info & ": verify staged", stTotal, uTotal, both, unfilled
|
||||
if both != unfilled:
|
||||
raiseAssert info & ": staged/unproc mismatch " &
|
||||
" staged=" & $stTotal & " unproc=" & $uTotal & " exp-sum=" & $unfilled
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc fetchAndCheck(
|
||||
buddy: FlareBuddyRef;
|
||||
ivReq: BnRange;
|
||||
lhc: ref LinkedHChain; # update in place
|
||||
info: static[string];
|
||||
): Future[bool] {.async.} =
|
||||
## Collect single header chain from the peer and stash it on the `staged`
|
||||
## queue. Returns the length of the stashed chain of headers.
|
||||
##
|
||||
# Fetch headers for this range of block numbers
|
||||
let revHeaders = block:
|
||||
let
|
||||
rc = await buddy.headersFetchReversed(ivReq, lhc.parentHash, info)
|
||||
if rc.isOk:
|
||||
rc.value
|
||||
else:
|
||||
when extraTraceMessages:
|
||||
trace info & ": fetch headers failed", peer=buddy.peer, ivReq
|
||||
if buddy.ctrl.running:
|
||||
# Suspend peer for a while
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
# While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
|
||||
# was sound, i.e. contiguous, linked, etc.
|
||||
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info):
|
||||
when extraTraceMessages:
|
||||
trace info & ": fetched headers unusable", peer=buddy.peer, ivReq
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc stagedCollect*(
|
||||
buddy: FlareBuddyRef;
|
||||
info: static[string];
|
||||
): Future[bool] {.async.} =
|
||||
## Collect a batch of chained headers totalling to at most `nHeaders`
|
||||
## headers. Fetch the headers from the the peer and stash it blockwise on
|
||||
## the `staged` queue. The function returns `true` it stashed a header
|
||||
## chains record on `staged` queue.
|
||||
##
|
||||
## This function segments the `nHeaders` length into smaller pieces of at
|
||||
## most `nFetchHeadersRequest` chunks ans fetch these chunks from the
|
||||
## network. Where possible, hashes are used to address the headers to be
|
||||
## fetched. Otherwise the system works opportunistically using block
|
||||
## numbers for fetching, stashing them away to be verified later when
|
||||
## appropriate.
|
||||
##
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
uTop = ctx.unprocTop()
|
||||
|
||||
if uTop == 0:
|
||||
# Nothing to do
|
||||
return false
|
||||
|
||||
let
|
||||
# Check for top header hash. If the range to fetch directly joins below
|
||||
# the top level linked chain `L..F`, then there is the hash available for
|
||||
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
|
||||
# range of headers is fetched opportunistically using block numbers only.
|
||||
isOpportunistic = uTop + 1 != ctx.layout.least
|
||||
|
||||
# Parent hash for `lhc` below
|
||||
topLink = (if isOpportunistic: EMPTY_ROOT_HASH else: ctx.layout.leastParent)
|
||||
|
||||
# Get the total batch size
|
||||
nFetchHeaders = if isOpportunistic: nFetchHeadersOpportunisticly
|
||||
else: nFetchHeadersByTopHash
|
||||
|
||||
# Number of headers to fetch. Take as much as possible if there are many
|
||||
# more to fetch. Otherwise split the remaining part so that there is room
|
||||
# for opportuninstically fetching headers by other many peers.
|
||||
t2 = ctx.unprocTotal div 2
|
||||
nHeaders = if nFetchHeaders.uint < t2: nFetchHeaders.uint
|
||||
elif t2 < nFetchHeadersRequest: nFetchHeaders.uint
|
||||
else: t2
|
||||
|
||||
# Reserve the full range of block numbers so they can be appended in a row.
|
||||
# This avoid some fragmentation when header chains are stashed by multiple
|
||||
# peers, i.e. they interleave peer-wise.
|
||||
iv = ctx.unprocFetch(nHeaders).expect "valid interval"
|
||||
|
||||
var
|
||||
# This value is used for splitting the interval `iv` into
|
||||
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
|
||||
# middle interval `[somePt+1, ivTop]` will be fetched from the network.
|
||||
ivTop = iv.maxPt
|
||||
|
||||
# This record will accumulate the fetched headers. It must be on the heap
|
||||
# so that `async` can capture that properly.
|
||||
lhc = (ref LinkedHChain)(parentHash: topLink)
|
||||
|
||||
while true:
|
||||
# Extract a top range interval and fetch/stage it
|
||||
let
|
||||
ivReqMin = if ivTop + 1 <= iv.minPt + nFetchHeadersRequest: iv.minPt
|
||||
else: ivTop - nFetchHeadersRequest + 1
|
||||
|
||||
# Request interval
|
||||
ivReq = BnRange.new(ivReqMin, ivTop)
|
||||
|
||||
# Current length of the headers queue. This is one way to calculate
|
||||
# the response length from the network.
|
||||
nLhcHeaders = lhc.headers.len
|
||||
|
||||
# Fetch and extend chain record
|
||||
if not await buddy.fetchAndCheck(ivReq, lhc, info):
|
||||
# Throw away opportunistic data
|
||||
if isOpportunistic or nLhcHeaders == 0:
|
||||
when extraTraceMessages:
|
||||
trace info & ": completely failed", peer, iv, ivReq, isOpportunistic
|
||||
ctx.unprocMerge(iv)
|
||||
return false
|
||||
# It is deterministic. So safe downloaded data so far. Turn back
|
||||
# unused data.
|
||||
when extraTraceMessages:
|
||||
trace info & ": partially failed", peer, iv, ivReq,
|
||||
unused=BnRange.new(iv.minPt,ivTop), isOpportunistic
|
||||
ctx.unprocMerge(iv.minPt, ivTop)
|
||||
break
|
||||
|
||||
# Update remaining interval
|
||||
let ivRespLen = lhc.headers.len - nLhcHeaders
|
||||
if iv.minPt + ivRespLen.uint < ivTop:
|
||||
let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
|
||||
when extraTraceMessages:
|
||||
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
|
||||
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
|
||||
isOpportunistic
|
||||
ivTop = newIvTop
|
||||
else:
|
||||
break
|
||||
|
||||
# Store `lhcOpt` chain on the `staged` queue
|
||||
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
|
||||
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
||||
qItem.data = lhc[]
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": stashed on staged queue", peer,
|
||||
iv=BnRange.new(iv.maxPt - lhc.headers.len.uint + 1, iv.maxPt),
|
||||
nHeaders=lhc.headers.len, isOpportunistic
|
||||
else:
|
||||
trace info & ": stashed on staged queue", peer,
|
||||
topBlock=iv.maxPt.bnStr, nHeaders=lhc.headers.len, isOpportunistic
|
||||
|
||||
return true
|
||||
|
||||
|
||||
proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
|
||||
## Store/insert stashed chains from the `staged` queue into the linked
|
||||
## chains layout and the persistent tables. The function returns the number
|
||||
## of records processed and saved.
|
||||
while true:
|
||||
# Fetch largest block
|
||||
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
|
||||
break # all done
|
||||
|
||||
let
|
||||
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
|
||||
iv = BnRange.new(qItem.key - qItem.data.headers.len.uint + 1, qItem.key)
|
||||
if iv.maxPt+1 < least:
|
||||
when extraTraceMessages:
|
||||
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
|
||||
break # there is a gap -- come back later
|
||||
|
||||
# Overlap must not happen
|
||||
if iv.maxPt+1 != least:
|
||||
raiseAssert info & ": Overlap iv=" & $iv & " L=" & least.bnStr
|
||||
|
||||
# Process item from `staged` queue. So it is not needed in the list,
|
||||
# anymore.
|
||||
discard ctx.lhc.staged.delete(iv.maxPt)
|
||||
|
||||
if qItem.data.hash != ctx.layout.leastParent:
|
||||
# Discard wrong chain.
|
||||
#
|
||||
# FIXME: Does it make sense to keep the `buddy` with the `qItem` chains
|
||||
# list object for marking the buddy a `zombie`?
|
||||
#
|
||||
ctx.unprocMerge(iv)
|
||||
when extraTraceMessages:
|
||||
trace info & ": discarding staged record", iv, L=least.bnStr, lap=result
|
||||
break
|
||||
|
||||
# Store headers on database
|
||||
ctx.dbStashHeaders(iv.minPt, qItem.data.headers)
|
||||
ctx.layout.least = iv.minPt
|
||||
ctx.layout.leastParent = qItem.data.parentHash
|
||||
let ok = ctx.dbStoreLinkedHChainsLayout()
|
||||
|
||||
result.inc # count records
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": staged record saved", iv, layout=ok, nSaved=result
|
||||
|
||||
when not extraTraceMessages:
|
||||
trace info & ": staged records saved",
|
||||
nStaged=ctx.lhc.staged.len, nSaved=result
|
||||
|
||||
if stagedQueueLengthLwm < ctx.lhc.staged.len:
|
||||
when extraTraceMessages:
|
||||
trace info & ": staged queue too large => reorg",
|
||||
nStaged=ctx.lhc.staged.len, max=stagedQueueLengthLwm
|
||||
ctx.poolMode = true
|
||||
|
||||
|
||||
proc stagedReorg*(ctx: FlareCtxRef; info: static[string]) =
|
||||
## Some pool mode intervention.
|
||||
|
||||
if ctx.lhc.staged.len == 0 and
|
||||
ctx.unprocChunks() == 0:
|
||||
# Nothing to do
|
||||
when extraTraceMessages:
|
||||
trace info & ": nothing to do"
|
||||
return
|
||||
|
||||
# Update counter
|
||||
ctx.pool.nReorg.inc
|
||||
|
||||
# Randomise the invocation order of the next few `runMulti()` calls by
|
||||
# asking an oracle whether to run now or later.
|
||||
#
|
||||
# With a multi peer approach, there might be a slow peer invoked first
|
||||
# that is handling the top range and blocking the rest. That causes the
|
||||
# the staged queue to fill up unnecessarily. Then pool mode is called which
|
||||
# ends up here. Returning to multi peer mode, the same invocation order
|
||||
# might be called as before.
|
||||
ctx.setCoinTosser()
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": coin tosser", nCoins=ctx.pool.tossUp.nCoins,
|
||||
coins=(ctx.pool.tossUp.coins.toHex), nLeft=ctx.pool.tossUp.nLeft
|
||||
|
||||
if stagedQueueLengthHwm < ctx.lhc.staged.len:
|
||||
trace info & ": hwm reached, flushing staged queue",
|
||||
nStaged=ctx.lhc.staged.len, max=stagedQueueLengthLwm
|
||||
# Flush `staged` queue into `unproc` so that it can be fetched anew
|
||||
block:
|
||||
let walk = LinkedHChainQueueWalk.init(ctx.lhc.staged)
|
||||
defer: walk.destroy()
|
||||
var rc = walk.first
|
||||
while rc.isOk:
|
||||
let (key, nHeaders) = (rc.value.key, rc.value.data.headers.len.uint)
|
||||
ctx.unprocMerge(key - nHeaders + 1, key)
|
||||
rc = walk.next
|
||||
# Reset `staged` queue
|
||||
ctx.lhc.staged.clear()
|
||||
|
||||
when verifyStagedQueueOk:
|
||||
ctx.verifyStagedQueue(info, multiMode = false)
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": reorg done"
|
||||
|
||||
|
||||
proc stagedTop*(ctx: FlareCtxRef): BlockNumber =
|
||||
## Retrieve to staged block number
|
||||
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
|
||||
return BlockNumber(0)
|
||||
qItem.key
|
||||
|
||||
proc stagedChunks*(ctx: FlareCtxRef): int =
|
||||
## Number of staged records
|
||||
ctx.lhc.staged.len
|
||||
|
||||
# ----------------
|
||||
|
||||
proc stagedInit*(ctx: FlareCtxRef) =
|
||||
## Constructor
|
||||
ctx.lhc.staged = LinkedHChainQueue.init()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,98 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
pkg/[chronicles, chronos, results],
|
||||
pkg/eth/p2p,
|
||||
pkg/stew/interval_set,
|
||||
"../../.."/[protocol, types],
|
||||
../../worker_desc
|
||||
|
||||
logScope:
|
||||
topics = "flare headers"
|
||||
|
||||
const extraTraceMessages = false # or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc headersFetchReversed*(
|
||||
buddy: FlareBuddyRef;
|
||||
ivReq: BnRange;
|
||||
topHash: Hash256;
|
||||
info: static[string];
|
||||
): Future[Result[seq[BlockHeader],void]]
|
||||
{.async.} =
|
||||
## Get a list of headers in reverse order.
|
||||
let
|
||||
peer = buddy.peer
|
||||
useHash = (topHash != EMPTY_ROOT_HASH)
|
||||
req = block:
|
||||
if useHash:
|
||||
BlocksRequest(
|
||||
maxResults: ivReq.len.uint,
|
||||
skip: 0,
|
||||
reverse: true,
|
||||
startBlock: HashOrNum(
|
||||
isHash: true,
|
||||
hash: topHash))
|
||||
else:
|
||||
BlocksRequest(
|
||||
maxResults: ivReq.len.uint,
|
||||
skip: 0,
|
||||
reverse: true,
|
||||
startBlock: HashOrNum(
|
||||
isHash: false,
|
||||
number: ivReq.maxPt))
|
||||
|
||||
when extraTraceMessages:
|
||||
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
|
||||
nReq=req.maxResults, useHash
|
||||
|
||||
# Fetch headers from peer
|
||||
var resp: Option[blockHeadersObj]
|
||||
try:
|
||||
resp = await peer.getBlockHeaders(req)
|
||||
except TransportError as e:
|
||||
`info` info & ", stop", peer, ivReq, nReq=req.maxResults, useHash,
|
||||
error=($e.name), msg=e.msg
|
||||
return err()
|
||||
|
||||
# Beware of peer terminating the session while fetching data
|
||||
if buddy.ctrl.stopped:
|
||||
return err()
|
||||
|
||||
if resp.isNone:
|
||||
when extraTraceMessages:
|
||||
trace trEthRecvReceivedBlockHeaders, peer,
|
||||
ivReq, nReq=req.maxResults, respose="n/a", useHash
|
||||
return err()
|
||||
|
||||
let h: seq[BlockHeader] = resp.get.headers
|
||||
if h.len == 0 or ivReq.len < h.len.uint:
|
||||
when extraTraceMessages:
|
||||
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
|
||||
useHash, nResp=h.len
|
||||
return err()
|
||||
|
||||
when extraTraceMessages:
|
||||
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
|
||||
useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len
|
||||
|
||||
return ok(h)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,175 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/eth/[common, p2p, rlp],
|
||||
pkg/stew/byteutils,
|
||||
../../../../common,
|
||||
../../worker_desc
|
||||
|
||||
const
|
||||
extraTraceMessages = false # or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
verifyLinkedHChainOk = not defined(release) # or true
|
||||
## Debugging mode
|
||||
|
||||
when extraTraceMessages:
|
||||
import
|
||||
pkg/chronicles,
|
||||
stew/interval_set
|
||||
|
||||
logScope:
|
||||
topics = "flare staged"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private debugging & logging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc `$`(w: Hash256): string =
|
||||
w.data.toHex
|
||||
|
||||
formatIt(Hash256):
|
||||
$it
|
||||
|
||||
when verifyLinkedHChainOk:
|
||||
proc verifyHeaderChainItem(lhc: ref LinkedHChain; info: static[string]) =
|
||||
when extraTraceMessages:
|
||||
trace info & ": verifying", nLhc=lhc.headers.len
|
||||
var
|
||||
firstHdr, prvHdr: BlockHeader
|
||||
try:
|
||||
firstHdr = rlp.decode(lhc.headers[0], BlockHeader)
|
||||
doAssert lhc.parentHash == firstHdr.parentHash
|
||||
|
||||
prvHdr = firstHdr
|
||||
for n in 1 ..< lhc.headers.len:
|
||||
let header = rlp.decode(lhc.headers[n], BlockHeader)
|
||||
doAssert lhc.headers[n-1].keccakHash == header.parentHash
|
||||
doAssert prvHdr.number + 1 == header.number
|
||||
prvHdr = header
|
||||
|
||||
doAssert lhc.headers[^1].keccakHash == lhc.hash
|
||||
except RlpError as e:
|
||||
raiseAssert "verifyHeaderChainItem oops(" & $e.name & ") msg=" & e.msg
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": verify ok",
|
||||
iv=BnRange.new(firstHdr.number,prvHdr.number), nLhc=lhc.headers.len
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc newLHChain(
|
||||
rev: seq[BlockHeader];
|
||||
buddy: FlareBuddyRef;
|
||||
blockNumber: BlockNumber;
|
||||
topHash: Hash256;
|
||||
info: static[string];
|
||||
): Opt[ref LinkedHChain] =
|
||||
## Verify list of headers while assembling them to a `LinkedHChain`
|
||||
when extraTraceMessages:
|
||||
trace info, nHeaders=rev.len
|
||||
|
||||
# Verify top block number
|
||||
assert 0 < rev.len # debugging only
|
||||
if rev[0].number != blockNumber:
|
||||
when extraTraceMessages:
|
||||
trace info & ": top block number mismatch",
|
||||
number=rev[0].number.bnStr, expected=blockNumber.bnStr
|
||||
return err()
|
||||
|
||||
# Make space for return code array
|
||||
var chain = (ref LinkedHChain)(headers: newSeq[Blob](rev.len))
|
||||
|
||||
# Set up header with larges block number
|
||||
let blob0 = rlp.encode(rev[0])
|
||||
chain.headers[rev.len-1] = blob0
|
||||
chain.hash = blob0.keccakHash
|
||||
|
||||
# Verify top block hash (if any)
|
||||
if topHash != EMPTY_ROOT_HASH and chain.hash != topHash:
|
||||
when extraTraceMessages:
|
||||
trace info & ": top block hash mismatch",
|
||||
hash=(chain.hash.data.toHex), expected=(topHash.data.toHex)
|
||||
return err()
|
||||
|
||||
# Make sure that block headers are chained
|
||||
for n in 1 ..< rev.len:
|
||||
if rev[n].number + 1 != rev[n-1].number:
|
||||
when extraTraceMessages:
|
||||
trace info & ": #numbers mismatch", n,
|
||||
parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr
|
||||
return err()
|
||||
let blob = rlp.encode(rev[n])
|
||||
if rev[n-1].parentHash != blob.keccakHash:
|
||||
when extraTraceMessages:
|
||||
trace info & ": hash mismatch", n,
|
||||
parentHash=rev[n-1].parentHash, hash=blob.keccakHash
|
||||
return err()
|
||||
chain.headers[rev.len-n-1] = blob
|
||||
|
||||
# Finalise
|
||||
chain.parentHash = rev[rev.len-1].parentHash
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & " new chain record", nChain=chain.headers.len
|
||||
ok(chain)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc extendLinkedHChain*(
|
||||
rev: seq[BlockHeader];
|
||||
buddy: FlareBuddyRef;
|
||||
blockNumber: BlockNumber;
|
||||
lhc: ref LinkedHChain; # update in place
|
||||
info: static[string];
|
||||
): bool =
|
||||
|
||||
when extraTraceMessages:
|
||||
let
|
||||
peer = buddy.peer
|
||||
isOpportunistic = lhc.parentHash == EMPTY_ROOT_HASH
|
||||
|
||||
let newLhc = rev.newLHChain(buddy, blockNumber, lhc.parentHash, info).valueOr:
|
||||
when extraTraceMessages:
|
||||
trace info & ": fetched headers unusable", peer,
|
||||
blockNumber=blockNumber.bnStr, isOpportunistic
|
||||
return false
|
||||
|
||||
# Prepend `newLhc` before `lhc`
|
||||
#
|
||||
# FIXME: This must be cleaned up and optimised at some point.
|
||||
#
|
||||
when extraTraceMessages:
|
||||
trace info & ": extending chain record", peer,
|
||||
blockNumber=blockNumber.bnStr, len=lhc.headers.len,
|
||||
newLen=(newLhc.headers.len + lhc.headers.len), isOpportunistic
|
||||
|
||||
if lhc.headers.len == 0:
|
||||
lhc.hash = newLhc.hash
|
||||
lhc.headers = newLhc.headers
|
||||
else:
|
||||
lhc.headers = newLhc.headers & lhc.headers
|
||||
lhc.parentHash = newLhc.parentHash
|
||||
|
||||
when verifyLinkedHChainOk:
|
||||
lhc.verifyHeaderChainItem info
|
||||
|
||||
true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,138 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/bearssl/rand,
|
||||
pkg/chronicles,
|
||||
pkg/eth/[common, p2p],
|
||||
../../protocol,
|
||||
../worker_desc,
|
||||
"."/[staged, unproc]
|
||||
|
||||
when enableTicker:
|
||||
import ./start_stop/ticker
|
||||
|
||||
logScope:
|
||||
topics = "flare start/stop"
|
||||
|
||||
const extraTraceMessages = false or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
when enableTicker:
|
||||
proc tickerUpdater(ctx: FlareCtxRef): TickerFlareStatsUpdater =
|
||||
## Legacy stuff, will be probably be superseded by `metrics`
|
||||
result = proc: auto =
|
||||
TickerFlareStats(
|
||||
base: ctx.layout.base,
|
||||
least: ctx.layout.least,
|
||||
final: ctx.layout.final,
|
||||
beacon: ctx.lhc.beacon.header.number,
|
||||
nStaged: ctx.stagedChunks(),
|
||||
stagedTop: ctx.stagedTop(),
|
||||
unprocTop: ctx.unprocTop(),
|
||||
nUnprocessed: ctx.unprocTotal(),
|
||||
nUnprocFragm: ctx.unprocChunks(),
|
||||
reorg: ctx.pool.nReorg)
|
||||
|
||||
proc updateBeaconHeaderCB(ctx: FlareCtxRef): SyncReqNewHeadCB =
|
||||
## Update beacon header. This function is intended as a call back function
|
||||
## for the RPC module.
|
||||
when extraTraceMessages:
|
||||
var count = 0
|
||||
result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
|
||||
if ctx.lhc.beacon.header.number < h.number:
|
||||
when extraTraceMessages:
|
||||
if count mod 77 == 0: # reduce some noise
|
||||
trace "updateBeaconHeaderCB", blockNumber=("#" & $h.number), count
|
||||
count.inc
|
||||
ctx.lhc.beacon.header = h
|
||||
ctx.lhc.beacon.changed = true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
when enableTicker:
|
||||
proc setupTicker*(ctx: FlareCtxRef) =
|
||||
## Helper for `setup()`: Start ticker
|
||||
ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater)
|
||||
|
||||
proc destroyTicker*(ctx: FlareCtxRef) =
|
||||
## Helper for `release()`
|
||||
ctx.pool.ticker.destroy()
|
||||
ctx.pool.ticker = TickerRef(nil)
|
||||
|
||||
else:
|
||||
template setupTicker*(ctx: FlareCtxRef) = discard
|
||||
template destroyTicker*(ctx: FlareCtxRef) = discard
|
||||
|
||||
# ---------
|
||||
|
||||
proc setupRpcMagic*(ctx: FlareCtxRef) =
|
||||
## Helper for `setup()`: Enable external pivot update via RPC
|
||||
ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
|
||||
|
||||
proc destroyRpcMagic*(ctx: FlareCtxRef) =
|
||||
## Helper for `release()`
|
||||
ctx.chain.com.syncReqNewHead = SyncReqNewHeadCB(nil)
|
||||
|
||||
# ---------
|
||||
|
||||
proc startBuddy*(buddy: FlareBuddyRef): bool =
|
||||
## Convenience setting for starting a new worker
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized:
|
||||
ctx.pool.nBuddies.inc # for metrics
|
||||
when enableTicker:
|
||||
ctx.pool.ticker.startBuddy()
|
||||
return true
|
||||
|
||||
proc stopBuddy*(buddy: FlareBuddyRef) =
|
||||
buddy.ctx.pool.nBuddies.dec # for metrics
|
||||
when enableTicker:
|
||||
buddy.ctx.pool.ticker.stopBuddy()
|
||||
|
||||
# ---------
|
||||
|
||||
proc flipCoin*(ctx: FlareCtxRef): bool =
|
||||
## This function is intended to randomise recurrent buddy processes. Each
|
||||
## participant fetches a vote via `getVote()` and continues on a positive
|
||||
## vote only. The scheduler will then re-queue the participant.
|
||||
##
|
||||
if ctx.pool.tossUp.nCoins == 0:
|
||||
result = true
|
||||
else:
|
||||
if ctx.pool.tossUp.nLeft == 0:
|
||||
ctx.pool.rng[].generate(ctx.pool.tossUp.coins)
|
||||
ctx.pool.tossUp.nLeft = 8 * sizeof(ctx.pool.tossUp.coins)
|
||||
ctx.pool.tossUp.nCoins.dec
|
||||
ctx.pool.tossUp.nLeft.dec
|
||||
result = bool(ctx.pool.tossUp.coins and 1)
|
||||
ctx.pool.tossUp.coins = ctx.pool.tossUp.coins shr 1
|
||||
|
||||
proc setCoinTosser*(ctx: FlareCtxRef; nCoins = 8u) =
|
||||
## Create a new sequence of `nCoins` oracles.
|
||||
ctx.pool.tossUp.nCoins = nCoins
|
||||
|
||||
proc resCoinTosser*(ctx: FlareCtxRef) =
|
||||
## Set up all oracles to be `true`
|
||||
ctx.pool.tossUp.nCoins = 0
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,170 @@
|
|||
# Nimbus - Fetch account and storage states from peers efficiently
|
||||
#
|
||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[strformat, strutils],
|
||||
pkg/[chronos, chronicles, eth/common, stint],
|
||||
../../../../utils/prettify,
|
||||
../../../types
|
||||
|
||||
logScope:
|
||||
topics = "ticker"
|
||||
|
||||
type
|
||||
TickerFlareStatsUpdater* = proc: TickerFlareStats {.gcsafe, raises: [].}
|
||||
## Full sync state update function
|
||||
|
||||
TickerFlareStats* = object
|
||||
## Full sync state (see `TickerFullStatsUpdater`)
|
||||
base*: BlockNumber
|
||||
least*: BlockNumber
|
||||
final*: BlockNumber
|
||||
beacon*: BlockNumber
|
||||
unprocTop*: BlockNumber
|
||||
nUnprocessed*: uint64
|
||||
nUnprocFragm*: int
|
||||
nStaged*: int
|
||||
stagedTop*: BlockNumber
|
||||
reorg*: int
|
||||
|
||||
TickerRef* = ref object
|
||||
## Ticker descriptor object
|
||||
nBuddies: int
|
||||
started: Moment
|
||||
visited: Moment
|
||||
prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].}
|
||||
flareCb: TickerFlareStatsUpdater
|
||||
lastStats: TickerFlareStats
|
||||
|
||||
const
|
||||
extraTraceMessages = false or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
tickerStartDelay = chronos.milliseconds(100)
|
||||
tickerLogInterval = chronos.seconds(1)
|
||||
tickerLogSuppressMax = chronos.seconds(100)
|
||||
|
||||
logTxt0 = "Flare ticker"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: pretty printing
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc pc99(val: float): string =
|
||||
if 0.99 <= val and val < 1.0: "99%"
|
||||
elif 0.0 < val and val <= 0.01: "1%"
|
||||
else: val.toPC(0)
|
||||
|
||||
proc toStr(a: Opt[int]): string =
|
||||
if a.isNone: "n/a"
|
||||
else: $a.unsafeGet
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: printing ticker messages
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
when extraTraceMessages:
|
||||
template logTxt(info: static[string]): static[string] =
|
||||
logTxt0 & " " & info
|
||||
|
||||
proc flareTicker(t: TickerRef) {.gcsafe.} =
|
||||
let
|
||||
data = t.flareCb()
|
||||
now = Moment.now()
|
||||
|
||||
if data != t.lastStats or
|
||||
tickerLogSuppressMax < (now - t.visited):
|
||||
let
|
||||
B = data.base.toStr
|
||||
L = data.least.toStr
|
||||
F = data.final.toStr
|
||||
Z = data.beacon.toStr
|
||||
staged = if data.nStaged == 0: "n/a"
|
||||
else: data.stagedTop.toStr & "(" & $data.nStaged & ")"
|
||||
unproc = if data.nUnprocFragm == 0: "n/a"
|
||||
else: data.unprocTop.toStr & "(" &
|
||||
data.nUnprocessed.toSI & "," & $data.nUnprocFragm & ")"
|
||||
reorg = data.reorg
|
||||
peers = t.nBuddies
|
||||
|
||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
||||
up = (now - t.started).seconds.uint64.toSI
|
||||
mem = getTotalMem().uint.toSI
|
||||
|
||||
t.lastStats = data
|
||||
t.visited = now
|
||||
|
||||
info logTxt0, up, peers, B, L, F, Z, staged, unproc, reorg, mem
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: ticking log messages
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
|
||||
|
||||
proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||
t.prettyPrint(t)
|
||||
t.setLogTicker(Moment.fromNow(tickerLogInterval))
|
||||
|
||||
proc setLogTicker(t: TickerRef; at: Moment) =
|
||||
if t.flareCb.isNil:
|
||||
when extraTraceMessages:
|
||||
debug logTxt "was stopped", nBuddies=t.nBuddies
|
||||
else:
|
||||
# Store the `runLogTicker()` in a closure to avoid some garbage collection
|
||||
# memory corruption issues that might occur otherwise.
|
||||
discard setTimer(at, proc(ign: pointer) = runLogTicker(t))
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public constructor and start/stop functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc init*(T: type TickerRef; cb: TickerFlareStatsUpdater): T =
|
||||
## Constructor
|
||||
result = TickerRef(
|
||||
prettyPrint: flareTicker,
|
||||
flareCb: cb,
|
||||
started: Moment.now())
|
||||
result.setLogTicker Moment.fromNow(tickerStartDelay)
|
||||
|
||||
proc destroy*(t: TickerRef) =
|
||||
## Stop ticker unconditionally
|
||||
if not t.isNil:
|
||||
t.flareCb = TickerFlareStatsUpdater(nil)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc startBuddy*(t: TickerRef) =
|
||||
## Increment buddies counter and start ticker unless running.
|
||||
if not t.isNil:
|
||||
if t.nBuddies <= 0:
|
||||
t.nBuddies = 1
|
||||
else:
|
||||
t.nBuddies.inc
|
||||
when extraTraceMessages:
|
||||
debug logTxt "start buddy", nBuddies=t.nBuddies
|
||||
|
||||
proc stopBuddy*(t: TickerRef) =
|
||||
## Decrement buddies counter and stop ticker if there are no more registered
|
||||
## buddies.
|
||||
if not t.isNil:
|
||||
if 0 < t.nBuddies:
|
||||
t.nBuddies.dec
|
||||
when extraTraceMessages:
|
||||
debug logTxt "stop buddy", nBuddies=t.nBuddies
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,105 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/eth/p2p,
|
||||
pkg/results,
|
||||
pkg/stew/interval_set,
|
||||
../worker_desc
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc unprocFetch*(
|
||||
ctx: FlareCtxRef;
|
||||
maxLen: uint64;
|
||||
): Result[BnRange,void] =
|
||||
## Fetch interval from block ranges with maximal size `maxLen`, where
|
||||
## `0` is interpreted as `2^64`.
|
||||
##
|
||||
let
|
||||
q = ctx.lhc.unprocessed
|
||||
|
||||
# Fetch top right interval with largest block numbers
|
||||
jv = q.le().valueOr:
|
||||
return err()
|
||||
|
||||
# Curb interval to maximal length `maxLen`
|
||||
iv = block:
|
||||
if maxLen == 0 or (0 < jv.len and jv.len <= maxLen):
|
||||
jv
|
||||
else:
|
||||
# Curb interval `jv` to length `maxLen`
|
||||
#
|
||||
# Note that either (fringe case):
|
||||
# (`jv.len`==0) => (`jv`==`[0,high(u64)]`) => `jv.maxPt`==`high(u64)`
|
||||
# or (in the non-fringe case)
|
||||
# (`maxLen` < `jv.len`) => (`jv.maxPt` - `maxLen` + 1 <= `jv.maxPt`)
|
||||
#
|
||||
BnRange.new(jv.maxPt - maxLen + 1, jv.maxPt)
|
||||
|
||||
discard q.reduce(iv)
|
||||
ok(iv)
|
||||
|
||||
proc unprocMerge*(ctx: FlareCtxRef; iv: BnRange) =
|
||||
## Merge back unprocessed range
|
||||
discard ctx.lhc.unprocessed.merge(iv)
|
||||
|
||||
proc unprocMerge*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Ditto
|
||||
discard ctx.lhc.unprocessed.merge(minPt, maxPt)
|
||||
|
||||
|
||||
proc unprocReduce*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Merge back unprocessed range
|
||||
discard ctx.lhc.unprocessed.reduce(minPt, maxPt)
|
||||
|
||||
|
||||
proc unprocFullyCovered*(
|
||||
ctx: FlareCtxRef; minPt, maxPt: BlockNumber): bool =
|
||||
## Check whether range is fully contained
|
||||
ctx.lhc.unprocessed.covered(minPt, maxPt) == maxPt - minPt + 1
|
||||
|
||||
proc unprocCovered*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber): uint64 =
|
||||
## Check whether range is fully contained
|
||||
ctx.lhc.unprocessed.covered(minPt, maxPt)
|
||||
|
||||
proc unprocCovered*(ctx: FlareCtxRef; pt: BlockNumber): bool =
|
||||
## Check whether point is contained
|
||||
ctx.lhc.unprocessed.covered(pt, pt) == 1
|
||||
|
||||
|
||||
proc unprocClear*(ctx: FlareCtxRef) =
|
||||
ctx.lhc.unprocessed.clear()
|
||||
|
||||
|
||||
proc unprocTop*(ctx: FlareCtxRef): BlockNumber =
|
||||
let iv = ctx.lhc.unprocessed.le().valueOr:
|
||||
return BlockNumber(0)
|
||||
iv.maxPt
|
||||
|
||||
proc unprocTotal*(ctx: FlareCtxRef): uint64 =
|
||||
ctx.lhc.unprocessed.total()
|
||||
|
||||
proc unprocChunks*(ctx: FlareCtxRef): int =
|
||||
ctx.lhc.unprocessed.chunks()
|
||||
|
||||
# ------------
|
||||
|
||||
proc unprocInit*(ctx: FlareCtxRef) =
|
||||
## Constructor
|
||||
ctx.lhc.unprocessed = BnRangeSet.init()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,149 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/[chronicles, chronos],
|
||||
pkg/eth/[common, rlp],
|
||||
pkg/stew/sorted_set,
|
||||
../../sync_desc,
|
||||
../worker_desc,
|
||||
./update/metrics,
|
||||
"."/[db, unproc]
|
||||
|
||||
logScope:
|
||||
topics = "flare update"
|
||||
|
||||
const extraTraceMessages = false # or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateBeaconChange(ctx: FlareCtxRef): bool =
|
||||
##
|
||||
## Layout (see (3) in README):
|
||||
## ::
|
||||
## G B==L==F Z
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## or
|
||||
## ::
|
||||
## G==Z B==L==F
|
||||
## o----------------o-------------------------->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## with `Z == beacon.header.number` or `Z == 0`
|
||||
##
|
||||
## to be updated to
|
||||
## ::
|
||||
## G B==L L'==F'
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
const info = "updateBeaconChange"
|
||||
|
||||
var z = ctx.lhc.beacon.header.number
|
||||
|
||||
# Need: `F < Z` and `B == L`
|
||||
if z != 0 and z <= ctx.layout.final: # violates `F < Z`
|
||||
when extraTraceMessages:
|
||||
trace info & ": not applicable",
|
||||
Z=("#" & $z), F=("#" & $ctx.layout.final)
|
||||
return false
|
||||
|
||||
if ctx.layout.base != ctx.layout.least: # violates `B == L`
|
||||
when extraTraceMessages:
|
||||
trace info & ": not applicable",
|
||||
B=("#" & $ctx.layout.base), L=("#" & $ctx.layout.least)
|
||||
return false
|
||||
|
||||
# Check consistency: `B == L <= F` for maximal `B` => `L == F`
|
||||
doAssert ctx.layout.least == ctx.layout.final
|
||||
|
||||
let rlpHeader = rlp.encode(ctx.lhc.beacon.header)
|
||||
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
base: ctx.layout.base,
|
||||
baseHash: ctx.layout.baseHash,
|
||||
least: z,
|
||||
leastParent: ctx.lhc.beacon.header.parentHash,
|
||||
final: z,
|
||||
finalHash: rlpHeader.keccakHash)
|
||||
|
||||
# Save this header on the database so it needs not be fetched again from
|
||||
# somewhere else.
|
||||
ctx.dbStashHeaders(z, @[rlpHeader])
|
||||
|
||||
# Save state
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
|
||||
# Update range
|
||||
ctx.unprocMerge(ctx.layout.base+1, ctx.layout.least-1)
|
||||
|
||||
when extraTraceMessages:
|
||||
trace info & ": updated"
|
||||
true
|
||||
|
||||
|
||||
proc mergeAdjacentChains(ctx: FlareCtxRef): bool =
|
||||
const info = "mergeAdjacentChains"
|
||||
|
||||
if ctx.lhc.layout.base + 1 < ctx.lhc.layout.least or # gap betw `B` and `L`
|
||||
ctx.lhc.layout.base == ctx.lhc.layout.least: # merged already
|
||||
return false
|
||||
|
||||
# No overlap allowed!
|
||||
doAssert ctx.lhc.layout.base + 1 == ctx.lhc.layout.least
|
||||
|
||||
# Verify adjacent chains
|
||||
if ctx.lhc.layout.baseHash != ctx.lhc.layout.leastParent:
|
||||
# FIXME: Oops -- any better idea than to defect?
|
||||
raiseAssert info & ": hashes do not match" &
|
||||
" B=#" & $ctx.lhc.layout.base & " L=#" & $ctx.lhc.layout.least
|
||||
|
||||
# Merge adjacent linked chains
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
base: ctx.layout.final,
|
||||
baseHash: ctx.layout.finalHash,
|
||||
least: ctx.layout.final,
|
||||
leastParent: ctx.dbPeekParentHash(ctx.layout.final).expect "Hash256",
|
||||
final: ctx.layout.final,
|
||||
finalHash: ctx.layout.finalHash)
|
||||
|
||||
# Save state
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateLinkedHChainsLayout*(ctx: FlareCtxRef): bool =
|
||||
## Update layout
|
||||
|
||||
# Check whether there is something to do regarding beacon node change
|
||||
if ctx.lhc.beacon.changed:
|
||||
ctx.lhc.beacon.changed = false
|
||||
result = ctx.updateBeaconChange()
|
||||
|
||||
# Check whether header downloading is done
|
||||
if ctx.mergeAdjacentChains():
|
||||
result = true
|
||||
|
||||
|
||||
proc updateMetrics*(ctx: FlareCtxRef) =
|
||||
ctx.updateMetricsImpl()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
|
@ -0,0 +1,54 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/metrics,
|
||||
../../worker_desc
|
||||
|
||||
declareGauge flare_beacon_block_number, "" &
|
||||
"Block number for latest finalised header"
|
||||
|
||||
declareGauge flare_era1_max_block_number, "" &
|
||||
"Max block number for era1 blocks"
|
||||
|
||||
declareGauge flare_max_trusted_block_number, "" &
|
||||
"Max block number for trusted headers chain starting at genesis"
|
||||
|
||||
declareGauge flare_least_verified_block_number, "" &
|
||||
"Starting block number for verified higher up headers chain"
|
||||
|
||||
declareGauge flare_top_verified_block_number, "" &
|
||||
"Top block number for verified higher up headers chain"
|
||||
|
||||
declareGauge flare_staged_headers_queue_size, "" &
|
||||
"Number of isolated verified header chains, gaps to be filled"
|
||||
|
||||
declareGauge flare_number_of_buddies, "" &
|
||||
"Number of current worker instances"
|
||||
|
||||
declareCounter flare_serial, "" &
|
||||
"Serial counter for debugging"
|
||||
|
||||
template updateMetricsImpl*(ctx: FlareCtxRef) =
|
||||
let now = Moment.now()
|
||||
if ctx.pool.nextUpdate < now:
|
||||
metrics.set(flare_era1_max_block_number, ctx.pool.e1AvailMax.int64)
|
||||
metrics.set(flare_max_trusted_block_number, ctx.layout.base.int64)
|
||||
metrics.set(flare_least_verified_block_number, ctx.layout.least.int64)
|
||||
metrics.set(flare_top_verified_block_number, ctx.layout.final.int64)
|
||||
metrics.set(flare_beacon_block_number, ctx.lhc.beacon.header.number.int64)
|
||||
metrics.set(flare_staged_headers_queue_size, ctx.lhc.staged.len)
|
||||
metrics.set(flare_number_of_buddies, ctx.pool.nBuddies)
|
||||
flare_serial.inc(1)
|
||||
ctx.pool.nextUpdate += metricsUpdateInterval
|
||||
|
||||
# End
|
|
@ -0,0 +1,210 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
{.push raises:[].}
|
||||
|
||||
import
|
||||
pkg/[bearssl/rand, chronos, chronos/timer],
|
||||
pkg/stew/[interval_set, sorted_set],
|
||||
../../db/era1_db,
|
||||
../sync_desc
|
||||
|
||||
export
|
||||
sync_desc
|
||||
|
||||
const
|
||||
enableTicker* = false or true
|
||||
## Log regular status updates similar to metrics. Great for debugging.
|
||||
|
||||
metricsUpdateInterval* = chronos.seconds(10)
|
||||
## Wait at least this time before next update
|
||||
|
||||
daemonWaitInterval* = chronos.seconds(30)
|
||||
## Some waiting time at the end of the daemon task which always lingers
|
||||
## in the background.
|
||||
|
||||
runMultiIdleWaitInterval* = chronos.seconds(30)
|
||||
## Sllep some time in multi-mode if there is nothing to do
|
||||
|
||||
nFetchHeadersRequest* = 1_024
|
||||
## Number of headers that will be requested with a single `eth/xx` message.
|
||||
## Generously calculating a header with size 1k, fetching 1_024 headers
|
||||
## would amount to a megabyte. As suggested in
|
||||
## github.com/ethereum/devp2p/blob/master/caps/eth.md#blockheaders-0x04,
|
||||
## the size of a message should not exceed 2 MiB.
|
||||
##
|
||||
## On live tests, responses to larger requests where all truncted to 1024
|
||||
## header entries. It makes sense to not ask for more. So reserving
|
||||
## smaller unprocessed slots that mostly all will be served leads to less
|
||||
## fragmentation on a multi-peer downloading approach.
|
||||
|
||||
nFetchHeadersOpportunisticly* = 8 * nFetchHeadersRequest
|
||||
## Length of the request/stage batch. Several headers are consecutively
|
||||
## fetched and stashed together as a single record on the staged queue.
|
||||
## This is the size of an opportunistic run where the record stashed on
|
||||
## the queue might be later discarded.
|
||||
|
||||
nFetchHeadersByTopHash* = 16 * nFetchHeadersRequest
|
||||
## This entry is similar to `nFetchHeadersOpportunisticly` only that it
|
||||
## will always be successfully merged into the database.
|
||||
|
||||
stagedQueueLengthLwm* = 24
|
||||
## Limit the number of records in the staged queue. They start accumulating
|
||||
## if one peer stalls while fetching the top chain so leaving a gap. This
|
||||
## gap must be filled first before inserting the queue into a contiguous
|
||||
## chain of headers. So this is a low-water mark where the system will
|
||||
## try some magic to mitigate this problem.
|
||||
|
||||
stagedQueueLengthHwm* = 40
|
||||
## If this size is exceeded, the staged queue is flushed and its contents
|
||||
## is re-fetched from scratch.
|
||||
|
||||
when enableTicker:
|
||||
import ./worker/start_stop/ticker
|
||||
|
||||
type
|
||||
BnRangeSet* = IntervalSetRef[BlockNumber,uint64]
|
||||
## Disjunct sets of block number intervals
|
||||
|
||||
BnRange* = Interval[BlockNumber,uint64]
|
||||
## Single block number interval
|
||||
|
||||
LinkedHChainQueue* = SortedSet[BlockNumber,LinkedHChain]
|
||||
## Block intervals sorted by largest block number.
|
||||
|
||||
LinkedHChainQueueWalk* = SortedSetWalkRef[BlockNumber,LinkedHChain]
|
||||
## Traversal descriptor
|
||||
|
||||
LinkedHChain* = object
|
||||
## Public block items for the `LinkedHChainQueue` list, indexed by
|
||||
## largest block number.
|
||||
##
|
||||
## FIXME: `headers[]` should be reversed, i.e. `headers[0]` has the
|
||||
## highest block number. This makes it natural to extend the
|
||||
## sequence with parent headers at the growing end.
|
||||
##
|
||||
parentHash*: Hash256 ## Parent hash of `headers[0]`
|
||||
headers*: seq[Blob] ## Encoded linked header chain
|
||||
hash*: Hash256 ## Hash of `headers[^1]`
|
||||
|
||||
# -------------------
|
||||
|
||||
LinkedHChainsLayout* = object
|
||||
## Layout of a triple of linked header chains
|
||||
## ::
|
||||
## G B L F
|
||||
## o----------------o---------------------o----------------o--->
|
||||
## | <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
||||
##
|
||||
## see `README.md` for details and explanations
|
||||
##
|
||||
base*: BlockNumber
|
||||
## `B`, maximal block number of linked chain starting at Genesis `G`
|
||||
baseHash*: Hash256
|
||||
## Hash of `B`
|
||||
|
||||
least*: BlockNumber
|
||||
## `L`, minimal block number of linked chain ending at `F` with `B <= L`
|
||||
leastParent*: Hash256
|
||||
## Parent hash of `L` (similar to `parentHash` in `HeaderChainItemRef`)
|
||||
|
||||
final*: BlockNumber
|
||||
## `F`, some finalised block
|
||||
finalHash*: Hash256
|
||||
## Hash of `F` (similar to `hash` in `HeaderChainItemRef`)
|
||||
|
||||
BeaconHeader* = object
|
||||
## Beacon state to be implicitely updated by RPC method
|
||||
changed*: bool ## Set a marker if something has changed
|
||||
header*: BlockHeader ## Running on beacon chain, last header
|
||||
slow_start*: float ## Share of block number to use if positive
|
||||
|
||||
LinkedHChainsSync* = object
|
||||
## Sync state for linked header chains
|
||||
beacon*: BeaconHeader ## See `Z` in README
|
||||
unprocessed*: BnRangeSet ## Block or header ranges to fetch
|
||||
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
|
||||
layout*: LinkedHChainsLayout ## Current header chains layout
|
||||
lastLayout*: LinkedHChainsLayout ## Previous layout (for delta update)
|
||||
|
||||
# -------------------
|
||||
|
||||
FlareBuddyData* = object
|
||||
## Local descriptor data extension
|
||||
fetchBlocks*: BnRange
|
||||
|
||||
FlareTossUp* = object
|
||||
## Reminiscent of CSMA/CD. For the next number `nCoins` in a row, each
|
||||
## participant can fetch a `true`/`false` value to decide whether to
|
||||
## start doing something or delay.
|
||||
nCoins*: uint ## Numner of coins to toss in a row
|
||||
nLeft*: uint ## Number of flopped coins left
|
||||
coins*: uint64 ## Sequence of fliopped coins
|
||||
|
||||
FlareCtxData* = object
|
||||
## Globally shared data extension
|
||||
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
|
||||
lhcSyncState*: LinkedHChainsSync ## Syncing by linked header chains
|
||||
tossUp*: FlareTossUp ## Reminiscent of CSMA/CD
|
||||
nextUpdate*: Moment ## For updating metrics
|
||||
|
||||
# Era1 related, disabled if `e1db` is `nil`
|
||||
e1Dir*: string ## Pre-merge archive (if any)
|
||||
e1db*: Era1DbRef ## Era1 db handle (if any)
|
||||
e1AvailMax*: BlockNumber ## Last Era block applicable here
|
||||
|
||||
# Info stuff, no functional contribution
|
||||
nBuddies*: int ## Number of active workers (info only)
|
||||
nReorg*: int ## Number of reorg invocations (info only)
|
||||
|
||||
# Debugging stuff
|
||||
when enableTicker:
|
||||
ticker*: TickerRef ## Logger ticker
|
||||
|
||||
FlareBuddyRef* = BuddyRef[FlareCtxData,FlareBuddyData]
|
||||
## Extended worker peer descriptor
|
||||
|
||||
FlareCtxRef* = CtxRef[FlareCtxData]
|
||||
## Extended global descriptor
|
||||
|
||||
static:
|
||||
doAssert 0 < nFetchHeadersRequest
|
||||
doAssert nFetchHeadersRequest <= nFetchHeadersOpportunisticly
|
||||
doAssert nFetchHeadersRequest <= nFetchHeadersByTopHash
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func lhc*(ctx: FlareCtxRef): var LinkedHChainsSync =
|
||||
## Shortcut
|
||||
ctx.pool.lhcSyncState
|
||||
|
||||
func layout*(ctx: FlareCtxRef): var LinkedHChainsLayout =
|
||||
## Shortcut
|
||||
ctx.pool.lhcSyncState.layout
|
||||
|
||||
func db*(ctx: FlareCtxRef): CoreDbRef =
|
||||
## Getter
|
||||
ctx.chain.com.db
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public logging/debugging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc `$`*(w: BnRange): string =
|
||||
if w.len == 1: $w.minPt else: $w.minPt & ".." & $w.maxPt
|
||||
|
||||
proc bnStr*(w: BlockNumber): string =
|
||||
"#" & $w
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
Loading…
Reference in New Issue