Flare sync (#2627)

* Cosmetics, small fixes, add stashed headers verifier

* Remove direct `Era1` support

why:
  Era1 is indirectly supported by using the import tool before syncing.

* Clarify database persistent save function.

why:
  Function relied on the last saved state block number which was wrong.
  It now relies on the tx-level. If it is 0, then data are saved directly.
  Otherwise the task that owns the tx will do it.
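
  A minimal sketch of the rule above, assuming a hypothetical `Db` record
  with a `txLevel` counter (these names are illustrative and not taken from
  the code base): data are written only when no transaction is open.

```nim
type
  Db = object
    ## Hypothetical stand-in for the database descriptor.
    txLevel: int          # nesting depth of open transactions
    pending: seq[uint64]  # block numbers not yet written out
    saved: seq[uint64]    # block numbers already written out

proc persistentSave(db: var Db): bool =
  ## Write pending data only if no transaction is open. Returns `true`
  ## if the data were saved by this call.
  if db.txLevel != 0:
    return false          # the task owning the transaction saves later
  db.saved.add db.pending
  db.pending.setLen 0
  true
```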

* Extracted configuration constants into separate file

* Enable single peer mode for debugging

* Fix peer losing issue in multi-mode

details:
  Running concurrent download peers was previously programmed as running
  a batch downloading and storing ~8k headers and then leaving the `async`
  function to be restarted by a scheduler.

  This was unfortunate because of occasionally occurring long waiting
  times for restart.

  While the time gap until restarting was typically observed to be a few
  milliseconds, there were always a few outliers well exceeding several
  seconds. This seemed to let remote peers run into timeouts.
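
  The idea of staying inside the `async` function while work remains can
  be sketched as follows. This uses `std/asyncdispatch` for
  self-containment (the code base uses `chronos`), and `fetchBatch` and
  `unprocessed` are hypothetical stand-ins.

```nim
import std/asyncdispatch

var unprocessed = 3          # hypothetical count of header batches left

proc fetchBatch(): Future[bool] {.async.} =
  ## Stand-in for a network round trip fetching one batch.
  await sleepAsync(1)
  if unprocessed == 0:
    return false
  dec unprocessed
  return true

proc runMulti(): Future[int] {.async.} =
  ## Stays inside the function while there is work, instead of returning
  ## to the scheduler after each batch.
  var nBatches = 0
  while 0 < unprocessed:
    let ok = await fetchBatch()
    if not ok:
      break
    inc nBatches
  return nBatches

let nDone = waitFor runMulti()
```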

* Prefix function names `unprocXxx()` and `stagedYyy()` by `headers`

why:
  There will be other `unproc` and `staged` modules.

* Remove cruft, update logging

* Fix accounting issue

details:
  When staging after fetching headers from the network, there was an
  off-by-one error when the result was one smaller than requested.
  Also, a whole range was mis-accounted when a peer terminated the
  connection immediately after responding.

* Fix slow/error header accounting when fetching

why:
  Originally set for detecting slow headers in a row, the counter
  was wrongly extended to general errors.

* Ban peers for a while that respond with too few headers continuously

why:
  Some peers only returned one header at a time. If these peers sit on a
  farm, they might collectively slow down the download process.
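
  A rough sketch of such a ban rule, modelled on the error counter used
  for body fetching later in this change. The constants and the
  `PeerStats` type are assumptions for illustration, not the values from
  the code base.

```nim
const
  minResponsePC = 10'u64   # assumed: minimum share of requested items, in %
  errThreshold = 2         # assumed: consecutive poor responses tolerated

type
  PeerStats = object
    nRespErrors: int       # consecutive poor responses
    zombie: bool           # `true` once the peer is banned

proc registerResponse(p: var PeerStats; nReq, nResp: uint64) =
  ## Count a response as an error if it delivers less than the minimum
  ## share; ban the peer when too many errors occur in a row.
  if nResp * 100 < nReq * minResponsePC:
    inc p.nRespErrors
    if errThreshold < p.nRespErrors:
      p.zombie = true
  else:
    p.nRespErrors = 0      # reset on a good response
```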

* Update RPC beacon header updater

why:
  Old function hook has slightly changed its meaning since it was used
  for snap sync. Also, the old hook is used by other functions already.

* Limit number of peers or set to single peer mode

details:
  Merge several concepts, single peer mode being one of them.

* Some code clean up, fixings for removing of compiler warnings

* De-noise header fetch related sources

why:
  Header download looks relatively stable, so general debugging is not
  needed, anymore. This is the equivalent of removing the scaffold from
  the part of the building where work has completed.

* More clean up and code prettification for headers stuff

* Implement body fetch and block import

details:
  Available headers are used to stage blocks by combining existing headers
  with newly fetched blocks. Then these blocks are imported/executed via
  `persistBlocks()`.

* Logger cosmetics and cleanup

* Remove staged block queue debugging

details:
  Feature still available, just not executed anymore

* Docu, logging update

* Update/simplify `runDaemon()`

* Re-calibrate block body requests and soft config for import blocks batch

why:
* For fetching, larger fetch requests are mostly truncated anyway on
  MainNet.
* For executing, smaller batch sizes reduce the memory needed at the
  price of longer execution times.

* Update metrics counters

* Docu update

* Some fixes, formatting updates, etc.

* Update `borrowed` type: uint -> uint64

also:
  Always convert to `uint64` rather than `uint` where appropriate
Jordan Hrycaj 2024-09-27 15:07:42 +00:00 committed by GitHub
parent db8b68a28c
commit 0d2a72d2a9
28 changed files with 2010 additions and 1042 deletions

View File

@ -194,7 +194,7 @@ available.)
cases when the `gc` is involved in a memory corruption or corruption
camouflage.
* ENABLE_LINE_NUMBERS=1
* ENABLE_LINE_NUMBERS=1<br>
Enables logger to print out source code location with log message
* ENABLE_EVMC=1<br>

View File

@ -116,6 +116,13 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
# Update sync header (if any)
com.syncReqNewHead(header)
# Pass on finalised header
if com.haveSyncFinalisedBlockHash():
let finalizedBlockHash = ethHash update.finalizedBlockHash
if finalizedBlockHash != common.Hash256():
com.syncFinalisedBlockHash(finalizedBlockHash)
return simpleFCU(PayloadExecutionStatus.syncing)
validateHeaderTimestamp(header, com, apiVersion)

View File

@ -42,6 +42,9 @@ type
SyncReqNewHeadCB* = proc(header: BlockHeader) {.gcsafe, raises: [].}
## Update head for syncing
SyncFinalisedBlockHashCB* = proc(hash: Hash256) {.gcsafe, raises: [].}
## Ditto
NotifyBadBlockCB* = proc(invalid, origin: BlockHeader) {.gcsafe, raises: [].}
## Notify engine-API of encountered bad block
@ -76,6 +79,10 @@ type
## Call back function for the sync processor. This function stages
## the argument header to a private area for subsequent processing.
syncFinalisedBlockHash: SyncFinalisedBlockHashCB
## Call back function for a sync processor that returns the canonical
## header.
notifyBadBlock: NotifyBadBlockCB
## Allow synchronizer to inform engine-API of a bad block encountered during
## sync progress
@ -401,10 +408,18 @@ proc consensus*(com: CommonRef, header: BlockHeader): ConsensusType =
proc syncReqNewHead*(com: CommonRef; header: BlockHeader)
{.gcsafe, raises: [].} =
## Used by RPC to update the beacon head for snap sync
## Used by RPC updater
if not com.syncReqNewHead.isNil:
com.syncReqNewHead(header)
func haveSyncFinalisedBlockHash*(com: CommonRef): bool =
not com.syncFinalisedBlockHash.isNil
proc syncFinalisedBlockHash*(com: CommonRef; hash: Hash256) =
## Used by RPC updater
if not com.syncFinalisedBlockHash.isNil:
com.syncFinalisedBlockHash(hash)
proc notifyBadBlock*(com: CommonRef; invalid, origin: BlockHeader)
{.gcsafe, raises: [].} =
@ -513,6 +528,10 @@ func `syncReqNewHead=`*(com: CommonRef; cb: SyncReqNewHeadCB) =
## Activate or reset a call back handler for syncing.
com.syncReqNewHead = cb
func `syncFinalisedBlockHash=`*(com: CommonRef; cb: SyncFinalisedBlockHashCB) =
## Activate or reset a call back handler for syncing.
com.syncFinalisedBlockHash = cb
func `notifyBadBlock=`*(com: CommonRef; cb: NotifyBadBlockCB) =
## Activate or reset a call back handler for bad block notification.
com.notifyBadBlock = cb

View File

@ -385,6 +385,12 @@ type
defaultValueDesc: $ProtocolFlag.Eth
name: "protocols" .}: seq[string]
flareChunkSize* {.
hidden
desc: "Number of blocks per database transaction for flare sync"
defaultValue: 0
name: "debug-flare-chunk-size" .}: int
rocksdbMaxOpenFiles {.
hidden
defaultValue: defaultMaxOpenFiles

View File

@ -145,8 +145,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
)
of SyncMode.Flare:
nimbus.flareSyncRef = FlareSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
conf.era1Dir.string)
nimbus.ethNode, nimbus.chainRef, conf.maxPeers, conf.flareChunkSize)
# Connect directly to the static nodes
let staticPeers = conf.getStaticPeers()

View File

@ -17,7 +17,7 @@ import
"."/[sync_desc, sync_sched, protocol]
logScope:
topics = "beacon2-sync"
topics = "flare"
type
FlareSyncRef* = RunnerSyncRef[FlareCtxData,FlareBuddyData]
@ -107,14 +107,13 @@ proc init*(
T: type FlareSyncRef;
ethNode: EthereumNode;
chain: ForkedChainRef;
rng: ref HmacDrbgContext;
maxPeers: int;
era1Dir: string;
chunkSize: int;
): T =
new result
result.initSync(ethNode, chain, maxPeers)
result.ctx.pool.rng = rng
result.ctx.pool.e1Dir = era1Dir
var desc = T()
desc.initSync(ethNode, chain, maxPeers)
desc.ctx.pool.nBodiesBatch = chunkSize
desc
proc start*(ctx: FlareSyncRef) =
## Beacon Sync always begins with stop mode

View File

@ -44,8 +44,7 @@ Meaning of *G*, *B*, *L*, *F*:
This definition implies *G <= B <= L <= F* and the header chains can uniquely
be described by the triple of block numbers *(B,L,F)*.
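
The invariant can be sketched as a small check. The `Layout` record and the function names are hypothetical and chosen for illustration only; *G* is assumed to be block number zero.

```nim
type
  Layout = object
    ## Hypothetical record for the triple (B,L,F).
    b, l, f: uint64

const genesis = 0'u64      # G, assumed to be block number zero

func isValid(x: Layout): bool =
  ## Checks G <= B <= L <= F.
  genesis <= x.b and x.b <= x.l and x.l <= x.f

func unprocessed(x: Layout): uint64 =
  ## Number of block numbers strictly inside the open interval (B,L).
  if x.b + 1 < x.l: x.l - x.b - 1 else: 0'u64
```

With this reading, a pristine layout *(G,G,G)* is valid and has no unprocessed block numbers.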
Storage of header chains:
-------------------------
### Storage of header chains:
Some block numbers from the set *{w|G<=w<=B}* may correspond to finalised
blocks which may be stored anywhere. If some block numbers do not correspond
@ -56,8 +55,7 @@ a sub-chain starting at *G*.
The block numbers from the set *{w|L<=w<=F}* must reside in the *flareHeader*
database table. They do not correspond to finalised blocks.
Header chains initialisation:
-----------------------------
### Header chains initialisation:
Minimal layout on a pristine system
@ -69,8 +67,7 @@ Minimal layout on a pristine system
When first initialised, the header chains are set to *(G,G,G)*.
Updating header chains:
-----------------------
### Updating header chains:
A header chain with a non-empty open interval *(B,L)* can be updated only by
increasing *B* or decreasing *L* by adding headers so that the linked chain
@ -105,18 +102,7 @@ with *L'=Z* and *F'=Z*.
Note that diagram *(3)* is a generalisation of *(2)*.
Era1 repository support:
------------------------
For the initial blocks, *Era1* repositories are supported for *MainNet*
and *Sepolia*. They might be truncated at the top, condition is that provide
consecutive blocks (like the headr chains) and start at *Genesis*.
In that case, the position *B* will be immediately set to the largest available
*Era1* block number without storing any headers.
Complete header chain:
----------------------
### Complete header chain:
The header chain is *relatively complete* if it satisfies clause *(3)* above
for *G < B*. It is *fully complete* if *F==Z*. It should be obvious that the
@ -127,3 +113,126 @@ If a *relatively complete* header chain is reached for the first time, the
execution layer can start running an importer in the background compiling
or executing blocks (starting from block number *#1*.) So the ledger database
state will be updated incrementally.
Imported block chain
--------------------
The following imported block chain diagram amends the layout *(1)*:
G T B L F (5)
o------------------o-------o---------------------o----------------o-->
| <-- imported --> | | | |
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |
where *T* is the number of the last imported and executed block. Coincidentally,
*T* also refers to the global state of the ledger database.
The headers corresponding to the half open interval `(T,B]` can be completed by
fetching block bodies and then imported/executed.
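
As a sketch, the half open interval `(T,B]` translates into a closed range of block numbers as follows. The `BnRange` object here is a simplified, hypothetical stand-in for the interval type used in the sources.

```nim
import std/options

type
  BnRange = object
    ## Simplified closed interval of block numbers (illustrative only).
    minPt, maxPt: uint64

func pendingBodies(t, b: uint64): Option[BnRange] =
  ## Closed-interval form `[T+1,B]` of `(T,B]`; nothing to do if `T == B`.
  if t < b:
    some(BnRange(minPt: t + 1, maxPt: b))
  else:
    none(BnRange)
```

For example, with *T=5* and *B=8* the bodies for blocks 6, 7 and 8 are the ones still to be fetched and imported.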
Running the sync process for *MainNet*
--------------------------------------
For syncing, a beacon node is needed that regularly informs via *RPC* of a
recently finalised block header.
The beacon node program used here is the *nimbus_beacon_node* binary from the
*nimbus-eth2* project (any other will do.) *Nimbus_beacon_node* is started as
./run-mainnet-beacon-node.sh \
--web3-url=http://127.0.0.1:8551 \
--jwt-secret=/tmp/jwtsecret
where *http://127.0.0.1:8551* is the URL of the sync process that receives the
finalised block header (here on the same physical machine) and `/tmp/jwtsecret`
is the shared secret file needed for mutual communication authentication.
It will take a while for *nimbus_beacon_node* to catch up (see the
[Nimbus Guide](https://nimbus.guide/quick-start.html) for details.)
### Starting `nimbus` for syncing
As the sync process is quite slow, it makes sense to pre-load the database
with data from an `Era1` archive (if available) before starting the real
sync process. The command would be something like
./build/nimbus import \
--era1-dir:/path/to/main-era1/repo \
...
which will take a while for the full *MainNet* era1 repository (but way faster
than the sync.)
On a system with memory considerably larger than *8GiB* the *nimbus*
binary is started on the same machine where the beacon node runs as
./build/nimbus \
--network=mainnet \
--sync-mode=flare \
--engine-api=true \
--engine-api-port=8551 \
--engine-api-ws=true \
--jwt-secret=/tmp/jwtsecret \
...
Note that *--engine-api-port=8551* and *--jwt-secret=/tmp/jwtsecret* match
the corresponding options from the *nimbus-eth2* beacon source example.
### Syncing on a low memory machine
On a system with *8GiB* of memory the following additional options proved
useful for *nimbus* to reduce the memory footprint.
For the *Era1* pre-load (if any) the following extra options apply to
"*nimbus import*":
--chunk-size=1024
--debug-rocksdb-row-cache-size=512000
--debug-rocksdb-block-cache-size=1500000
To start syncing, the following additional options apply to *nimbus*:
--debug-flare-chunk-size=384
--debug-rocksdb-max-open-files=384
--debug-rocksdb-write-buffer-size=50331648
--debug-rocksdb-block-cache-size=1073741824
--debug-rdb-key-cache-size=67108864
--debug-rdb-vtx-cache-size=268435456
Also, to reduce the backlog for *nimbus-eth2* stored on disk, the following
changes might be considered. For file
*nimbus-eth2/vendor/mainnet/metadata/config.yaml* change setting constants:
MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096
to
MIN_EPOCHS_FOR_BLOCK_REQUESTS: 8
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 8
Caveat: These changes are not useful when running *nimbus_beacon_node* as a
production system.
Metrics
-------
The following metrics are defined in *worker/update/metrics.nim* which will
be available if *nimbus* is compiled with the additional make flags
*NIMFLAGS="-d:metrics \-\-threads:on"*:
| *Variable* | *Logic type* | *Short description* |
|:------------------------------|:------------:|:--------------------|
| | | |
| flare_state_block_number | block height | **T**, *increasing* |
| flare_base_block_number | block height | **B**, *increasing* |
| flare_least_block_number | block height | **L** |
| flare_final_block_number | block height | **F**, *increasing* |
| flare_beacon_block_number | block height | **Z**, *increasing* |
| | | |
| flare_headers_staged_queue_len| size | # of staged header list records |
| flare_headers_unprocessed | size | # of accumulated header block numbers|
| flare_blocks_staged_queue_len | size | # of staged block list records |
| flare_blocks_unprocessed | size | # of accumulated body block numbers |
| | | |
| flare_number_of_buddies | size | # of working peers |

View File

@ -1,6 +1,2 @@
* Update/resolve code fragments which are tagged FIXME
* Check noisy and verification sections whether they are really wanted
when going into production
+ **extraTraceMessages**
+ **verifyDataStructureOk**
* Remove debug.nim file with a separate PR

View File

@ -16,14 +16,39 @@ import
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../common,
./worker/[db, staged, start_stop, unproc, update],
./worker/[blocks_staged, db, headers_staged, headers_unproc,
start_stop, update],
./worker_desc
logScope:
topics = "flare"
const extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc headersToFetchOk(buddy: FlareBuddyRef): bool =
0 < buddy.ctx.headersUnprocTotal() and
buddy.ctrl.running and
not buddy.ctx.poolMode
proc bodiesToFetchOk(buddy: FlareBuddyRef): bool =
buddy.ctx.blocksStagedFetchOk() and
buddy.ctrl.running and
not buddy.ctx.poolMode
proc napUnlessSomethingToFetch(
buddy: FlareBuddyRef;
info: static[string];
): Future[bool] {.async.} =
## When idle, save cpu cycles waiting for something to do.
if buddy.ctx.pool.importRunningOk or
not (buddy.headersToFetchOk() or
buddy.bodiesToFetchOk()):
debug info & ": idly wasting time", peer=buddy.peer
await sleepAsync workerIdleWaitInterval
return true
return false
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
@ -34,11 +59,8 @@ proc setup*(ctx: FlareCtxRef): bool =
debug "RUNSETUP"
ctx.setupRpcMagic()
# Setup `Era1` access (if any) before setting up layout.
discard ctx.dbInitEra1()
# Load initial state from database if there is any
ctx.dbLoadLinkedHChainsLayout()
ctx.setupDatabase()
# Debugging stuff, might be an empty template
ctx.setupTicker()
@ -56,15 +78,24 @@ proc release*(ctx: FlareCtxRef) =
proc start*(buddy: FlareBuddyRef): bool =
## Initialise worker peer
if buddy.startBuddy():
buddy.ctrl.multiOk = true
debug "RUNSTART", peer=buddy.peer, multiOk=buddy.ctrl.multiOk
return true
debug "RUNSTART failed", peer=buddy.peer
const info = "RUNSTART"
if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies:
debug info & " peer limit reached", peer=buddy.peer
return false
if not buddy.startBuddy():
debug info & " failed", peer=buddy.peer
return false
buddy.ctrl.multiOk = true
debug info, peer=buddy.peer
true
proc stop*(buddy: FlareBuddyRef) =
## Clean up this peer
debug "RUNSTOP", peer=buddy.peer
debug "RUNSTOP", peer=buddy.peer, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr(2)
buddy.stopBuddy()
# ------------------------------------------------------------------------------
@ -81,12 +112,31 @@ proc runDaemon*(ctx: FlareCtxRef) {.async.} =
const info = "RUNDAEMON"
debug info
# Check for a possible layout change of the `HeaderChainsSync` state
if ctx.updateLinkedHChainsLayout():
debug info & ": headers chain layout was updated"
# Check for a possible header layout and body request changes
discard ctx.updateLinkedHChainsLayout()
discard ctx.updateBlockRequests()
# Execute staged block records.
if ctx.blocksStagedCanImportOk():
block:
# Set advisory flag telling that a slow/long running process will take
# place. This works a bit like `runSingle()` only that in the case here
# we might have no peer.
ctx.pool.importRunningOk = true
defer: ctx.pool.importRunningOk = false
# Import from staged queue.
while ctx.blocksStagedImport info:
ctx.updateMetrics()
# Allow pseudo/async thread switch
await sleepAsync asyncThreadSwitchTimeSlot
# At the end of the cycle, leave time to refill
await sleepAsync daemonWaitInterval
ctx.updateMetrics()
await sleepAsync daemonWaitInterval
proc runSingle*(buddy: FlareBuddyRef) {.async.} =
@ -101,7 +151,8 @@ proc runSingle*(buddy: FlareBuddyRef) {.async.} =
##
## Note that this function runs in `async` mode.
##
raiseAssert "RUNSINGLE should not be used: peer=" & $buddy.peer
const info = "RUNSINGLE"
raiseAssert info & " should not be used: peer=" & $buddy.peer
proc runPool*(buddy: FlareBuddyRef; last: bool; laps: int): bool =
@ -121,9 +172,8 @@ proc runPool*(buddy: FlareBuddyRef; last: bool; laps: int): bool =
## Note that this function does not run in `async` mode.
##
const info = "RUNPOOL"
when extraTraceMessages:
debug info, peer=buddy.peer, laps
buddy.ctx.stagedReorg info # reorg
#debug info, peer=buddy.peer, laps
buddy.ctx.headersStagedReorg info # reorg
true # stop
@ -133,36 +183,78 @@ proc runMulti*(buddy: FlareBuddyRef) {.async.} =
## instance can be simultaneously active for all peer workers.
##
const info = "RUNMULTI"
let
ctx = buddy.ctx
peer = buddy.peer
let peer = buddy.peer
if ctx.unprocTotal() == 0 and ctx.stagedChunks() == 0:
# Save cpu cycles waiting for something to do
when extraTraceMessages:
debug info & ": idle wasting time", peer
await sleepAsync runMultiIdleWaitInterval
return
if 0 < buddy.only.nMultiLoop: # statistics/debugging
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
buddy.only.nMultiLoop.inc # statistics/debugging
if not ctx.flipCoin():
# Come back next time
when extraTraceMessages:
debug info & ": running later", peer
return
trace info, peer, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr(2)
# * get unprocessed range from pool
# * fetch headers for this range (as much as one can get)
# * verify that a block is sound, i.e. contiguous, chained by parent hashes
# * return remaining range to unprocessed range in the pool
# * store this range on the staged queue on the pool
if await buddy.stagedCollect info:
# * increase the top/right interval of the trused range `[L,F]`
# * save updated state and headers
discard buddy.ctx.stagedProcess info
# Update beacon header when needed. For the beacon header, a hash will be
# auto-magically made available via RPC. The corresponding header is then
# fetched from the current peer.
await buddy.headerStagedUpdateBeacon info
else:
when extraTraceMessages:
debug info & ": nothing fetched, done", peer
if not await buddy.napUnlessSomethingToFetch info:
#
# Layout of a triple of linked header chains (see `README.md`)
# ::
# G B L F
# | <--- [G,B] --> | <----- (B,L) -----> | <-- [L,F] ---> |
# o----------------o---------------------o----------------o--->
# | <-- linked --> | <-- unprocessed --> | <-- linked --> |
#
# This function is run concurrently for fetching the next batch of
# headers and stashing them on the database. Each concurrently running
# actor works as follows:
#
# * Get a range of block numbers from the `unprocessed` range `(B,L)`.
# * Fetch headers for this range (as much as one can get).
# * Stash them on the database.
# * Rinse and repeat.
#
# The block numbers range concurrently taken from `(B,L)` are chosen
# from the upper range. So exactly one of the actors has a range
# `[whatever,L-1]` adjacent to `[L,F]`. Call this actor the lead actor.
#
# For the lead actor, headers can be downloaded all by the hashes as
# the parent hash for the header with block number `L` is known. All
# other non-lead actors will download headers by the block number only
# and stage it to be re-ordered and stashed on the database when ready.
#
# Once the lead actor stashes the downloaded headers, the other staged
# headers will also be stashed on the database until there is a gap or
# the stashed headers are exhausted.
#
# Due to the nature of the `async` logic, the current lead actor will
# stay lead when fetching the next range of block numbers.
#
while buddy.headersToFetchOk():
# * Get unprocessed range from pool
# * Fetch headers for this range (as much as one can get)
# * Verify that a block is contiguous, chained by parent hash, etc.
# * Stash this range on the staged queue on the pool
if await buddy.headersStagedCollect info:
# * Save updated state and headers
# * Decrease the left boundary `L` of the trusted range `[L,F]`
discard buddy.ctx.headersStagedProcess info
# Fetch bodies and combine them with headers to blocks to be staged. These
# staged blocks are then executed by the daemon process (no `peer` needed.)
while buddy.bodiesToFetchOk():
discard await buddy.blocksStagedCollect info
# Note that it is important **not** to leave this function to be
# re-invoked by the scheduler unless necessary. While the time gap
# until restarting is typically a few millisecs, there are always
# outliers which well exceed several seconds. This seems to let
# remote peers run into timeouts.
buddy.only.stoppedMultiRun = Moment.now() # statistics/debugging
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,277 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/[chronicles, chronos],
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../../common,
../../../core/chain,
../worker_desc,
./blocks_staged/bodies,
"."/[blocks_unproc, db]
logScope:
topics = "flare blocks"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc fetchAndCheck(
buddy: FlareBuddyRef;
ivReq: BnRange;
blk: ref BlocksForImport; # update in place
info: static[string];
): Future[bool] {.async.} =
let
ctx = buddy.ctx
offset = blk.blocks.len.uint64
# Make sure that the block range matches the top
doAssert offset == 0 or blk.blocks[offset - 1].header.number+1 == ivReq.minPt
# Preset/append headers to be completed with bodies. Also collect block hashes
# for fetching missing blocks.
blk.blocks.setLen(offset + ivReq.len)
var blockHash = newSeq[Hash256](ivReq.len)
for n in 1u ..< ivReq.len:
let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header"
blockHash[n - 1] = header.parentHash
blk.blocks[offset + n].header = header
blk.blocks[offset].header =
ctx.dbPeekHeader(ivReq.minPt).expect "stashed header"
blockHash[ivReq.len - 1] =
rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccakHash
# Fetch bodies
let bodies = block:
let rc = await buddy.bodiesFetch(blockHash, info)
if rc.isErr:
blk.blocks.setLen(offset)
return false
rc.value
# Append bodies, note that the bodies are not fully verified here but rather
# when they are imported and executed.
let nBodies = bodies.len.uint64
if nBodies < ivReq.len:
blk.blocks.setLen(offset + nBodies)
block loop:
for n in 0 ..< nBodies:
block checkTxLenOk:
if blk.blocks[offset + n].header.txRoot != EMPTY_ROOT_HASH:
if 0 < bodies[n].transactions.len:
break checkTxLenOk
else:
if bodies[n].transactions.len == 0:
break checkTxLenOk
# Oops, cut off the rest
blk.blocks.setLen(offset + n)
buddy.fetchRegisterError()
trace info & ": fetch bodies cut off junk", peer=buddy.peer, ivReq,
n, nTxs=bodies[n].transactions.len, nBodies,
nRespErrors=buddy.only.nBdyRespErrors
break loop
blk.blocks[offset + n].transactions = bodies[n].transactions
blk.blocks[offset + n].uncles = bodies[n].uncles
blk.blocks[offset + n].withdrawals = bodies[n].withdrawals
blk.blocks[offset + n].requests = bodies[n].requests
return offset < blk.blocks.len.uint64
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc blocksStagedCanImportOk*(ctx: FlareCtxRef): bool =
## Check whether the queue is at its maximum size so import can start with
## a full queue.
if ctx.pool.blocksStagedQuLenMax <= ctx.blk.staged.len:
return true
if 0 < ctx.blk.staged.len:
# Import if what is on the queue is all we have got.
if ctx.blocksUnprocIsEmpty() and ctx.blocksUnprocBorrowed() == 0:
return true
# Import if there is currently no peer active
if ctx.pool.nBuddies == 0:
return true
false
proc blocksStagedFetchOk*(ctx: FlareCtxRef): bool =
## Check whether body records can be fetched and stored on the `staged` queue.
##
let uBottom = ctx.blocksUnprocBottom()
if uBottom < high(BlockNumber):
# Not to start fetching while the queue is busy (i.e. larger than Lwm)
# so that import might still be running strong.
if ctx.blk.staged.len < ctx.pool.blocksStagedQuLenMax:
return true
# Make sure that there is no gap at the bottom which needs to be
# addressed regardless of the length of the queue.
if uBottom < ctx.blk.staged.ge(0).value.key:
return true
false
proc blocksStagedCollect*(
buddy: FlareBuddyRef;
info: static[string];
): Future[bool] {.async.} =
## Collect bodies and stage them.
##
if buddy.ctx.blocksUnprocIsEmpty():
# Nothing to do
return false
let
ctx = buddy.ctx
peer = buddy.peer
# Fetch the full range of headers to be completed to blocks
iv = ctx.blocksUnprocFetch(
ctx.pool.nBodiesBatch.uint64).expect "valid interval"
var
# This value is used for splitting the interval `iv` into
# `already-collected + [ivBottom,somePt] + [somePt+1,iv.maxPt]` where the
# middle interval `[ivBottom,somePt]` will be fetched from the network.
ivBottom = iv.minPt
# This record will accumulate the fetched headers. It must be on the heap
# so that `async` can capture that properly.
blk = (ref BlocksForImport)()
# nFetchBodiesRequest
while true:
# Extract bottom range interval and fetch/stage it
let
ivReqMax = if iv.maxPt < ivBottom + nFetchBodiesRequest - 1: iv.maxPt
else: ivBottom + nFetchBodiesRequest - 1
# Request interval
ivReq = BnRange.new(ivBottom, ivReqMax)
# Current length of the blocks queue. This is used to calculate the
# response length from the network.
nBlkBlocks = blk.blocks.len
# Fetch and extend staging record
if not await buddy.fetchAndCheck(ivReq, blk, info):
if nBlkBlocks == 0:
if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": completely failed", peer, iv, ivReq,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors
ctx.blocksUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
# to work on the currently returned interval.
await sleepAsync asyncThreadSwitchTimeSlot
return false
# So there were some bodies downloaded already. Turn back unused data
# and proceed with staging.
trace info & ": partially failed", peer, iv, ivReq,
unused=BnRange.new(ivBottom,iv.maxPt)
# There is some left over to store back
ctx.blocksUnprocCommit(iv.len, ivBottom, iv.maxPt)
break
# Update remaining interval
let ivRespLen = blk.blocks.len - nBlkBlocks
if iv.maxPt < ivBottom + ivRespLen.uint64:
# All collected
ctx.blocksUnprocCommit(iv.len)
break
ivBottom += ivRespLen.uint64 # will mostly result into `ivReq.maxPt+1`
if buddy.ctrl.stopped:
# There is some left over to store back. And `ivBottom <= iv.maxPt`
# because of the check against `ivRespLen` above.
ctx.blocksUnprocCommit(iv.len, ivBottom, iv.maxPt)
break
# Store `blk` chain on the `staged` queue
let qItem = ctx.blk.staged.insert(iv.minPt).valueOr:
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = blk[]
trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr,
nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state
return true
proc blocksStagedImport*(ctx: FlareCtxRef; info: static[string]): bool =
## Import/execute blocks record from staged queue
##
let qItem = ctx.blk.staged.ge(0).valueOr:
return false
# Fetch least record, accept only if it matches the global ledger state
let t = ctx.dbStateBlockNumber()
if qItem.key != t + 1:
trace info & ": there is a gap", T=t.bnStr, stagedBottom=qItem.key.bnStr
return false
# Remove from queue
discard ctx.blk.staged.delete qItem.key
# Execute blocks
let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr:
# FIXME: should that be rather an `raiseAssert` here?
warn info & ": block exec error", T=t.bnStr,
iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1), error
doAssert t == ctx.dbStateBlockNumber()
return false
trace info & ": imported staged blocks", T=ctx.dbStateBlockNumber.bnStr,
first=qItem.key.bnStr, stats
# Remove stashed headers
for bn in qItem.key ..< qItem.key + qItem.data.blocks.len.uint64:
ctx.dbUnstashHeader bn
true
proc blocksStagedBottomKey*(ctx: FlareCtxRef): BlockNumber =
## Retrieve the bottom staged block number
let qItem = ctx.blk.staged.ge(0).valueOr:
return high(BlockNumber)
qItem.key
proc blocksStagedQueueLen*(ctx: FlareCtxRef): int =
## Number of staged records
ctx.blk.staged.len
proc blocksStagedQueueIsEmpty*(ctx: FlareCtxRef): bool =
## `true` iff no data are on the queue.
ctx.blk.staged.len == 0
# ----------------
proc blocksStagedInit*(ctx: FlareCtxRef) =
## Constructor
ctx.blk.staged = StagedBlocksQueue.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,99 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
std/options,
pkg/[chronicles, chronos, results],
pkg/eth/p2p,
pkg/stew/interval_set,
"../../.."/[protocol, types],
../../worker_desc
logScope:
topics = "flare bodies"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
func toStr(a: chronos.Duration): string =
a.toStr(2)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc fetchRegisterError*(buddy: FlareBuddyRef) =
buddy.only.nBdyRespErrors.inc
if fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors:
buddy.ctrl.zombie = true # abandon slow peer
proc bodiesFetch*(
buddy: FlareBuddyRef;
blockHashes: seq[Hash256];
info: static[string];
): Future[Result[seq[BlockBody],void]]
{.async.} =
## Fetch bodies from the network.
let
peer = buddy.peer
start = Moment.now()
nReq = blockHashes.len
trace trEthSendSendingGetBlockBodies, peer, nReq,
nRespErrors=buddy.only.nBdyRespErrors
var resp: Option[blockBodiesObj]
try:
resp = await peer.getBlockBodies(blockHashes)
except TransportError as e:
buddy.fetchRegisterError()
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors
return err()
let elapsed = Moment.now() - start
# Evaluate result
if resp.isNone or buddy.ctrl.stopped:
buddy.fetchRegisterError()
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
return err()
let b: seq[BlockBody] = resp.get.blocks
if b.len == 0 or nReq < b.len:
buddy.fetchRegisterError()
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
return err()
# Ban an overly slow peer for a while when seen in a row. Also there is a
# minimum share of the number of requested bodies expected, typically 10%.
if fetchBodiesReqThresholdZombie < elapsed or
b.len.uint64 * 100 < nReq.uint64 * fetchBodiesReqMinResponsePC:
buddy.fetchRegisterError()
else:
buddy.only.nBdyRespErrors = 0 # reset error count
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nBdyRespErrors
return ok(b)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
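The error accounting in `bodiesFetch()` above can be sketched in isolation. This is only a minimal illustration, not the code of the patch: the type `PeerStats`, the proc `accountResponse()`, and the three constant values are hypothetical stand-ins for `buddy.only.nBdyRespErrors`, `buddy.ctrl.zombie`, and the `fetchBodiesReq...` configuration constants, and the elapsed time is assumed to be given in milliseconds.

```nim
# Hypothetical constants, the real values live in the sync configuration.
const
  reqThresholdCount = 3          # tolerated errors in a row
  reqThresholdZombieMs = 2000    # response time considered too slow
  reqMinResponsePC = 10'u64      # minimum share of requested items, percent

type PeerStats = object
  nRespErrors: int               # consecutive slow/short responses
  zombie: bool                   # peer is banned for a while

proc registerError(p: var PeerStats) =
  ## Count an error and ban the peer once the threshold is exceeded.
  inc p.nRespErrors
  if reqThresholdCount < p.nRespErrors:
    p.zombie = true

proc accountResponse(p: var PeerStats; nReq, nResp, elapsedMs: int) =
  ## A response counts as an error if it was too slow or if it delivered
  ## less than the minimum share of the requested items. Any good
  ## response resets the counter, so only errors in a row lead to a ban.
  if reqThresholdZombieMs < elapsedMs or
     nResp.uint64 * 100 < nReq.uint64 * reqMinResponsePC:
    p.registerError()
  else:
    p.nRespErrors = 0
```

Comparing `nResp * 100` against `nReq * percent` keeps the share check in integer arithmetic, as the original expression does.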


@ -0,0 +1,133 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/chronicles,
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../../../common,
../../worker_desc,
../blocks_unproc
type
BlocksForImportQueueWalk = SortedSetWalkRef[BlockNumber,BlocksForImport]
## Traversal descriptor (for `verifyStagedBlocksQueue()`)
# ------------------------------------------------------------------------------
# Public debugging helpers
# ------------------------------------------------------------------------------
proc rlpSize*(blk: ref BlocksForImport): int =
rlp.encode(blk[]).len
proc verifyStagedBlocksQueue*(ctx: FlareCtxRef; info: static[string]) =
## Verify staged queue
##
# Walk queue items
let walk = BlocksForImportQueueWalk.init(ctx.blk.staged)
defer: walk.destroy()
var
stTotal = 0u
rc = walk.first()
prv = BlockNumber(0)
while rc.isOk:
let
key = rc.value.key
nBlocks = rc.value.data.blocks.len.uint64
maxPt = key + nBlocks - 1
unproc = ctx.blocksUnprocCovered(key, maxPt)
if 0 < unproc:
raiseAssert info & ": unprocessed staged chain " &
key.bnStr & " overlap=" & $unproc
if key <= prv:
raiseAssert info & ": overlapping staged chain " &
key.bnStr & " prvKey=" & prv.bnStr & " overlap=" & $(prv - key + 1)
stTotal += nBlocks
prv = maxPt
rc = walk.next()
let t = ctx.dbStateBlockNumber()
if 0 < stTotal:
let first = ctx.blk.staged.ge(0).value.key
# Check `T < staged[] <= B`
if first <= t:
raiseAssert info & ": staged bottom mismatch " &
" T=" & t.bnStr & " stBottom=" & first.bnStr
if ctx.lhc.layout.base < prv:
raiseAssert info & ": staged top mismatch " &
" B=" & ctx.lhc.layout.base.bnStr & " stTop=" & prv.bnStr
if not ctx.blocksUnprocIsEmpty():
let
uBottom = ctx.blocksUnprocBottom()
uTop = ctx.blocksUnprocTop()
topReq = ctx.blk.topRequest
# Check `T < unprocessed{} <= B`
if uBottom <= t:
raiseAssert info & ": unproc bottom mismatch " &
" T=" & t.bnStr & " uBottom=" & uBottom.bnStr
if ctx.lhc.layout.base < uTop:
raiseAssert info & ": unproc top mismatch " &
" B=" & ctx.lhc.layout.base.bnStr & " uTop=" & uTop.bnStr
# Check `unprocessed{} <= topRequest <= B`
if topReq < uTop:
raiseAssert info & ": unproc top req mismatch " &
" uTop=" & uTop.bnStr & " topRequest=" & topReq.bnStr
if ctx.lhc.layout.base < topReq:
raiseAssert info & ": unproc top req mismatch " &
" B=" & ctx.lhc.layout.base.bnStr & " topReq=" & topReq.bnStr
# Check `staged[] + unprocessed{} == (T,B]`
let
uTotal = ctx.blocksUnprocTotal()
uBorrowed = ctx.blocksUnprocBorrowed()
all3 = stTotal + uTotal + uBorrowed
unfilled = if t < ctx.layout.base: ctx.layout.base - t
else: 0u
trace info & ": verify staged", stTotal, uTotal, uBorrowed, all3, unfilled
if unfilled < all3:
raiseAssert info & ": staged/unproc too large" & " staged=" & $stTotal &
" unproc=" & $uTotal & " borrowed=" & $uBorrowed & " exp-sum=" & $unfilled
proc verifyStagedBlocksItem*(blk: ref BlocksForImport; info: static[string]) =
## Verify record
##
if blk.blocks.len == 0:
trace info & ": verifying ok", nBlocks=0
return
trace info & ": verifying", nBlocks=blk.blocks.len
if blk.blocks[0].header.txRoot != EMPTY_ROOT_HASH:
doAssert 0 < blk.blocks[0].transactions.len
else:
doAssert blk.blocks[0].transactions.len == 0
for n in 1 ..< blk.blocks.len:
doAssert blk.blocks[n-1].header.number + 1 == blk.blocks[n].header.number
if blk.blocks[n].header.txRoot != EMPTY_ROOT_HASH:
doAssert 0 < blk.blocks[n].transactions.len
else:
doAssert blk.blocks[n].transactions.len == 0
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,115 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/p2p,
pkg/results,
pkg/stew/interval_set,
../worker_desc
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc blocksUnprocFetch*(
ctx: FlareCtxRef;
maxLen: uint64;
): Result[BnRange,void] =
## Fetch interval from block ranges with maximal size `maxLen`, where
## `0` is interpreted as `2^64`.
##
let
q = ctx.blk.unprocessed
# Fetch bottom/left interval with least block numbers
jv = q.ge().valueOr:
return err()
# Curb interval to maximal length `maxLen`
iv = block:
if maxLen == 0 or (0 < jv.len and jv.len <= maxLen):
jv
else:
# Curb interval `jv` to length `maxLen`
#
# Note that either (fringe case):
# (`jv.len`==0) => (`jv`==`[0,high(u64)]`) => `jv.maxPt`==`high(u64)`
# or (in the non-fringe case)
# (`maxLen` < `jv.len`) => (`jv.minPt` + `maxLen` - 1 < `jv.maxPt`)
#
BnRange.new(jv.minPt, jv.minPt + maxLen - 1)
discard q.reduce(iv)
ctx.blk.borrowed += iv.len
ok(iv)
proc blocksUnprocCommit*(ctx: FlareCtxRef; borrowed: uint) =
## Commit back all processed range
ctx.blk.borrowed -= borrowed
proc blocksUnprocCommit*(ctx: FlareCtxRef; borrowed: uint; retour: BnRange) =
## Merge back unprocessed range `retour`
ctx.blocksUnprocCommit borrowed
doAssert ctx.blk.unprocessed.merge(retour) == retour.len
proc blocksUnprocCommit*(
ctx: FlareCtxRef;
borrowed: uint;
rMinPt: BlockNumber;
rMaxPt: BlockNumber) =
## Variant of `blocksUnprocCommit()`
ctx.blocksUnprocCommit borrowed
doAssert ctx.blk.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1
proc blocksUnprocCovered*(ctx: FlareCtxRef; minPt,maxPt: BlockNumber): uint64 =
## Number of block numbers from the range `[minPt,maxPt]` that are contained
## in the unprocessed set
ctx.blk.unprocessed.covered(minPt, maxPt)
proc blocksUnprocCovered*(ctx: FlareCtxRef; pt: BlockNumber): bool =
## Check whether point is contained
ctx.blk.unprocessed.covered(pt, pt) == 1
proc blocksUnprocTop*(ctx: FlareCtxRef): BlockNumber =
let iv = ctx.blk.unprocessed.le().valueOr:
return BlockNumber(0)
iv.maxPt
proc blocksUnprocBottom*(ctx: FlareCtxRef): BlockNumber =
let iv = ctx.blk.unprocessed.ge().valueOr:
return high(BlockNumber)
iv.minPt
proc blocksUnprocTotal*(ctx: FlareCtxRef): uint64 =
ctx.blk.unprocessed.total()
proc blocksUnprocBorrowed*(ctx: FlareCtxRef): uint64 =
ctx.blk.borrowed
proc blocksUnprocChunks*(ctx: FlareCtxRef): int =
ctx.blk.unprocessed.chunks()
proc blocksUnprocIsEmpty*(ctx: FlareCtxRef): bool =
ctx.blk.unprocessed.chunks() == 0
# ------------------
proc blocksUnprocInit*(ctx: FlareCtxRef) =
## Constructor
ctx.blk.unprocessed = BnRangeSet.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
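The interval curbing in `blocksUnprocFetch()` above relies on unsigned wrap-around for the fringe case where the interval covers the full `uint64` range. A minimal sketch of that step follows, assuming a plain tuple in place of the `BnRange` type from `stew/interval_set`; the name `curb()` is hypothetical.

```nim
type BnRange = tuple[minPt, maxPt: uint64]   # stand-in for the real interval

func curb(jv: BnRange; maxLen: uint64): BnRange =
  ## Curb `jv` to at most `maxLen` points, where `0` means "no limit".
  # Unsigned arithmetic wraps in Nim, so the length of the full range
  # `[0, high(uint64)]` evaluates to `0` here.
  let jvLen = jv.maxPt - jv.minPt + 1
  if maxLen == 0 or (0'u64 < jvLen and jvLen <= maxLen):
    result = jv
  else:
    # Either `jvLen == 0` (full range) or `maxLen < jvLen`. In both cases
    # `jv.minPt + maxLen - 1` stays below `jv.maxPt`.
    result = (minPt: jv.minPt, maxPt: jv.minPt + maxLen - 1)
```

The curbed interval is what the patch removes from the unprocessed set and adds to the `borrowed` counter until one of the `blocksUnprocCommit()` variants returns it.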


@ -11,23 +11,19 @@
{.push raises:[].}
import
pkg/chronicles,
pkg/[chronicles, chronos],
pkg/eth/[common, rlp],
pkg/stew/[interval_set, sorted_set],
pkg/stew/[byteutils, interval_set, sorted_set],
pkg/results,
../../../db/[era1_db, storage_types],
../../../db/storage_types,
../../../common,
../../sync_desc,
../worker_desc,
"."/[staged, unproc]
./headers_unproc
logScope:
topics = "flare db"
const
extraTraceMessages = false or true
## Enabled additional logging noise
LhcStateKey = 1.flareStateKey
type
@ -36,53 +32,17 @@ type
hash: Hash256
parent: Hash256
Era1Specs = tuple
e1db: Era1DbRef
maxNum: BlockNumber
# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------
formatIt(Hash256):
it.data.toHex
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template hasKey(e1db: Era1DbRef; bn: BlockNumber): bool =
e1db.getEthBlock(bn).isOk
proc newEra1Desc(networkID: NetworkID; era1Dir: string): Opt[Era1Specs] =
const info = "newEra1Desc"
var specs: Era1Specs
case networkID:
of MainNet:
specs.e1db = Era1DbRef.init(era1Dir, "mainnet").valueOr:
when extraTraceMessages:
trace info & ": no Era1 available", networkID, era1Dir
return err()
specs.maxNum = 15_537_393'u64 # Mainnet, copied from `nimbus_import`
of SepoliaNet:
specs.e1db = Era1DbRef.init(era1Dir, "sepolia").valueOr:
when extraTraceMessages:
trace info & ": no Era1 available", networkID, era1Dir
return err()
specs.maxNum = 1_450_408'u64 # Sepolia
else:
when extraTraceMessages:
trace info & ": Era1 unsupported", networkID
return err()
# At least block 1 should be supported
if not specs.e1db.hasKey 1u64:
specs.e1db.dispose()
notice info & ": Era1 repo dysfunctional", networkID, blockNumber=1
return err()
when extraTraceMessages:
trace info & ": Era1 supported",
networkID, lastEra1Block=specs.maxNum.bnStr
ok(specs)
proc fetchLinkedHChainsLayout(ctx: FlareCtxRef): Opt[LinkedHChainsLayout] =
let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr:
return err()
@ -91,35 +51,20 @@ proc fetchLinkedHChainsLayout(ctx: FlareCtxRef): Opt[LinkedHChainsLayout] =
except RlpError:
return err()
# --------------
proc fetchEra1State(ctx: FlareCtxRef): Opt[SavedDbStateSpecs] =
var val: SavedDbStateSpecs
val.number = ctx.pool.e1AvailMax
if 0 < val.number:
let header = ctx.pool.e1db.getEthBlock(val.number).value.header
val.parent = header.parentHash
val.hash = rlp.encode(header).keccakHash
return ok(val)
err()
proc fetchSavedState(ctx: FlareCtxRef): Opt[SavedDbStateSpecs] =
let
db = ctx.db
e1Max = ctx.pool.e1AvailMax
let db = ctx.db
var val: SavedDbStateSpecs
val.number = db.getSavedStateBlockNumber()
if e1Max == 0 or e1Max < val.number:
if db.getBlockHash(val.number, val.hash):
var header: BlockHeader
if db.getBlockHeader(val.hash, header):
val.parent = header.parentHash
return ok(val)
return err()
if db.getBlockHash(val.number, val.hash):
var header: BlockHeader
if db.getBlockHeader(val.hash, header):
val.parent = header.parentHash
return ok(val)
err()
ctx.fetchEra1State()
# ------------------------------------------------------------------------------
# Public functions
@ -129,8 +74,6 @@ proc dbStoreLinkedHChainsLayout*(ctx: FlareCtxRef): bool =
## Save chain layout to persistent db
const info = "dbStoreLinkedHChainsLayout"
if ctx.layout == ctx.lhc.lastLayout:
when extraTraceMessages:
trace info & ": no layout change"
return false
let data = rlp.encode(ctx.layout)
@ -139,21 +82,23 @@ proc dbStoreLinkedHChainsLayout*(ctx: FlareCtxRef): bool =
# While executing blocks there are frequent save cycles. Otherwise, an
# extra save request might help to pick up an interrupted sync session.
if ctx.db.getSavedStateBlockNumber() == 0:
ctx.db.persistent(0).isOkOr:
when extraTraceMessages:
trace info & ": failed to save layout persistently", error=($$error)
let txLevel = ctx.db.level()
if txLevel == 0:
let number = ctx.db.getSavedStateBlockNumber()
ctx.db.persistent(number).isOkOr:
debug info & ": failed to save persistently", error=($$error)
return false
when extraTraceMessages:
trace info & ": layout saved persistently"
else:
trace info & ": not saved, tx pending", txLevel
return false
trace info & ": saved persistently on DB"
true
proc dbLoadLinkedHChainsLayout*(ctx: FlareCtxRef) =
## Restore chain layout from persistent db
const info = "dbLoadLinkedHChainsLayout"
ctx.stagedInit()
ctx.unprocInit()
let rc = ctx.fetchLinkedHChainsLayout()
if rc.isOk:
@ -161,9 +106,8 @@ proc dbLoadLinkedHChainsLayout*(ctx: FlareCtxRef) =
let (uMin,uMax) = (rc.value.base+1, rc.value.least-1)
if uMin <= uMax:
# Add interval of unprocessed block range `(B,L)` from README
ctx.unprocMerge(uMin, uMax)
when extraTraceMessages:
trace info & ": restored layout"
ctx.headersUnprocSet(uMin, uMax)
trace info & ": restored layout from DB"
else:
let val = ctx.fetchSavedState().expect "saved states"
ctx.lhc.layout = LinkedHChainsLayout(
@ -173,49 +117,11 @@ proc dbLoadLinkedHChainsLayout*(ctx: FlareCtxRef) =
leastParent: val.parent,
final: val.number,
finalHash: val.hash)
when extraTraceMessages:
trace info & ": new layout"
trace info & ": new layout"
ctx.lhc.lastLayout = ctx.layout
proc dbInitEra1*(ctx: FlareCtxRef): bool =
## Initialise Era1 repo.
const info = "dbInitEra1"
var specs = ctx.chain.com.networkId.newEra1Desc(ctx.pool.e1Dir).valueOr:
return false
ctx.pool.e1db = specs.e1db
# Verify that last available block number is available
if specs.e1db.hasKey specs.maxNum:
ctx.pool.e1AvailMax = specs.maxNum
when extraTraceMessages:
trace info, lastEra1Block=specs.maxNum.bnStr
return true
# This is a truncated repo. Use bisect for finding the top number assuming
# that block numbers availability is contiguous.
#
# BlockNumber(1) is the least supported block number (was checked
# in function `newEra1Desc()`)
var
minNum = BlockNumber(1)
middle = (specs.maxNum + minNum) div 2
delta = specs.maxNum - minNum
while 1 < delta:
if specs.e1db.hasKey middle:
minNum = middle
else:
specs.maxNum = middle
middle = (specs.maxNum + minNum) div 2
delta = specs.maxNum - minNum
ctx.pool.e1AvailMax = minNum
when extraTraceMessages:
trace info, e1AvailMax=minNum.bnStr
true
# ------------------
proc dbStashHeaders*(
@ -224,9 +130,8 @@ proc dbStashHeaders*(
revBlobs: openArray[Blob];
) =
## Temporarily store header chain to persistent db (oblivious of the chain
## layout.) The headers should not be stashed if they are available on the
## `Era1` repo, i.e. if the corresponding block number is at most
## `ctx.pool.e1AvailMax`.
## layout.) The headers should not be stashed if they are already
## interpreted and executed on the database.
##
## The `revBlobs[]` arguments are passed in reverse order so that block
## numbers apply as
@ -238,19 +143,14 @@ proc dbStashHeaders*(
const info = "dbStashHeaders"
let
kvt = ctx.db.ctx.getKvt()
last = first + revBlobs.len.uint - 1
last = first + revBlobs.len.uint64 - 1
for n,data in revBlobs:
let key = flareHeaderKey(last - n.uint)
let key = flareHeaderKey(last - n.uint64)
kvt.put(key.toOpenArray, data).isOkOr:
raiseAssert info & ": put() failed: " & $$error
when extraTraceMessages:
trace info & ": headers stashed",
iv=BnRange.new(first, last), nHeaders=revBlobs.len
proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] =
## Retrieve some stashed header.
if num <= ctx.pool.e1AvailMax:
return ok(ctx.pool.e1db.getEthBlock(num).value.header)
let
key = flareHeaderKey(num)
rc = ctx.db.ctx.getKvt().get(key.toOpenArray)
@ -265,6 +165,17 @@ proc dbPeekParentHash*(ctx: FlareCtxRef; num: BlockNumber): Opt[Hash256] =
## Retrieve some stashed parent hash.
ok (? ctx.dbPeekHeader num).parentHash
proc dbUnstashHeader*(ctx: FlareCtxRef; bn: BlockNumber) =
## Remove header from temporary DB list
discard ctx.db.ctx.getKvt().del(flareHeaderKey(bn).toOpenArray)
# ------------------
proc dbStateBlockNumber*(ctx: FlareCtxRef): BlockNumber =
## Currently only a wrapper around the function returning the current
## database state block number
ctx.db.getSavedStateBlockNumber()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,99 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/[chronicles, chronos],
pkg/eth/[common, rlp],
pkg/stew/[byteutils, interval_set, sorted_set],
pkg/results,
../../../../db/storage_types,
../../worker_desc
logScope:
topics = "flare db"
# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------
formatIt(Hash256):
it.data.toHex
# ------------------------------------------------------------------------------
# Public debugging functions
# ------------------------------------------------------------------------------
proc dbVerifyStashedHeaders*(
ctx: FlareCtxRef;
info: static[string];
): Future[bool] {.async.} =
## For debugging. Verify integrity of stashed headers on the database.
# Last executed block on database
let
db = ctx.db
kvt = ctx.db.ctx.getKvt()
elNum = db.getSavedStateBlockNumber()
lyLeast = ctx.layout.least
lyFinal = ctx.layout.final
lyFinalHash = ctx.layout.finalHash
if lyLeast == 0:
return true
if lyLeast <= elNum and 0 < elNum:
debug info & ": base header B unsynced", elNum=elNum.bnStr, B=lyLeast.bnStr
return false
let iv = BnRange.new(lyLeast,lyFinal)
trace info & ": verifying stashed headers", iv, len=(lyFinal-lyLeast+1)
var lastHash = ctx.layout.leastParent
for num in lyLeast .. lyFinal:
let data = kvt.get(flareHeaderKey(num).toOpenArray).valueOr:
debug info & ": unstashed header", num=num.bnStr
return false
var header: BlockHeader
try: header = rlp.decode(data, BlockHeader)
except RlpError:
debug info & ": cannot decode rlp header", num=num.bnStr
return false
if header.number != num:
debug info & ": wrongly addressed header",
num=header.number.bnStr, expected=num.bnStr
return false
if header.parentHash != lastHash:
debug info & ": hash mismatch", lastNum=(num-1).bnStr, lastHash,
parentHash=header.parentHash
return false
lastHash = data.keccakHash
# Allow thread change
if (num mod 100_000) == 98_765:
# trace info & ": thread change offer", num=num.bnStr
await sleepAsync asyncThreadSwitchTimeSlot
if lyFinalHash != lastHash:
debug info & ": base header B hash mismatch", num=lyFinal.bnStr,
hash=lyFinalHash, expected=lastHash
return false
trace info & ": done verifying", iv, len=(lyFinal-lyLeast+1)
true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
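The core of `dbVerifyStashedHeaders()` above is a walk along the parent hash links. The sketch below shows that walk on a toy header type, assuming strings in place of `Hash256` and a pre-computed `hash` field in place of `data.keccakHash`; `Hdr` and `verifyChain()` are hypothetical names.

```nim
type Hdr = object
  number: uint64
  hash, parentHash: string     # toy hashes, the patch uses `Hash256`

proc verifyChain(
    hdrs: openArray[Hdr];      # headers in ascending block number order
    first: uint64;             # expected number of the first header
    firstParent: string;       # expected parent hash of the first header
    finalHash: string;         # expected hash of the last header
      ): bool =
  ## Check that the headers are numbered contiguously, that each header
  ## links to its predecessor, and that the chain ends at `finalHash`.
  var lastHash = firstParent
  for i, h in hdrs:
    if h.number != first + i.uint64:
      return false             # wrongly addressed header
    if h.parentHash != lastHash:
      return false             # broken parent link
    lastHash = h.hash
  lastHash == finalHash
```

In the patch, the start values correspond to `ctx.layout.least` and `ctx.layout.leastParent`, and the end check to `ctx.layout.finalHash`.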


@ -0,0 +1,291 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
std/strutils,
pkg/[chronicles, chronos],
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../../common,
../worker_desc,
./headers_staged/[headers, linked_hchain],
./headers_unproc
logScope:
topics = "flare headers"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc fetchAndCheck(
buddy: FlareBuddyRef;
ivReq: BnRange;
lhc: ref LinkedHChain; # update in place
info: static[string];
): Future[bool] {.async.} =
## Collect single header chain from the peer and stash it on the `staged`
## queue. Returns `true` if the chain record `lhc` was extended successfully.
##
# Fetch headers for this range of block numbers
let revHeaders = block:
let rc = await buddy.headersFetchReversed(ivReq, lhc.parentHash, info)
if rc.isErr:
return false
rc.value
# While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
# was sound, i.e. contiguous, linked, etc.
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info):
return false
return true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc headerStagedUpdateBeacon*(
buddy: FlareBuddyRef;
info: static[string];
) {.async.} =
## Fetch beacon header if there is an update available
let ctx = buddy.ctx
if ctx.lhc.beacon.finalised != ZERO_HASH256:
const iv = BnRange.new(1u,1u) # dummy interval
let finHash = ctx.lhc.beacon.finalised
let rc = await buddy.headersFetchReversed(iv, finHash, info)
if rc.isOk and ctx.lhc.beacon.header.number < rc.value[0].number:
ctx.lhc.beacon.header = rc.value[0]
ctx.lhc.beacon.changed = true
ctx.lhc.beacon.finalised = ZERO_HASH256
proc headersStagedCollect*(
buddy: FlareBuddyRef;
info: static[string];
): Future[bool] {.async.} =
## Collect a batch of chained headers totalling to at most `nHeaders`
## headers. Fetch the headers from the peer and stash them blockwise on
## the `staged` queue. The function returns `true` if it stashed a header
## chains record on the `staged` queue.
##
## This function segments the `nHeaders` length into smaller pieces of at
## most `nFetchHeadersRequest` chunks and fetches these chunks from the
## network. Where possible, hashes are used to address the headers to be
## fetched. Otherwise the system works opportunistically using block
## numbers for fetching, stashing them away to be verified later when
## appropriate.
##
let
ctx = buddy.ctx
peer = buddy.peer
uTop = ctx.headersUnprocTop()
if uTop == 0:
# Nothing to do
return false
let
# Reserve the full range of block numbers so they can be appended in a row.
# This avoids some fragmentation when header chains are stashed by multiple
# peers, i.e. they interleave peer-wise.
iv = ctx.headersUnprocFetch(nFetchHeadersBatch).expect "valid interval"
# Check for top header hash. If the range to fetch directly joins below
# the top level linked chain `L..F`, then there is the hash available for
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
# range of headers is fetched opportunistically using block numbers only.
isOpportunistic = uTop + 1 != ctx.layout.least
# Parent hash for `lhc` below
topLink = (if isOpportunistic: EMPTY_ROOT_HASH else: ctx.layout.leastParent)
var
# This value is used for splitting the interval `iv` into
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
# middle interval `[somePt+1, ivTop]` will be fetched from the network.
ivTop = iv.maxPt
# This record will accumulate the fetched headers. It must be on the heap
# so that `async` can capture that properly.
lhc = (ref LinkedHChain)(parentHash: topLink)
while true:
# Extract top range interval and fetch/stage it
let
ivReqMin = if ivTop + 1 <= iv.minPt + nFetchHeadersRequest: iv.minPt
else: ivTop - nFetchHeadersRequest + 1
# Request interval
ivReq = BnRange.new(ivReqMin, ivTop)
# Current length of the headers queue. This is used to calculate the
# response length from the network.
nLhcHeaders = lhc.revHdrs.len
# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
# Throw away opportunistic data (or first time header fetch.) Turn back
# unused data.
if isOpportunistic or nLhcHeaders == 0:
if 0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": completely failed", peer, iv, ivReq, isOpportunistic,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors
ctx.headersUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
# to work on the currently returned interval.
await sleepAsync asyncThreadSwitchTimeSlot
return false
# So it is deterministic and there were some headers downloaded already.
# Turn back unused data and proceed with staging.
trace info & ": partially failed", peer, iv, ivReq,
unused=BnRange.new(iv.minPt,ivTop), isOpportunistic
# There is some left over to store back
ctx.headersUnprocCommit(iv.len, iv.minPt, ivTop)
break
# Update remaining interval
let ivRespLen = lhc.revHdrs.len - nLhcHeaders
if ivTop < iv.minPt + ivRespLen.uint64:
# All collected
ctx.headersUnprocCommit(iv.len)
break
ivTop -= ivRespLen.uint64 # will mostly result into `ivReq.minPt-1`
if buddy.ctrl.stopped:
# There is some left over to store back. And `iv.minPt <= ivTop` because
# of the check against `ivRespLen` above.
ctx.headersUnprocCommit(iv.len, iv.minPt, ivTop)
break
# Store `lhc` chain on the `staged` queue
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = lhc[]
trace info & ": staged headers", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
nStaged=ctx.lhc.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
return true
proc headersStagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
## Store/insert stashed chains from the `staged` queue into the linked
## chains layout and the persistent tables. The function returns the number
## of records processed and saved.
while true:
# Fetch largest block
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
trace info & ": no staged headers", error
break # all done
let
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key)
if iv.maxPt+1 < least:
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
break # there is a gap -- come back later
# Overlap must not happen
if iv.maxPt+1 != least:
raiseAssert info & ": Overlap iv=" & $iv & " L=" & least.bnStr
# Process item from `staged` queue. So it is not needed in the list,
# anymore.
discard ctx.lhc.staged.delete(iv.maxPt)
if qItem.data.hash != ctx.layout.leastParent:
# Discard wrong chain and merge back the range into the `unproc` list.
ctx.headersUnprocCommit(0,iv)
trace info & ": discarding staged record", iv, L=least.bnStr, lap=result
break
# Store headers on database
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
ctx.layout.least = iv.minPt
ctx.layout.leastParent = qItem.data.parentHash
discard ctx.dbStoreLinkedHChainsLayout()
result.inc # count records
trace info & ": staged records saved",
nStaged=ctx.lhc.staged.len, nSaved=result
if headersStagedQueueLengthLwm < ctx.lhc.staged.len:
ctx.poolMode = true
proc headersStagedReorg*(ctx: FlareCtxRef; info: static[string]) =
## Some pool mode intervention. The effect is that all concurrent peers
## finish up their current work and run this function here (which might
## do nothing.) This stopping should be enough in most cases to re-organise
## when re-starting concurrently, again.
##
## Only when the staged list gets too big it will be cleared to be re-filled
## again. In theory, this might happen on a really slow lead actor
## (downloading deterministically by hashes) and many fast opportunistic
## actors filling the staged queue.
##
if ctx.lhc.staged.len == 0:
# nothing to do
return
# Update counter
ctx.pool.nReorg.inc
let nStaged = ctx.lhc.staged.len
if headersStagedQueueLengthHwm < nStaged:
trace info & ": hwm reached, flushing staged queue",
nStaged, max=headersStagedQueueLengthLwm
# Remove the leading `1 + nStaged - headersStagedQueueLengthLwm` entries
# from list so that the upper `headersStagedQueueLengthLwm-1` entries
# remain.
for _ in 0 .. nStaged - headersStagedQueueLengthLwm:
let
qItem = ctx.lhc.staged.ge(BlockNumber 0).expect "valid record"
key = qItem.key
nHeaders = qItem.data.revHdrs.len.uint64
ctx.headersUnprocCommit(0, key - nHeaders + 1, key)
discard ctx.lhc.staged.delete key
proc headersStagedTopKey*(ctx: FlareCtxRef): BlockNumber =
## Retrieve the top staged block number
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
return BlockNumber(0)
qItem.key
proc headersStagedQueueLen*(ctx: FlareCtxRef): int =
## Number of staged records
ctx.lhc.staged.len
proc headersStagedQueueIsEmpty*(ctx: FlareCtxRef): bool =
## `true` iff no data are on the queue.
ctx.lhc.staged.len == 0
# ----------------
proc headersStagedInit*(ctx: FlareCtxRef) =
## Constructor
ctx.lhc.staged = LinkedHChainQueue.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
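The loop in `headersStagedCollect()` above splits the reserved interval into request ranges, working from the top block number downwards. A minimal sketch of that splitting follows, under the simplifying assumption that every request is answered in full (the patch instead uses the actual response length to move `ivTop`); `splitFromTop()` is a hypothetical name.

```nim
proc splitFromTop(minPt, maxPt, nRequest: uint64): seq[(uint64, uint64)] =
  ## Split `[minPt, maxPt]` into ranges of at most `nRequest` points,
  ## listed from the top of the interval downwards.
  var ivTop = maxPt
  while true:
    # Same bound as in the patch: take what is left if it fits into a
    # single request, otherwise the top `nRequest` points.
    let reqMin = if ivTop + 1 <= minPt + nRequest: minPt
                 else: ivTop - nRequest + 1
    result.add((reqMin, ivTop))
    if reqMin == minPt:
      break                    # all collected
    ivTop = reqMin - 1         # continue below the range just fetched
```

Fetching from the top down lets each response be checked against the parent hash of the chain already collected above it.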


@ -0,0 +1,111 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/chronicles,
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../../../common,
../../worker_desc,
../headers_unproc
type
LinkedHChainQueueWalk = SortedSetWalkRef[BlockNumber,LinkedHChain]
## Traversal descriptor (for `verifyStagedHeadersQueue()`)
# ------------------------------------------------------------------------------
# Public debugging helpers
# ------------------------------------------------------------------------------
proc verifyStagedHeadersQueue*(ctx: FlareCtxRef; info: static[string]) =
## Verify staged queue
##
# Walk queue items
let walk = LinkedHChainQueueWalk.init(ctx.lhc.staged)
defer: walk.destroy()
var
stTotal = 0u
rc = walk.first()
prv = BlockNumber(0)
while rc.isOk:
let
key = rc.value.key
nHeaders = rc.value.data.revHdrs.len.uint64
minPt = key - nHeaders + 1
unproc = ctx.headersUnprocCovered(minPt, key)
if 0 < unproc:
raiseAssert info & ": unprocessed staged chain " &
key.bnStr & " overlap=" & $unproc
if minPt <= prv:
raiseAssert info & ": overlapping staged chain " &
key.bnStr & " prvKey=" & prv.bnStr & " overlap=" & $(prv - minPt + 1)
stTotal += nHeaders
prv = key
rc = walk.next()
# Check `staged[] <= L`
if ctx.layout.least <= prv:
raiseAssert info & ": staged top mismatch " &
" L=" & ctx.layout.least.bnStr & " stagedTop=" & prv.bnStr
# Check `unprocessed{} <= L`
let uTop = ctx.headersUnprocTop()
if ctx.layout.least <= uTop:
raiseAssert info & ": unproc top mismatch " &
" L=" & ctx.layout.least.bnStr & " unprocTop=" & uTop.bnStr
# Check `staged[] + unprocessed{} == (B,L)`
let
uTotal = ctx.headersUnprocTotal()
uBorrowed = ctx.headersUnprocBorrowed()
all3 = stTotal + uTotal + uBorrowed
unfilled = if ctx.layout.least <= ctx.layout.base + 1: 0u
else: ctx.layout.least - ctx.layout.base - 1
trace info & ": verify staged", stTotal, uTotal, uBorrowed, all3, unfilled
if all3 != unfilled:
raiseAssert info & ": staged/unproc mismatch " & " staged=" & $stTotal &
" unproc=" & $uTotal & " borrowed=" & $uBorrowed &
" exp-sum=" & $unfilled
proc verifyHeaderChainItem*(lhc: ref LinkedHChain; info: static[string]) =
## Verify a header chain.
if lhc.revHdrs.len == 0:
trace info & ": verifying ok", nLhc=0
return
trace info & ": verifying", nLhc=lhc.revHdrs.len
var
topHdr, childHdr: BlockHeader
try:
doAssert lhc.revHdrs[0].keccakHash == lhc.hash
topHdr = rlp.decode(lhc.revHdrs[0], BlockHeader)
childHdr = topHdr
for n in 1 ..< lhc.revHdrs.len:
let header = rlp.decode(lhc.revHdrs[n], BlockHeader)
doAssert childHdr.number == header.number + 1
doAssert lhc.revHdrs[n].keccakHash == childHdr.parentHash
childHdr = header
doAssert childHdr.parentHash == lhc.parentHash
except RlpError as e:
raiseAssert "verifyHeaderChainItem oops(" & $e.name & ") msg=" & e.msg
trace info & ": verify ok",
iv=BnRange.new(childHdr.number,topHdr.number), nLhc=lhc.revHdrs.len
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
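
The last check in `verifyStagedHeadersQueue()` above amounts to a counting invariant: the staged, unprocessed, and borrowed header counts must add up to the number of block numbers strictly between `B` and `L`. A minimal stand-alone sketch of that arithmetic, assuming plain `uint64` counters in place of the real queue and interval-set types (the names `unfilledCount` and `accountingOk` are hypothetical and do not exist in the code base):

```nim
# Hypothetical stand-alone model of the `staged + unproc + borrowed` check.

proc unfilledCount(base, least: uint64): uint64 =
  ## Number of block numbers strictly between `base` (B) and `least` (L).
  if least <= base + 1: 0'u64
  else: least - base - 1

proc accountingOk(staged, unproc, borrowed, base, least: uint64): bool =
  ## `true` if every block number in the open range `(B,L)` is accounted for
  ## exactly once: staged, still unprocessed, or borrowed by a peer.
  staged + unproc + borrowed == unfilledCount(base, least)

when isMainModule:
  # Nine block numbers lie strictly between 10 and 20.
  echo accountingOk(staged = 3, unproc = 4, borrowed = 2, base = 10, least = 20)
```

Forgetting the borrowed share makes the sum fall short, which is the kind of mis-accounting the verifier is meant to catch.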


@ -21,42 +21,20 @@ import
logScope:
topics = "flare headers"
const extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
# Copied from `nimbus_import`
func shortLog(a: chronos.Duration, parts = int.high): string =
## Returns string representation of Duration ``a`` as nanoseconds value.
var
res = ""
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
res
# For some reason neither `formatIt` nor `$` works as expected with logging
# the `elapsed` variable, below. This might be due to the fact that the
# `headersFetchReversed()` function is a generic one, i.e. a template.
func toStr(a: chronos.Duration): string =
a.shortLog(2)
a.toStr(2)
proc registerError(buddy: FlareBuddyRef) =
buddy.only.nHdrRespErrors.inc
if fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors:
buddy.ctrl.zombie = true # abandon slow peer
# ------------------------------------------------------------------------------
# Public functions
@ -70,8 +48,6 @@ proc headersFetchReversed*(
): Future[Result[seq[BlockHeader],void]]
{.async.} =
## Get a list of headers in reverse order.
const
threshold = fetchHeaderReqZombieThreshold # shortcut
let
peer = buddy.peer
useHash = (topHash != EMPTY_ROOT_HASH)
@ -94,9 +70,8 @@ proc headersFetchReversed*(
number: ivReq.maxPt))
start = Moment.now()
when extraTraceMessages:
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
nReq=req.maxResults, useHash
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
nReq=req.maxResults, useHash, nRespErrors=buddy.only.nHdrRespErrors
# Fetch headers from peer
var resp: Option[blockHeadersObj]
@ -108,38 +83,42 @@ proc headersFetchReversed*(
# in `rplx` with a violated `req.timeoutAt <= Moment.now()` assertion.
resp = await peer.getBlockHeaders(req)
except TransportError as e:
buddy.registerError()
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
nRespErrors=buddy.only.nHdrRespErrors
return err()
# Kludge: Ban an overly slow peer for a while
let elapsed = Moment.now() - start
if threshold < elapsed:
buddy.ctrl.zombie = true # abandon slow peer
# Evaluate result
if resp.isNone or buddy.ctrl.stopped:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=0, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
buddy.registerError()
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
return err()
let h: seq[BlockHeader] = resp.get.headers
if h.len == 0 or ivReq.len < h.len.uint:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=h.len, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
if h.len == 0 or ivReq.len < h.len.uint64:
buddy.registerError()
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
return err()
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
# Ban an overly slow peer for a while when slow responses occur in a row.
# Also, a minimum share of the requested headers is expected, typically 10%.
if fetchHeadersReqThresholdZombie < elapsed or
h.len.uint64 * 100 < req.maxResults * fetchHeadersReqMinResponsePC:
buddy.registerError()
else:
if buddy.ctrl.stopped:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
buddy.only.nHdrRespErrors = 0 # reset error count
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
nRespErrors=buddy.only.nHdrRespErrors
return ok(h)
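
The error accounting in `headersFetchReversed()` can be read as a small state machine: a slow or undersized response bumps a per-peer counter, a good response resets it, and the peer is banned once the counter exceeds a threshold. A simplified sketch under stated assumptions: `PeerState`, `tooFew`, `accountResponse`, and both constant values are hypothetical stand-ins (the real code uses `fetchHeadersReqThresholdCount` and `fetchHeadersReqMinResponsePC`, whose values are not shown here).

```nim
const
  minResponsePC = 10'u64   # assumed: minimum share of requested headers, in %
  errThresholdCount = 3    # assumed: tolerated number of errors in a row

type
  PeerState = object
    nRespErrors: int       # errors seen in a row
    zombie: bool           # `true` once the peer is banned for a while

proc registerError(p: var PeerState) =
  ## Count an error and ban the peer when the threshold is exceeded.
  p.nRespErrors.inc
  if errThresholdCount < p.nRespErrors:
    p.zombie = true

proc tooFew(nResp, nReq: uint64): bool =
  ## Integer-only percentage test, avoids floating point.
  nResp * 100 < nReq * minResponsePC

proc accountResponse(p: var PeerState; nResp, nReq: uint64; slow: bool) =
  ## Bump the counter on a slow or undersized response, reset it otherwise.
  if slow or tooFew(nResp, nReq):
    p.registerError()
  else:
    p.nRespErrors = 0
```

Because a single good response resets the counter, only consecutive bad responses lead to a ban.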


@ -0,0 +1,73 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/[common, p2p, rlp],
pkg/stew/byteutils,
../../../../common,
../../worker_desc
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc extendLinkedHChain*(
rev: seq[BlockHeader];
buddy: FlareBuddyRef;
topNumber: BlockNumber;
lhc: ref LinkedHChain; # update in place
info: static[string];
): bool =
## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place.
# Verify top block number
doAssert 0 < rev.len
if rev[0].number != topNumber:
return false
# Make space for return code array
let offset = lhc.revHdrs.len
lhc.revHdrs.setLen(offset + rev.len)
# Set up header with largest block number
let
blob0 = rlp.encode(rev[0])
hash0 = blob0.keccakHash
lhc.revHdrs[offset] = blob0
if offset == 0:
lhc.hash = hash0
# Verify top block hash (if any)
if lhc.parentHash != EMPTY_ROOT_HASH and hash0 != lhc.parentHash:
lhc.revHdrs.setLen(offset)
return false
# Encode block headers and make sure they are chained
for n in 1 ..< rev.len:
if rev[n].number + 1 != rev[n-1].number:
lhc.revHdrs.setLen(offset)
return false
lhc.revHdrs[offset + n] = rlp.encode(rev[n])
let hashN = lhc.revHdrs[offset + n].keccakHash
if rev[n-1].parentHash != hashN:
lhc.revHdrs.setLen(offset)
return false
# Finalise
lhc.parentHash = rev[rev.len-1].parentHash
true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
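
The core pattern of `extendLinkedHChain()` above is "append, verify, roll back on failure": the sequence is grown first and truncated back to `offset` whenever a number or hash link does not fit. A toy model of that pattern, assuming integer stand-ins for Keccak hashes and a value of `0` for "no hash known" (the types `Hdr` and `Chain` and the helpers `extend` and `toyHdr` are hypothetical; unlike the original, this sketch leaves `hash` untouched on failure):

```nim
type
  Hdr = object
    number: uint64
    hash, parentHash: int    # toy stand-ins for Keccak hashes
  Chain = object
    revHdrs: seq[Hdr]        # headers in reverse order, top block first
    hash, parentHash: int    # 0 stands in for "no hash known"

proc toyHdr(n: uint64): Hdr =
  ## Toy header whose hash is `n+100`, so parent links are easy to predict.
  Hdr(number: n, hash: int(n) + 100, parentHash: int(n) + 99)

proc extend(chain: var Chain; rev: seq[Hdr]; topNumber: uint64): bool =
  ## Sort of `chain += rev` where `chain` is only changed on success.
  if rev.len == 0 or rev[0].number != topNumber:
    return false
  if chain.parentHash != 0 and rev[0].hash != chain.parentHash:
    return false
  let offset = chain.revHdrs.len
  chain.revHdrs.add rev[0]
  for n in 1 ..< rev.len:
    if rev[n].number + 1 != rev[n-1].number or
       rev[n-1].parentHash != rev[n].hash:
      chain.revHdrs.setLen(offset)   # roll back the partial append
      return false
    chain.revHdrs.add rev[n]
  if offset == 0:
    chain.hash = rev[0].hash
  chain.parentHash = rev[^1].parentHash
  true
```

Truncating to `offset` keeps headers collected by earlier calls intact, which is what lets a caller keep a partially downloaded chain.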


@ -20,7 +20,7 @@ import
# Public functions
# ------------------------------------------------------------------------------
proc unprocFetch*(
proc headersUnprocFetch*(
ctx: FlareCtxRef;
maxLen: uint64;
): Result[BnRange,void] =
@ -30,7 +30,7 @@ proc unprocFetch*(
let
q = ctx.lhc.unprocessed
# Fetch top right interval with largest block numbers
# Fetch top/right interval with largest block numbers
jv = q.le().valueOr:
return err()
@ -44,62 +44,83 @@ proc unprocFetch*(
# Note that either (fringe case):
# (`jv.len`==0) => (`jv`==`[0,high(u64)]`) => `jv.maxPt`==`high(u64)`
# or (in the non-fringe case)
# (`maxLen` < `jv.len`) => (`jv.maxPt` - `maxLen` + 1 <= `jv.maxPt`)
# (`maxLen` < `jv.len`) => (`jv.maxPt` - `maxLen` + 1 < `jv.maxPt`)
#
BnRange.new(jv.maxPt - maxLen + 1, jv.maxPt)
discard q.reduce(iv)
ctx.lhc.borrowed += iv.len
ok(iv)
proc unprocMerge*(ctx: FlareCtxRef; iv: BnRange) =
## Merge back unprocessed range
discard ctx.lhc.unprocessed.merge(iv)
proc unprocMerge*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber) =
## Ditto
discard ctx.lhc.unprocessed.merge(minPt, maxPt)
proc headersUnprocCommit*(ctx: FlareCtxRef; borrowed: uint) =
## Commit back all processed range
ctx.lhc.borrowed -= borrowed
proc headersUnprocCommit*(ctx: FlareCtxRef; borrowed: uint; retuor: BnRange) =
## Merge back unprocessed range `retuor`
ctx.headersUnprocCommit borrowed
doAssert ctx.lhc.unprocessed.merge(retuor) == retuor.len
proc headersUnprocCommit*(
ctx: FlareCtxRef;
borrowed: uint;
rMinPt: BlockNumber;
rMaxPt: BlockNumber) =
## Variant of `headersUnprocCommit()`
ctx.headersUnprocCommit borrowed
doAssert ctx.lhc.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1
proc unprocReduce*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber) =
## Remove range from the unprocessed set
discard ctx.lhc.unprocessed.reduce(minPt, maxPt)
proc unprocFullyCovered*(
ctx: FlareCtxRef; minPt, maxPt: BlockNumber): bool =
## Check whether range is fully contained
ctx.lhc.unprocessed.covered(minPt, maxPt) == maxPt - minPt + 1
proc unprocCovered*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber): uint64 =
proc headersUnprocCovered*(ctx: FlareCtxRef; minPt,maxPt: BlockNumber): uint64 =
## Return the number of block numbers in `[minPt,maxPt]` that are covered
ctx.lhc.unprocessed.covered(minPt, maxPt)
proc unprocCovered*(ctx: FlareCtxRef; pt: BlockNumber): bool =
proc headersUnprocCovered*(ctx: FlareCtxRef; pt: BlockNumber): bool =
## Check whether point is contained
ctx.lhc.unprocessed.covered(pt, pt) == 1
proc unprocClear*(ctx: FlareCtxRef) =
ctx.lhc.unprocessed.clear()
proc unprocTop*(ctx: FlareCtxRef): BlockNumber =
proc headersUnprocTop*(ctx: FlareCtxRef): BlockNumber =
let iv = ctx.lhc.unprocessed.le().valueOr:
return BlockNumber(0)
iv.maxPt
proc unprocTotal*(ctx: FlareCtxRef): uint64 =
proc headersUnprocTotal*(ctx: FlareCtxRef): uint64 =
ctx.lhc.unprocessed.total()
proc unprocChunks*(ctx: FlareCtxRef): int =
proc headersUnprocBorrowed*(ctx: FlareCtxRef): uint64 =
ctx.lhc.borrowed
proc headersUnprocChunks*(ctx: FlareCtxRef): int =
ctx.lhc.unprocessed.chunks()
proc headersUnprocIsEmpty*(ctx: FlareCtxRef): bool =
ctx.lhc.unprocessed.chunks() == 0
# ------------
proc unprocInit*(ctx: FlareCtxRef) =
proc headersUnprocInit*(ctx: FlareCtxRef) =
## Constructor
ctx.lhc.unprocessed = BnRangeSet.init()
proc headersUnprocSet*(ctx: FlareCtxRef) =
## Clear
ctx.lhc.unprocessed.clear()
ctx.lhc.borrowed = 0u
proc headersUnprocSet*(ctx: FlareCtxRef; iv: BnRange) =
## Set up new unprocessed range
ctx.headersUnprocSet()
discard ctx.lhc.unprocessed.merge(iv)
proc headersUnprocSet*(ctx: FlareCtxRef; minPt, maxPt: BlockNumber) =
## Set up new unprocessed range
ctx.headersUnprocSet()
discard ctx.lhc.unprocessed.merge(minPt, maxPt)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
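
The `borrowed` counter introduced in this file tracks block numbers that a peer has fetched from the unprocessed set but not yet staged or returned. A minimal sketch of that fetch/commit cycle, assuming a single contiguous interval instead of the real `BnRangeSet` and block numbers starting at 1 (the types `Interval` and `Unproc` and the procs `total`, `fetch`, and `commit` are hypothetical simplifications):

```nim
type
  Interval = object
    minPt, maxPt: uint64     # closed interval `[minPt,maxPt]`
  Unproc = object
    iv: Interval             # unprocessed numbers, empty if `maxPt < minPt`
    borrowed: uint64         # numbers handed out but not yet committed

proc total(u: Unproc): uint64 =
  if u.iv.maxPt < u.iv.minPt: 0'u64
  else: u.iv.maxPt - u.iv.minPt + 1

proc fetch(u: var Unproc; maxLen: uint64): Interval =
  ## Borrow at most `maxLen` numbers from the top of the unprocessed range.
  doAssert 0 < maxLen and 0 < u.total
  let n = min(maxLen, u.total)
  result = Interval(minPt: u.iv.maxPt - n + 1, maxPt: u.iv.maxPt)
  u.iv.maxPt -= n
  u.borrowed += n

proc commit(u: var Unproc; borrowed: uint64) =
  ## The whole borrowed range was processed.
  u.borrowed -= borrowed

proc commit(u: var Unproc; borrowed: uint64; retour: Interval) =
  ## Only the upper part was processed, hand back the lower part `retour`.
  u.commit borrowed
  doAssert retour.minPt == u.iv.maxPt + 1   # single-interval model only
  u.iv.maxPt = retour.maxPt
```

At any time `total + borrowed` plus the staged count stays constant, which is the invariant checked by the staged queue verifier.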


@ -1,379 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
std/strutils,
pkg/[chronicles, chronos],
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../../common,
../worker_desc,
./staged/[headers, linked_hchain],
./unproc
logScope:
topics = "flare staged"
const
extraTraceMessages = false # or true
## Enabled additional logging noise
verifyDataStructureOk = false or true
## Debugging mode
# ------------------------------------------------------------------------------
# Private debugging helpers
# ------------------------------------------------------------------------------
when verifyDataStructureOk:
proc verifyStagedQueue(
ctx: FlareCtxRef;
info: static[string];
multiMode = true;
) =
## Verify staged queue: check that recorded ranges are not unprocessed
## and, unless `multiMode` is set, that the header totals add up.
##
# Walk queue items
let walk = LinkedHChainQueueWalk.init(ctx.lhc.staged)
defer: walk.destroy()
var
stTotal = 0u
rc = walk.first()
prv = BlockNumber(0)
while rc.isOk:
let
key = rc.value.key
nHeaders = rc.value.data.revHdrs.len.uint
minPt = key - nHeaders + 1
unproc = ctx.unprocCovered(minPt, key)
if 0 < unproc:
raiseAssert info & ": unprocessed staged chain " &
key.bnStr & " overlap=" & $unproc
if minPt <= prv:
raiseAssert info & ": overlapping staged chain " &
key.bnStr & " prvKey=" & prv.bnStr & " overlap=" & $(prv - minPt + 1)
stTotal += nHeaders
prv = key
rc = walk.next()
# Check `staged[] <= L`
if ctx.layout.least <= prv:
raiseAssert info & ": staged top mismatch " &
" L=" & $ctx.layout.least.bnStr & " stagedTop=" & prv.bnStr
# Check `unprocessed{} <= L`
let uTop = ctx.unprocTop()
if ctx.layout.least <= uTop:
raiseAssert info & ": unproc top mismatch " &
" L=" & $ctx.layout.least.bnStr & " unprocTop=" & uTop.bnStr
# Check `staged[] + unprocessed{} == (B,L)`
if not multiMode:
let
uTotal = ctx.unprocTotal()
both = stTotal + uTotal
unfilled = if ctx.layout.least <= ctx.layout.base + 1: 0u
else: ctx.layout.least - ctx.layout.base - 1
when extraTraceMessages:
trace info & ": verify staged", stTotal, uTotal, both, unfilled
if both != unfilled:
raiseAssert info & ": staged/unproc mismatch " &
" staged=" & $stTotal & " unproc=" & $uTotal & " exp-sum=" & $unfilled
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc fetchAndCheck(
buddy: FlareBuddyRef;
ivReq: BnRange;
lhc: ref LinkedHChain; # update in place
info: static[string];
): Future[bool] {.async.} =
## Fetch a single header chain from the peer and append it to `lhc`.
## Returns `true` if `lhc` was extended.
##
# Fetch headers for this range of block numbers
let revHeaders = block:
let
rc = await buddy.headersFetchReversed(ivReq, lhc.parentHash, info)
if rc.isOk:
rc.value
else:
when extraTraceMessages:
trace info & ": fetch headers failed", peer=buddy.peer, ivReq
if buddy.ctrl.running:
# Suspend peer for a while
buddy.ctrl.zombie = true
return false
# While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
# was sound, i.e. contiguous, linked, etc.
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info):
when extraTraceMessages:
trace info & ": fetched headers unusable", peer=buddy.peer, ivReq
return false
return true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc stagedCollect*(
buddy: FlareBuddyRef;
info: static[string];
): Future[bool] {.async.} =
## Collect a batch of chained headers totalling at most `nHeaders`
## headers. Fetch the headers from the peer and stash them blockwise on
## the `staged` queue. The function returns `true` if it stashed a header
## chain record on the `staged` queue.
##
## This function segments the `nHeaders` length into smaller pieces of at
## most `nFetchHeadersRequest` headers and fetches these chunks from the
## network. Where possible, hashes are used to address the headers to be
## fetched. Otherwise the system works opportunistically using block
## numbers for fetching, stashing them away to be verified later when
## appropriate.
##
let
ctx = buddy.ctx
peer = buddy.peer
uTop = ctx.unprocTop()
if uTop == 0:
# Nothing to do
return false
let
# Check for top header hash. If the range to fetch directly joins below
# the top level linked chain `L..F`, then there is the hash available for
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
# range of headers is fetched opportunistically using block numbers only.
isOpportunistic = uTop + 1 != ctx.layout.least
# Parent hash for `lhc` below
topLink = (if isOpportunistic: EMPTY_ROOT_HASH else: ctx.layout.leastParent)
# Get the total batch size
nFetchHeaders = if isOpportunistic: nFetchHeadersOpportunisticly
else: nFetchHeadersByTopHash
# Number of headers to fetch. Take as much as possible if there are many
# more to fetch. Otherwise split the remaining part so that there is room
# for other peers to fetch headers opportunistically.
t2 = ctx.unprocTotal div 2
nHeaders = if nFetchHeaders.uint < t2: nFetchHeaders.uint
elif t2 < nFetchHeadersRequest: nFetchHeaders.uint
else: t2
# Reserve the full range of block numbers so they can be appended in a row.
# This avoids some fragmentation when header chains are stashed by multiple
# peers, i.e. they interleave peer-wise.
iv = ctx.unprocFetch(nHeaders).expect "valid interval"
var
# This value is used for splitting the interval `iv` into
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
# middle interval `[somePt+1, ivTop]` will be fetched from the network.
ivTop = iv.maxPt
# This record will accumulate the fetched headers. It must be on the heap
# so that `async` can capture that properly.
lhc = (ref LinkedHChain)(parentHash: topLink)
while true:
# Extract a top range interval and fetch/stage it
let
ivReqMin = if ivTop + 1 <= iv.minPt + nFetchHeadersRequest: iv.minPt
else: ivTop - nFetchHeadersRequest + 1
# Request interval
ivReq = BnRange.new(ivReqMin, ivTop)
# Current length of the headers queue. This is used to calculate the
# response length from the network.
nLhcHeaders = lhc.revHdrs.len
# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
# Throw away opportunistic data
if isOpportunistic or nLhcHeaders == 0:
when extraTraceMessages:
trace info & ": completely failed", peer, iv, ivReq, isOpportunistic
ctx.unprocMerge(iv)
return false
# It is deterministic, so save the data downloaded so far and return the
# unused range.
when extraTraceMessages:
trace info & ": partially failed", peer, iv, ivReq,
unused=BnRange.new(iv.minPt,ivTop), isOpportunistic
ctx.unprocMerge(iv.minPt, ivTop)
break
# Update remaining interval
let ivRespLen = lhc.revHdrs.len - nLhcHeaders
if ivTop <= iv.minPt + ivRespLen.uint or buddy.ctrl.stopped:
break
let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
when extraTraceMessages:
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
isOpportunistic
ivTop = newIvTop
# Store `lhc` chain on the `staged` queue
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = lhc[]
when extraTraceMessages:
trace info & ": stashed on staged queue", peer,
iv=BnRange.new(iv.maxPt - lhc.headers.len.uint + 1, iv.maxPt),
nHeaders=lhc.headers.len, isOpportunistic, ctrl=buddy.ctrl.state
else:
trace info & ": stashed on staged queue", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
isOpportunistic, ctrl=buddy.ctrl.state
return true
proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
## Store/insert stashed chains from the `staged` queue into the linked
## chains layout and the persistent tables. The function returns the number
## of records processed and saved.
while true:
# Fetch largest block
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
break # all done
let
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint + 1, qItem.key)
if iv.maxPt+1 < least:
when extraTraceMessages:
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
break # there is a gap -- come back later
# Overlap must not happen
if iv.maxPt+1 != least:
raiseAssert info & ": Overlap iv=" & $iv & " L=" & least.bnStr
# Process item from `staged` queue, so it is no longer needed in the
# list.
discard ctx.lhc.staged.delete(iv.maxPt)
if qItem.data.hash != ctx.layout.leastParent:
# Discard wrong chain.
#
# FIXME: Does it make sense to keep the `buddy` with the `qItem` chains
# list object for marking the buddy a `zombie`?
#
ctx.unprocMerge(iv)
when extraTraceMessages:
trace info & ": discarding staged record", iv, L=least.bnStr, lap=result
break
# Store headers on database
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
ctx.layout.least = iv.minPt
ctx.layout.leastParent = qItem.data.parentHash
let ok = ctx.dbStoreLinkedHChainsLayout()
result.inc # count records
when extraTraceMessages:
trace info & ": staged record saved", iv, layout=ok, nSaved=result
when not extraTraceMessages:
trace info & ": staged records saved",
nStaged=ctx.lhc.staged.len, nSaved=result
if stagedQueueLengthLwm < ctx.lhc.staged.len:
when extraTraceMessages:
trace info & ": staged queue too large => reorg",
nStaged=ctx.lhc.staged.len, max=stagedQueueLengthLwm
ctx.poolMode = true
proc stagedReorg*(ctx: FlareCtxRef; info: static[string]) =
## Some pool mode intervention.
if ctx.lhc.staged.len == 0 and
ctx.unprocChunks() == 0:
# Nothing to do
when extraTraceMessages:
trace info & ": nothing to do"
return
# Update counter
ctx.pool.nReorg.inc
# Randomise the invocation order of the next few `runMulti()` calls by
# asking an oracle whether to run now or later.
#
# With a multi peer approach, there might be a slow peer invoked first
# that is handling the top range and blocking the rest. That causes the
# staged queue to fill up unnecessarily. Then pool mode is called which
# ends up here. Returning to multi peer mode, the same invocation order
# might be called as before.
ctx.setCoinTosser()
when extraTraceMessages:
trace info & ": coin tosser", nCoins=ctx.pool.tossUp.nCoins,
coins=(ctx.pool.tossUp.coins.toHex), nLeft=ctx.pool.tossUp.nLeft
if stagedQueueLengthHwm < ctx.lhc.staged.len:
trace info & ": hwm reached, flushing staged queue",
nStaged=ctx.lhc.staged.len, max=stagedQueueLengthLwm
# Flush `staged` queue into `unproc` so that it can be fetched anew
block:
let walk = LinkedHChainQueueWalk.init(ctx.lhc.staged)
defer: walk.destroy()
var rc = walk.first
while rc.isOk:
let (key, nHeaders) = (rc.value.key, rc.value.data.revHdrs.len.uint)
ctx.unprocMerge(key - nHeaders + 1, key)
rc = walk.next
# Reset `staged` queue
ctx.lhc.staged.clear()
when verifyDataStructureOk:
ctx.verifyStagedQueue(info, multiMode = false)
when extraTraceMessages:
trace info & ": reorg done"
proc stagedTop*(ctx: FlareCtxRef): BlockNumber =
## Retrieve the top staged block number
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
return BlockNumber(0)
qItem.key
proc stagedChunks*(ctx: FlareCtxRef): int =
## Number of staged records
ctx.lhc.staged.len
# ----------------
proc stagedInit*(ctx: FlareCtxRef) =
## Constructor
ctx.lhc.staged = LinkedHChainQueue.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
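
The `while` loop in `stagedCollect()` above walks a reserved interval from the top down in requests of at most `nFetchHeadersRequest` headers, with the last request clipped to the lower bound. A sketch of that splitting rule, assuming every request is answered in full (the proc name `topDownChunks` and its parameters are hypothetical):

```nim
proc topDownChunks(minPt, maxPt, chunk: uint64): seq[(uint64, uint64)] =
  ## Split `[minPt,maxPt]` into closed sub-intervals of at most `chunk`
  ## block numbers, listed from the top of the range downwards.
  doAssert minPt <= maxPt and 0 < chunk
  var top = maxPt
  while true:
    # Same comparison as for `ivReqMin`, arranged to avoid unsigned underflow
    let lo = if top + 1 <= minPt + chunk: minPt
             else: top - chunk + 1
    result.add((lo, top))
    if lo == minPt:
      break
    top = lo - 1

when isMainModule:
  echo topDownChunks(1, 10, 4)
```

In the real loop a peer may return fewer headers than requested, so the next top is derived from the actual response length rather than from `lo - 1`.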


@ -1,143 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/[common, p2p, rlp],
pkg/stew/byteutils,
../../../../common,
../../worker_desc
const
extraTraceMessages = false # or true
## Enabled additional logging noise
verifyDataStructureOk = false # or true
## Debugging mode
when extraTraceMessages:
import
pkg/chronicles,
stew/interval_set
logScope:
topics = "flare staged"
# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------
proc `$`(w: Hash256): string =
w.data.toHex
formatIt(Hash256):
$it
when verifyDataStructureOk:
proc verifyHeaderChainItem(lhc: ref LinkedHChain; info: static[string]) =
when extraTraceMessages:
trace info & ": verifying", nLhc=lhc.revHdrs.len
var
topHdr, childHdr: BlockHeader
try:
doAssert lhc.revHdrs[0].keccakHash == lhc.hash
topHdr = rlp.decode(lhc.revHdrs[0], BlockHeader)
childHdr = topHdr
for n in 1 ..< lhc.revHdrs.len:
let header = rlp.decode(lhc.revHdrs[n], BlockHeader)
doAssert childHdr.number == header.number + 1
doAssert lhc.revHdrs[n].keccakHash == childHdr.parentHash
childHdr = header
doAssert childHdr.parentHash == lhc.parentHash
except RlpError as e:
raiseAssert "verifyHeaderChainItem oops(" & $e.name & ") msg=" & e.msg
when extraTraceMessages:
trace info & ": verify ok",
iv=BnRange.new(childHdr.number,topHdr.number), nLhc=lhc.revHdrs.len
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc extendLinkedHChain*(
rev: seq[BlockHeader];
buddy: FlareBuddyRef;
topNumber: BlockNumber;
lhc: ref LinkedHChain; # update in place
info: static[string];
): bool =
## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place.
when extraTraceMessages:
let peer = buddy.peer
# Verify top block number
assert 0 < rev.len # debugging only
if rev[0].number != topNumber:
when extraTraceMessages:
trace info & ": top block number mismatch", peer, n=0,
number=rev[0].number.bnStr, expected=topNumber.bnStr
return false
# Make space for return code array
let offset = lhc.revHdrs.len
lhc.revHdrs.setLen(offset + rev.len)
# Set up header with largest block number
let
blob0 = rlp.encode(rev[0])
hash0 = blob0.keccakHash
lhc.revHdrs[offset] = blob0
if offset == 0:
lhc.hash = hash0
# Verify top block hash (if any)
if lhc.parentHash != EMPTY_ROOT_HASH and hash0 != lhc.parentHash:
when extraTraceMessages:
trace info & ": top hash mismatch", peer, hash0, expected=lhc.parentHash
lhc.revHdrs.setLen(offset)
return false
# Encode block headers and make sure they are chained
for n in 1 ..< rev.len:
if rev[n].number + 1 != rev[n-1].number:
when extraTraceMessages:
trace info & ": #numbers mismatch", peer, n,
parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr
lhc.revHdrs.setLen(offset)
return false
lhc.revHdrs[offset + n] = rlp.encode(rev[n])
let hashN = lhc.revHdrs[offset + n].keccakHash
if rev[n-1].parentHash != hashN:
when extraTraceMessages:
trace info & ": hash mismatch", peer, n,
parentHash=rev[n-1].parentHash, hashN
lhc.revHdrs.setLen(offset)
return false
# Finalise
lhc.parentHash = rev[rev.len-1].parentHash
when extraTraceMessages:
trace info & " extended chain record", peer, topNumber=topNumber.bnStr,
offset, nLhc=lhc.revHdrs.len
when verifyDataStructureOk:
lhc.verifyHeaderChainItem info
true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -11,22 +11,14 @@
{.push raises:[].}
import
pkg/bearssl/rand,
pkg/chronicles,
pkg/eth/[common, p2p],
../../protocol,
../worker_desc,
"."/[staged, unproc]
"."/[blocks_staged, blocks_unproc, db, headers_staged, headers_unproc]
when enableTicker:
import ./start_stop/ticker
logScope:
topics = "flare start/stop"
const extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -36,30 +28,33 @@ when enableTicker:
## Legacy stuff, will probably be superseded by `metrics`
result = proc: auto =
TickerFlareStats(
base: ctx.layout.base,
least: ctx.layout.least,
final: ctx.layout.final,
beacon: ctx.lhc.beacon.header.number,
nStaged: ctx.stagedChunks(),
stagedTop: ctx.stagedTop(),
unprocTop: ctx.unprocTop(),
nUnprocessed: ctx.unprocTotal(),
nUnprocFragm: ctx.unprocChunks(),
reorg: ctx.pool.nReorg)
stateTop: ctx.dbStateBlockNumber(),
base: ctx.layout.base,
least: ctx.layout.least,
final: ctx.layout.final,
beacon: ctx.lhc.beacon.header.number,
proc updateBeaconHeaderCB(ctx: FlareCtxRef): SyncReqNewHeadCB =
nHdrStaged: ctx.headersStagedQueueLen(),
hdrStagedTop: ctx.headersStagedTopKey(),
hdrUnprocTop: ctx.headersUnprocTop(),
nHdrUnprocessed: ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed(),
nHdrUnprocFragm: ctx.headersUnprocChunks(),
nBlkStaged: ctx.blocksStagedQueueLen(),
blkStagedBottom: ctx.blocksStagedBottomKey(),
blkUnprocTop: ctx.blk.topRequest,
nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
nBlkUnprocFragm: ctx.blocksUnprocChunks(),
reorg: ctx.pool.nReorg)
proc updateBeaconHeaderCB(ctx: FlareCtxRef): SyncFinalisedBlockHashCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
when extraTraceMessages:
var count = 0
result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
if ctx.lhc.beacon.header.number < h.number:
when extraTraceMessages:
if count mod 77 == 0: # reduce some noise
trace "updateBeaconHeaderCB", blockNumber=("#" & $h.number), count
count.inc
ctx.lhc.beacon.header = h
ctx.lhc.beacon.changed = true
return proc(h: Hash256) {.gcsafe, raises: [].} =
# Rpc checks empty header against `Hash256()` rather than `EMPTY_ROOT_HASH`
if ctx.lhc.beacon.finalised == ZERO_HASH256:
ctx.lhc.beacon.finalised = h
# ------------------------------------------------------------------------------
# Public functions
@ -81,13 +76,45 @@ else:
# ---------
proc setupDatabase*(ctx: FlareCtxRef) =
## Initialise database related stuff
# Set up queues and lists
ctx.headersStagedInit()
ctx.blocksStagedInit()
ctx.headersUnprocInit()
ctx.blocksUnprocInit()
# Initialise for `persistBlocks()`. Note that the `ctx.chain` is of
# type `ForkedChainRef` while `ctx.pool.chain` is a `ChainRef`
ctx.pool.chain = ctx.chain.com.newChain()
# Load initial state from database if there is any
ctx.dbLoadLinkedHChainsLayout()
# Set blocks batch import value for `persistBlocks()`
if ctx.pool.nBodiesBatch < nFetchBodiesRequest:
if ctx.pool.nBodiesBatch == 0:
ctx.pool.nBodiesBatch = nFetchBodiesBatchDefault
else:
ctx.pool.nBodiesBatch = nFetchBodiesRequest
# Set length of `staged` queue
if ctx.pool.nBodiesBatch < nFetchBodiesBatchDefault:
const nBlocks = blocksStagedQueueLenMaxDefault * nFetchBodiesBatchDefault
ctx.pool.blocksStagedQuLenMax =
(nBlocks + ctx.pool.nBodiesBatch - 1) div ctx.pool.nBodiesBatch
else:
ctx.pool.blocksStagedQuLenMax = blocksStagedQueueLenMaxDefault
proc setupRpcMagic*(ctx: FlareCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
ctx.chain.com.syncFinalisedBlockHash = ctx.updateBeaconHeaderCB
proc destroyRpcMagic*(ctx: FlareCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = SyncReqNewHeadCB(nil)
ctx.chain.com.syncFinalisedBlockHash = SyncFinalisedBlockHashCB(nil)
# ---------
@ -107,32 +134,6 @@ proc stopBuddy*(buddy: FlareBuddyRef) =
when enableTicker:
buddy.ctx.pool.ticker.stopBuddy()
# ---------
proc flipCoin*(ctx: FlareCtxRef): bool =
## This function is intended to randomise recurrent buddy processes. Each
## participant fetches a vote via `getVote()` and continues on a positive
## vote only. The scheduler will then re-queue the participant.
##
if ctx.pool.tossUp.nCoins == 0:
result = true
else:
if ctx.pool.tossUp.nLeft == 0:
ctx.pool.rng[].generate(ctx.pool.tossUp.coins)
ctx.pool.tossUp.nLeft = 8 * sizeof(ctx.pool.tossUp.coins)
ctx.pool.tossUp.nCoins.dec
ctx.pool.tossUp.nLeft.dec
result = bool(ctx.pool.tossUp.coins and 1)
ctx.pool.tossUp.coins = ctx.pool.tossUp.coins shr 1
proc setCoinTosser*(ctx: FlareCtxRef; nCoins = 8u) =
## Create a new sequence of `nCoins` oracles.
ctx.pool.tossUp.nCoins = nCoins
proc resCoinTosser*(ctx: FlareCtxRef) =
## Set up all oracles to be `true`
ctx.pool.tossUp.nCoins = 0
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
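
The batch and queue sizing done in `setupDatabase()` can be tried out in isolation. What follows is a minimal sketch, assuming the `else` branches attach the way the flattened diff suggests. The `Sizing` type and the `normalise()` function are hypothetical names introduced for illustration only; the constants are copied from the worker configuration file added by this change.

```nim
const
  nFetchBodiesRequest = 128
  nFetchBodiesBatchDefault = 6 * nFetchBodiesRequest
  blocksStagedQueueLenMaxDefault = 16

type
  Sizing = object
    ## Hypothetical stand-in for the two `ctx.pool` fields involved
    nBodiesBatch: int
    blocksStagedQuLenMax: int

func normalise(requested: int): Sizing =
  ## Clamp the requested batch size, then scale the staged queue length so
  ## that the total number of staged blocks stays roughly constant.
  result.nBodiesBatch = requested
  if result.nBodiesBatch < nFetchBodiesRequest:
    if result.nBodiesBatch == 0:
      result.nBodiesBatch = nFetchBodiesBatchDefault   # unset => default
    else:
      result.nBodiesBatch = nFetchBodiesRequest        # enforce the minimum

  if result.nBodiesBatch < nFetchBodiesBatchDefault:
    const nBlocks = blocksStagedQueueLenMaxDefault * nFetchBodiesBatchDefault
    # Ceiling division: enough records to hold `nBlocks` blocks
    result.blocksStagedQuLenMax =
      (nBlocks + result.nBodiesBatch - 1) div result.nBodiesBatch
  else:
    result.blocksStagedQuLenMax = blocksStagedQueueLenMaxDefault

when isMainModule:
  for n in [0, 50, 128, 500, 1000]:
    echo n, " => ", normalise(n)
```

A smaller batch size yields a proportionally longer staged queue, so the amount of block data held in memory stays in the same range as with the defaults.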

@ -12,7 +12,7 @@
{.push raises: [].}
import
std/[strformat, strutils],
std/strutils,
pkg/[chronos, chronicles, eth/common, stint],
../../../../utils/prettify,
../../../types
@ -26,15 +26,24 @@ type
TickerFlareStats* = object
## Full sync state (see `TickerFullStatsUpdater`)
stateTop*: BlockNumber
base*: BlockNumber
least*: BlockNumber
final*: BlockNumber
beacon*: BlockNumber
unprocTop*: BlockNumber
nUnprocessed*: uint64
nUnprocFragm*: int
nStaged*: int
stagedTop*: BlockNumber
hdrUnprocTop*: BlockNumber
nHdrUnprocessed*: uint64
nHdrUnprocFragm*: int
nHdrStaged*: int
hdrStagedTop*: BlockNumber
blkUnprocTop*: BlockNumber
nBlkUnprocessed*: uint64
nBlkUnprocFragm*: int
nBlkStaged*: int
blkStagedBottom*: BlockNumber
reorg*: int
TickerRef* = ref object
@ -47,36 +56,14 @@ type
lastStats: TickerFlareStats
const
extraTraceMessages = false or true
## Enabled additional logging noise
tickerStartDelay = chronos.milliseconds(100)
tickerLogInterval = chronos.seconds(1)
tickerLogSuppressMax = chronos.seconds(100)
logTxt0 = "Flare ticker"
# ------------------------------------------------------------------------------
# Private functions: pretty printing
# ------------------------------------------------------------------------------
proc pc99(val: float): string =
if 0.99 <= val and val < 1.0: "99%"
elif 0.0 < val and val <= 0.01: "1%"
else: val.toPC(0)
proc toStr(a: Opt[int]): string =
if a.isNone: "n/a"
else: $a.unsafeGet
# ------------------------------------------------------------------------------
# Private functions: printing ticker messages
# ------------------------------------------------------------------------------
when extraTraceMessages:
template logTxt(info: static[string]): static[string] =
logTxt0 & " " & info
proc flareTicker(t: TickerRef) {.gcsafe.} =
let
data = t.flareCb()
@ -85,15 +72,24 @@ proc flareTicker(t: TickerRef) {.gcsafe.} =
if data != t.lastStats or
tickerLogSuppressMax < (now - t.visited):
let
T = data.stateTop.toStr
B = data.base.toStr
L = data.least.toStr
F = data.final.toStr
Z = data.beacon.toStr
staged = if data.nStaged == 0: "n/a"
else: data.stagedTop.toStr & "(" & $data.nStaged & ")"
unproc = if data.nUnprocFragm == 0: "n/a"
else: data.unprocTop.toStr & "(" &
data.nUnprocessed.toSI & "," & $data.nUnprocFragm & ")"
hS = if data.nHdrStaged == 0: "n/a"
else: data.hdrStagedTop.toStr & "(" & $data.nHdrStaged & ")"
hU = if data.nHdrUnprocFragm == 0: "n/a"
else: data.hdrUnprocTop.toStr & "(" &
data.nHdrUnprocessed.toSI & "," & $data.nHdrUnprocFragm & ")"
bS = if data.nBlkStaged == 0: "n/a"
else: data.blkStagedBottom.toStr & "(" & $data.nBlkStaged & ")"
bU = if data.nBlkUnprocFragm == 0: "n/a"
else: data.blkUnprocTop.toStr & "(" &
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
reorg = data.reorg
peers = t.nBuddies
@ -104,7 +100,7 @@ proc flareTicker(t: TickerRef) {.gcsafe.} =
t.lastStats = data
t.visited = now
info logTxt0, up, peers, B, L, F, Z, staged, unproc, reorg, mem
info "State", up, peers, T, B, L, F, Z, hS, hU, bS, bU, reorg, mem
# ------------------------------------------------------------------------------
# Private functions: ticking log messages
@ -118,8 +114,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
proc setLogTicker(t: TickerRef; at: Moment) =
if t.flareCb.isNil:
when extraTraceMessages:
debug logTxt "was stopped", nBuddies=t.nBuddies
debug "Stopped", nBuddies=t.nBuddies
else:
# Store the `runLogTicker()` in a closure to avoid some garbage collection
# memory corruption issues that might occur otherwise.
@ -153,8 +148,7 @@ proc startBuddy*(t: TickerRef) =
t.nBuddies = 1
else:
t.nBuddies.inc
when extraTraceMessages:
debug logTxt "start buddy", nBuddies=t.nBuddies
debug "Start buddy", nBuddies=t.nBuddies
proc stopBuddy*(t: TickerRef) =
## Decrement buddies counter and stop ticker if there are no more registered
@ -162,8 +156,7 @@ proc stopBuddy*(t: TickerRef) =
if not t.isNil:
if 0 < t.nBuddies:
t.nBuddies.dec
when extraTraceMessages:
debug logTxt "stop buddy", nBuddies=t.nBuddies
debug "Stop buddy", nBuddies=t.nBuddies
# ------------------------------------------------------------------------------
# End

@ -14,17 +14,13 @@ import
pkg/[chronicles, chronos],
pkg/eth/[common, rlp],
pkg/stew/sorted_set,
../../sync_desc,
../worker_desc,
./update/metrics,
"."/[db, unproc]
"."/[blocks_unproc, db, headers_staged, headers_unproc]
logScope:
topics = "flare update"
const extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -49,7 +45,7 @@ proc updateBeaconChange(ctx: FlareCtxRef): bool =
## ::
## G B==L L'==F'
## o----------------o---------------------o---->
## | <-- linked --> |
## | <-- linked --> | <-- unprocessed --> |
##
const info = "updateBeaconChange"
@ -57,15 +53,12 @@ proc updateBeaconChange(ctx: FlareCtxRef): bool =
# Need: `F < Z` and `B == L`
if z != 0 and z <= ctx.layout.final: # violates `F < Z`
when extraTraceMessages:
trace info & ": not applicable",
Z=("#" & $z), F=("#" & $ctx.layout.final)
trace info & ": not applicable", Z=z.bnStr, F=ctx.layout.final.bnStr
return false
if ctx.layout.base != ctx.layout.least: # violates `B == L`
when extraTraceMessages:
trace info & ": not applicable",
B=("#" & $ctx.layout.base), L=("#" & $ctx.layout.least)
trace info & ": not applicable",
B=ctx.layout.base.bnStr, L=ctx.layout.least.bnStr
return false
# Check consistency: `B == L <= F` for maximal `B` => `L == F`
@ -89,10 +82,12 @@ proc updateBeaconChange(ctx: FlareCtxRef): bool =
discard ctx.dbStoreLinkedHChainsLayout()
# Update range
ctx.unprocMerge(ctx.layout.base+1, ctx.layout.least-1)
doAssert ctx.headersUnprocTotal() == 0
doAssert ctx.headersUnprocBorrowed() == 0
doAssert ctx.headersStagedQueueIsEmpty()
ctx.headersUnprocSet(ctx.layout.base+1, ctx.layout.least-1)
when extraTraceMessages:
trace info & ": updated"
trace info & ": updated"
true
@ -110,20 +105,25 @@ proc mergeAdjacentChains(ctx: FlareCtxRef): bool =
if ctx.lhc.layout.baseHash != ctx.lhc.layout.leastParent:
# FIXME: Oops -- any better idea than to defect?
raiseAssert info & ": hashes do not match" &
" B=#" & $ctx.lhc.layout.base & " L=#" & $ctx.lhc.layout.least
" B=" & ctx.lhc.layout.base.bnStr & " L=" & $ctx.lhc.layout.least.bnStr
trace info & ": merging", B=ctx.lhc.layout.base.bnStr,
L=ctx.lhc.layout.least.bnStr
# Merge adjacent linked chains
ctx.lhc.layout = LinkedHChainsLayout(
base: ctx.layout.final,
base: ctx.layout.final, # `B`
baseHash: ctx.layout.finalHash,
least: ctx.layout.final,
least: ctx.layout.final, # `L`
leastParent: ctx.dbPeekParentHash(ctx.layout.final).expect "Hash256",
final: ctx.layout.final,
final: ctx.layout.final, # `F`
finalHash: ctx.layout.finalHash)
# Save state
discard ctx.dbStoreLinkedHChainsLayout()
true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -141,8 +141,31 @@ proc updateLinkedHChainsLayout*(ctx: FlareCtxRef): bool =
result = true
proc updateBlockRequests*(ctx: FlareCtxRef): bool =
## Update block requests if the staged block queue is empty
const info = "updateBlockRequests"
let t = ctx.dbStateBlockNumber()
if t < ctx.layout.base: # so the half open interval `(T,B]` is not empty
# One can fill/import/execute blocks by number from `(T,B]`
if ctx.blk.topRequest < ctx.layout.base:
# So there is some space
trace info & ": extending", T=t.bnStr, topReq=ctx.blk.topRequest.bnStr,
B=ctx.layout.base.bnStr
ctx.blocksUnprocCommit(0, max(t,ctx.blk.topRequest) + 1, ctx.layout.base)
ctx.blk.topRequest = ctx.layout.base
return true
false
proc updateMetrics*(ctx: FlareCtxRef) =
ctx.updateMetricsImpl()
let now = Moment.now()
if ctx.pool.nextUpdate < now:
ctx.updateMetricsImpl()
ctx.pool.nextUpdate = now + metricsUpdateInterval
# ------------------------------------------------------------------------------
# End
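
The request window computed by `updateBlockRequests()` can be written as a pure function. This is a sketch only: `nextRequest()` and the local `BnRange` tuple are hypothetical names, and `BlockNumber` is assumed to be an unsigned 64 bit integer. It returns the closed range that would be handed to `blocksUnprocCommit()`, or nothing if the half open interval `(T,B]` is empty or already fully requested.

```nim
import std/options

type
  BlockNumber = uint64                          # assumption for this sketch
  BnRange = tuple[minPt, maxPt: BlockNumber]    # hypothetical range type

func nextRequest(t, topRequest, base: BlockNumber): Option[BnRange] =
  ## `t` is the state block number `T`, `base` is `B` from the layout, and
  ## `topRequest` is the largest block number requested so far.
  if t < base and topRequest < base:
    # Start right above whatever is already imported or requested
    let r: BnRange = (minPt: max(t, topRequest) + 1, maxPt: base)
    return some(r)
  none(BnRange)

when isMainModule:
  echo nextRequest(10, 0, 20)    # nothing requested yet
  echo nextRequest(10, 15, 20)   # partially requested
  echo nextRequest(10, 20, 20)   # everything up to `B` already requested
```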

@ -12,43 +12,56 @@
import
pkg/metrics,
../../worker_desc
../../worker_desc,
".."/[db, blocks_staged, headers_staged]
declareGauge flare_beacon_block_number, "" &
"Block number for latest finalised header"
"Block number of latest known finalised header"
declareGauge flare_era1_max_block_number, "" &
"Max block number for era1 blocks"
declareGauge flare_state_block_number, "" &
"Max block number of imported/executed blocks"
declareGauge flare_base_block_number, "" &
"Max block number of initial header chain starting at genesis"
declareGauge flare_max_trusted_block_number, "" &
"Max block number for trusted headers chain starting at genesis"
declareGauge flare_least_block_number, "" &
"Starting/min block number for higher up headers chain"
declareGauge flare_least_verified_block_number, "" &
"Starting block number for verified higher up headers chain"
declareGauge flare_final_block_number, "" &
"Ending/max block number of higher up headers chain"
declareGauge flare_top_verified_block_number, "" &
"Top block number for verified higher up headers chain"
declareGauge flare_headers_staged_queue_len, "" &
"Number of header list records staged for serialised processing"
declareGauge flare_staged_headers_queue_size, "" &
"Number of isolated verified header chains, gaps to be filled"
declareGauge flare_headers_unprocessed, "" &
"Number of block numbers ready to fetch and stage headers"
declareGauge flare_blocks_staged_queue_len, "" &
"Number of block list records staged for importing"
declareGauge flare_blocks_unprocessed, "" &
"Number of block numbers ready to fetch and stage block data"
declareGauge flare_number_of_buddies, "" &
"Number of current worker instances"
"Number of currently active worker instances"
declareCounter flare_serial, "" &
"Serial counter for debugging"
template updateMetricsImpl*(ctx: FlareCtxRef) =
let now = Moment.now()
if ctx.pool.nextUpdate < now:
metrics.set(flare_era1_max_block_number, ctx.pool.e1AvailMax.int64)
metrics.set(flare_max_trusted_block_number, ctx.layout.base.int64)
metrics.set(flare_least_verified_block_number, ctx.layout.least.int64)
metrics.set(flare_top_verified_block_number, ctx.layout.final.int64)
metrics.set(flare_beacon_block_number, ctx.lhc.beacon.header.number.int64)
metrics.set(flare_staged_headers_queue_size, ctx.lhc.staged.len)
metrics.set(flare_number_of_buddies, ctx.pool.nBuddies)
flare_serial.inc(1)
ctx.pool.nextUpdate += metricsUpdateInterval
metrics.set(flare_beacon_block_number, ctx.lhc.beacon.header.number.int64)
metrics.set(flare_state_block_number, ctx.dbStateBlockNumber().int64)
metrics.set(flare_base_block_number, ctx.layout.base.int64)
metrics.set(flare_least_block_number, ctx.layout.least.int64)
metrics.set(flare_final_block_number, ctx.layout.final.int64)
metrics.set(flare_headers_staged_queue_len, ctx.headersStagedQueueLen())
metrics.set(flare_headers_unprocessed,
(ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed()).int64)
metrics.set(flare_blocks_staged_queue_len, ctx.blocksStagedQueueLen())
metrics.set(flare_blocks_unprocessed,
(ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed()).int64)
metrics.set(flare_number_of_buddies, ctx.pool.nBuddies)
# End

@ -0,0 +1,138 @@
# Nimbus
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/chronos
const
enableTicker* = true
## Log regular status updates similar to metrics. Great for debugging.
runsThisManyPeersOnly* = 8
## Set to `1` for running a single peer only at a time. Great for debugging.
##
## Otherwise, this setting limits the number of peers accepted by the
## `runStart()` peer initialiser. When testing with an unlimited number of
## peers with some double digit number of connected peers, the observed
## response times when fetching headers seemed to degrade considerably into
## seconds (rather than ms.) This will be further looked at to be confirmed
## or rejected as insignificant.
##
## FIXME: This setting has priority over the `maxPeers` setting of the
## `FlareSyncRef.init()` initialiser. This might be harmonised at
## a later stage.
# ----------------------
metricsUpdateInterval* = chronos.seconds(10)
## Wait at least this time before next update
daemonWaitInterval* = chronos.seconds(10)
## Some waiting time at the end of the daemon task which always lingers
## in the background.
workerIdleWaitInterval* = chronos.seconds(10)
## Sleep some time in multi-mode if there is nothing to do
asyncThreadSwitchTimeSlot* = chronos.nanoseconds(10)
## Nano-sleep to allow pseudo/async thread switch
# ----------------------
nFetchHeadersRequest* = 1_024
## Number of headers that will be requested with a single `eth/xx` message.
##
## On `Geth`, responses to larger requests are all truncated to 1024 header
## entries (see `Geth` constant `maxHeadersServe`.)
fetchHeadersReqThresholdZombie* = chronos.seconds(2)
fetchHeadersReqThresholdCount* = 3
## Response time allowance. If the response time for the set of headers
## exceeds this threshold for more than `fetchHeadersReqThresholdCount`
## times in a row, then this peer will be banned for a while.
fetchHeadersReqMinResponsePC* = 10
## Some peers only returned one header at a time. If these peers sit on a
## farm, they might collectively slow down the download process. So this
## constant sets a percentage of minimum headers needed to return so that
## the peer is not treated as a slow responder (see above for slow
## responder count.)
nFetchHeadersBatch* = 8 * nFetchHeadersRequest
## Length of the request/stage batch. Several headers are consecutively
## fetched and stashed together as a single record on the staged queue.
headersStagedQueueLengthLwm* = 32
## Limit the number of records in the staged headers queue.
##
## Queue entries start accumulating if one peer stalls while fetching the
## top chain so leaving a gap. This gap must be filled first before
## inserting the queue into a contiguous chain of headers.
##
## This low-water mark triggers the system to do some **magic** to mitigate
## the above problem. Currently the **magic** is to let (pseudo) threads
## terminate and then restart all over again.
headersStagedQueueLengthHwm* = 48
## If this size is exceeded, the staged queue is flushed and resized to
## `headersStagedQueueLengthLwm-1` entries. Then the contents are re-fetched
## from scratch.
# ----------------------
nFetchBodiesRequest* = 128
## Similar to `nFetchHeadersRequest`
fetchBodiesReqThresholdZombie* = chronos.seconds(2)
fetchBodiesReqThresholdCount* = 3
## Similar to `fetchHeadersReqThreshold*`
fetchBodiesReqMinResponsePC* = 10
## Similar to `fetchHeadersReqMinResponsePC`
nFetchBodiesBatchDefault* = 6 * nFetchBodiesRequest
## Similar to `nFetchHeadersBatch`
##
## This value can be overridden with a smaller value which must be at
## least `nFetchBodiesRequest`.
blocksStagedQueueLenMaxDefault* = 16
## Maximum number of staged block records (headers + bodies) to be filled.
## If this size is reached, the process stops staging, with the exception
## of the lowest blocks (in case there is a gap.)
##
## This value might be adjusted with a larger value if
## `nFetchBodiesBatchDefault` is overridden with a smaller value.
##
## Some cursory measurements on `MainNet` suggest an average maximum block
## size ~25KiB (i.e. header + body) at block height ~4.5M. There will be
## as many as `nFetchBodiesBatch` blocks on a single staged blocks record.
## And there will be at most `blocksStagedQueueLengthMax+1` records on the
## staged blocks queue. (The `+1` is exceptional, appears when the least
## entry block number is too high and so leaves a gap to the ledger state
## block number.)
# ----------------------
static:
doAssert 0 < runsThisManyPeersOnly
doAssert 0 < nFetchHeadersRequest
doAssert nFetchHeadersRequest <= nFetchHeadersBatch
doAssert 0 < headersStagedQueueLengthLwm
doAssert headersStagedQueueLengthLwm < headersStagedQueueLengthHwm
doAssert 0 < nFetchBodiesRequest
doAssert nFetchBodiesRequest <= nFetchBodiesBatchDefault
doAssert 0 < blocksStagedQueueLenMaxDefault
# End
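
How `fetchHeadersReqMinResponsePC` and `fetchHeadersReqThresholdCount` might work together can be sketched as follows. The actual check lives in the fetch code, which is not part of this file, so everything except the two constants is an assumption: `tooFew()`, `PeerStats` and `registerResponse()` are hypothetical names, and resetting the counter on a good response is inferred from the phrase "in a row" in the comments.

```nim
const
  fetchHeadersReqThresholdCount = 3
  fetchHeadersReqMinResponsePC = 10

type
  PeerStats = object
    ## Hypothetical per-peer record, loosely modelled on `nHdrRespErrors`
    nHdrRespErrors: int

func tooFew(nReturned, nRequested: int): bool =
  ## True if less than the minimum percentage of headers was returned.
  ## Integer arithmetic avoids rounding issues.
  nReturned * 100 < nRequested * fetchHeadersReqMinResponsePC

func registerResponse(p: var PeerStats; nReturned, nRequested: int): bool =
  ## Update the error counter; returns `true` if the peer should be banned
  ## for a while (assumed policy: more than the threshold count in a row.)
  if tooFew(nReturned, nRequested):
    p.nHdrRespErrors.inc
  else:
    p.nHdrRespErrors = 0
  fetchHeadersReqThresholdCount < p.nHdrRespErrors

when isMainModule:
  var peer = PeerStats()
  for _ in 1 .. 4:
    echo peer.registerResponse(1, 1_024)
```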

@ -11,64 +11,13 @@
{.push raises:[].}
import
pkg/[bearssl/rand, chronos, chronos/timer],
pkg/chronos,
pkg/stew/[interval_set, sorted_set],
../../db/era1_db,
../sync_desc
../sync_desc,
./worker_config
export
sync_desc
const
enableTicker* = false or true
## Log regular status updates similar to metrics. Great for debugging.
metricsUpdateInterval* = chronos.seconds(10)
## Wait at least this time before next update
daemonWaitInterval* = chronos.seconds(30)
## Some waiting time at the end of the daemon task which always lingers
## in the background.
runMultiIdleWaitInterval* = chronos.seconds(30)
## Sleep some time in multi-mode if there is nothing to do
nFetchHeadersRequest* = 1_024
## Number of headers that will be requested with a single `eth/xx` message.
## Generously calculating a header with size 1k, fetching 1_024 headers
## would amount to a megabyte. As suggested in
## github.com/ethereum/devp2p/blob/master/caps/eth.md#blockheaders-0x04,
## the size of a message should not exceed 2 MiB.
##
## On live tests, responses to larger requests were all truncated to 1024
## header entries. It makes sense to not ask for more. So reserving
## smaller unprocessed slots that mostly all will be served leads to less
## fragmentation on a multi-peer downloading approach.
fetchHeaderReqZombieThreshold* = chronos.seconds(2)
## Response time allowance. If the response time for the set of headers
## exceeds this threshold, then this peer will be banned for a while.
nFetchHeadersOpportunisticly* = 8 * nFetchHeadersRequest
## Length of the request/stage batch. Several headers are consecutively
## fetched and stashed together as a single record on the staged queue.
## This is the size of an opportunistic run where the record stashed on
## the queue might be later discarded.
nFetchHeadersByTopHash* = 16 * nFetchHeadersRequest
## This entry is similar to `nFetchHeadersOpportunisticly` only that it
## will always be successfully merged into the database.
stagedQueueLengthLwm* = 24
## Limit the number of records in the staged queue. They start accumulating
## if one peer stalls while fetching the top chain so leaving a gap. This
## gap must be filled first before inserting the queue into a contiguous
## chain of headers. So this is a low-water mark where the system will
## try some magic to mitigate this problem.
stagedQueueLengthHwm* = 40
## If this size is exceeded, the staged queue is flushed and its contents
## is re-fetched from scratch.
sync_desc, worker_config
when enableTicker:
import ./worker/start_stop/ticker
@ -83,9 +32,6 @@ type
LinkedHChainQueue* = SortedSet[BlockNumber,LinkedHChain]
## Block intervals sorted by largest block number.
LinkedHChainQueueWalk* = SortedSetWalkRef[BlockNumber,LinkedHChain]
## Traversal descriptor
LinkedHChain* = object
## Public block items for the `LinkedHChainQueue` list, indexed by the
## largest block number. The list `revHdrs[]` is reversed, i.e. the largest
@ -96,6 +42,13 @@ type
revHdrs*: seq[Blob] ## Encoded linked header chain
parentHash*: Hash256 ## Parent hash of `headers[^1]`
StagedBlocksQueue* = SortedSet[BlockNumber,BlocksForImport]
## Blocks sorted by least block number.
BlocksForImport* = object
## Block request item sorted by least block number (i.e. from `blocks[0]`.)
blocks*: seq[EthBlock] ## List of blocks for import
# -------------------
LinkedHChainsLayout* = object
@ -125,45 +78,53 @@ type
BeaconHeader* = object
## Beacon state to be implicitly updated by RPC method
changed*: bool ## Set a marker if something has changed
header*: BlockHeader ## Running on beacon chain, last header
slow_start*: float ## Share of block number to use if positive
header*: BlockHeader ## Beacon chain, finalised header
finalised*: Hash256 ## From RPC, hash of finalised header
LinkedHChainsSync* = object
## Sync state for linked header chains
beacon*: BeaconHeader ## See `Z` in README
unprocessed*: BnRangeSet ## Block or header ranges to fetch
borrowed*: uint64 ## Total of temp. fetched ranges
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
layout*: LinkedHChainsLayout ## Current header chains layout
lastLayout*: LinkedHChainsLayout ## Previous layout (for delta update)
BlocksImportSync* = object
## Sync state for blocks to import/execute
unprocessed*: BnRangeSet ## Blocks download requested
borrowed*: uint64 ## Total of temp. fetched ranges
topRequest*: BlockNumber ## Max requested block number
staged*: StagedBlocksQueue ## Blocks ready for import
# -------------------
FlareBuddyData* = object
## Local descriptor data extension
fetchBlocks*: BnRange
nHdrRespErrors*: int ## Number of errors/slow responses in a row
nBdyRespErrors*: int ## Ditto for bodies
FlareTossUp* = object
## Reminiscent of CSMA/CD. For the next number `nCoins` in a row, each
## participant can fetch a `true`/`false` value to decide whether to
## start doing something or delay.
nCoins*: uint ## Number of coins to toss in a row
nLeft*: uint ## Number of flopped coins left
coins*: uint64 ## Sequence of flipped coins
# Debugging and logging.
nMultiLoop*: int ## Number of runs
stoppedMultiRun*: chronos.Moment ## Time when run-multi stopped
multiRunIdle*: chronos.Duration ## Idle time between runs
FlareCtxData* = object
## Globally shared data extension
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
nBuddies*: int ## Number of active workers
lhcSyncState*: LinkedHChainsSync ## Syncing by linked header chains
tossUp*: FlareTossUp ## Reminiscent of CSMA/CD
blkSyncState*: BlocksImportSync ## For importing/executing blocks
nextUpdate*: Moment ## For updating metrics
# Era1 related, disabled if `e1db` is `nil`
e1Dir*: string ## Pre-merge archive (if any)
e1db*: Era1DbRef ## Era1 db handle (if any)
e1AvailMax*: BlockNumber ## Last Era block applicable here
# Blocks import/execution settings for running `persistBlocks()` with
# `nBodiesBatch` blocks in each round (minimum value is
# `nFetchBodiesRequest`.)
chain*: ChainRef
importRunningOk*: bool ## Advisory lock, fetch vs. import
nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault`
blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault`
# Info stuff, no functional contribution
nBuddies*: int ## Number of active workers (info only)
nReorg*: int ## Number of reorg invocations (info only)
# Debugging stuff
@ -176,11 +137,6 @@ type
FlareCtxRef* = CtxRef[FlareCtxData]
## Extended global descriptor
static:
doAssert 0 < nFetchHeadersRequest
doAssert nFetchHeadersRequest <= nFetchHeadersOpportunisticly
doAssert nFetchHeadersRequest <= nFetchHeadersByTopHash
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
@ -189,6 +145,10 @@ func lhc*(ctx: FlareCtxRef): var LinkedHChainsSync =
## Shortcut
ctx.pool.lhcSyncState
func blk*(ctx: FlareCtxRef): var BlocksImportSync =
## Shortcut
ctx.pool.blkSyncState
func layout*(ctx: FlareCtxRef): var LinkedHChainsLayout =
## Shortcut
ctx.pool.lhcSyncState.layout
@ -207,6 +167,32 @@ proc `$`*(w: BnRange): string =
proc bnStr*(w: BlockNumber): string =
"#" & $w
# Source: `nimbus_import.shortLog()`
func toStr*(a: chronos.Duration, parts: int): string =
## Returns string representation of Duration ``a`` as nanoseconds value.
if a == nanoseconds(0):
return "0"
var
res = ""
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
res
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
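
The unit-by-unit formatting done by `toStr()` for `chronos.Duration` can be reproduced without `chronos` for experimenting. The sketch below is a stand-alone analogue, not the code of this change: it takes a plain `int64` nanoseconds value instead of a `Duration`, and it assumes the value is non-negative.

```nim
func toStr(nanos: int64; parts: int): string =
  ## Format `nanos` using at most `parts` units, largest unit first.
  if nanos == 0:
    return "0"
  const units = [("s", 1_000_000_000'i64), ("ms", 1_000_000'i64),
                 ("us", 1_000'i64), ("ns", 1'i64)]
  var
    v = nanos
    parts = parts          # local, mutable copy of the parameter
  for (name, size) in units:
    if v >= size:
      result.add $(v div size)
      result.add name
      v = v mod size
      dec parts
      # Stop when nothing is left or the unit budget is used up
      if v == 0 or parts <= 0:
        return

when isMainModule:
  echo toStr(1_500_000_000'i64, 2)
  echo toStr(1_500_000_000'i64, 1)
```

Limiting `parts` truncates the smaller units rather than rounding, which keeps log lines short at the cost of some precision.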