nimbus-eth1/nimbus/sync/flare/worker/blocks_unproc.nim

116 lines
3.4 KiB
Nim
Raw Normal View History

Flare sync (#2627) * Cosmetics, small fixes, add stashed headers verifier * Remove direct `Era1` support why: Era1 is indirectly supported by using the import tool before syncing. * Clarify database persistent save function. why: Function relied on the last saved state block number which was wrong. It now relies on the tx-level. If it is 0, then data are saved directly. Otherwise the task that owns the tx will do it. * Extracted configuration constants into separate file * Enable single peer mode for debugging * Fix peer losing issue in multi-mode details: Running concurrent download peers was previously programmed as running a batch downloading and storing ~8k headers and then leaving the `async` function to be restarted by a scheduler. This was unfortunate because of occasionally occurring long waiting times for restart. While the time gap until restarting were typically observed a few millisecs, there were always a few outliers which well exceed several seconds. This seemed to let remote peers run into timeouts. * Prefix function names `unprocXxx()` and `stagedYyy()` by `headers` why: There will be other `unproc` and `staged` modules. * Remove cruft, update logging * Fix accounting issue details: When staging after fetching headers from the network, there was an off by 1 error occurring when the result was by one smaller than requested. Also, a whole range was mis-accounted when a peer was terminating connection immediately after responding. * Fix slow/error header accounting when fetching why: Originally set for detecting slow headers in a row, the counter was wrongly extended to general errors. * Ban peers for a while that respond with too few headers continuously why: Some peers only returned one header at a time. If these peers sit on a farm, they might collectively slow down the download process. * Update RPC beacon header updater why: Old function hook has slightly changed its meaning since it was used for snap sync. Also, the old hook is used by other functions already. * Limit number of peers or set to single peer mode details: Merge several concepts, single peer mode being one of it. * Some code clean up, fixings for removing of compiler warnings * De-noise header fetch related sources why: Header download looks relatively stable, so general debugging is not needed, anymore. This is the equivalent of removing the scaffold from the part of the building where work has completed. * More clean up and code prettification for headers stuff * Implement body fetch and block import details: Available headers are used stage blocks by combining existing headers with newly fetched blocks. Then these blocks are imported/executed via `persistBlocks()`. * Logger cosmetics and cleanup * Remove staged block queue debugging details: Feature still available, just not executed anymore * Docu, logging update * Update/simplify `runDaemon()` * Re-calibrate block body requests and soft config for import blocks batch why: * For fetching, larger fetch requests are mostly truncated anyway on MainNet. * For executing, smaller batch sizes reduce the memory needed for the price of longer execution times. * Update metrics counters * Docu update * Some fixes, formatting updates, etc. * Update `borrowed` type: uint -. uint64 also: Always convert to `uint64` rather than `uint` where appropriate
2024-09-27 15:07:42 +00:00
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/p2p,
pkg/results,
pkg/stew/interval_set,
../worker_desc
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc blocksUnprocFetch*(
ctx: FlareCtxRef;
maxLen: uint64;
): Result[BnRange,void] =
## Fetch interval from block ranges with maximal size `maxLen`, where
## `0` is interpreted as `2^64`.
##
let
q = ctx.blk.unprocessed
# Fetch bottom/left interval with least block numbers
jv = q.ge().valueOr:
return err()
# Curb interval to maximal length `maxLen`
iv = block:
if maxLen == 0 or (0 < jv.len and jv.len <= maxLen):
jv
else:
# Curb interval `jv` to length `maxLen`
#
# Note that either (fringe case):
# (`jv.len`==0) => (`jv`==`[0,high(u64)]`) => `jv.maxPt`==`high(u64)`
# or (in the non-fringe case)
# (`maxLen` < `jv.len`) => (`jv.minPt` + `maxLen` - 1 < `jv.maxPt`)
#
BnRange.new(jv.minPt, jv.minPt + maxLen - 1)
discard q.reduce(iv)
ctx.blk.borrowed += iv.len
ok(iv)
proc blocksUnprocCommit*(ctx: FlareCtxRef; borrowed: uint) =
## Commit back all processed range
ctx.blk.borrowed -= borrowed
proc blocksUnprocCommit*(ctx: FlareCtxRef; borrowed: uint; retuor: BnRange) =
## Merge back unprocessed range `retour`
ctx.blocksUnprocCommit borrowed
doAssert ctx.blk.unprocessed.merge(retuor) == retuor.len
proc blocksUnprocCommit*(
ctx: FlareCtxRef;
borrowed: uint;
rMinPt: BlockNumber;
rMaxPt: BlockNumber) =
## Variant of `blocksUnprocCommit()`
ctx.blocksUnprocCommit borrowed
doAssert ctx.blk.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1
proc blocksUnprocCovered*(ctx: FlareCtxRef; minPt,maxPt: BlockNumber): uint64 =
## Check whether range is fully contained
ctx.blk.unprocessed.covered(minPt, maxPt)
proc blocksUnprocCovered*(ctx: FlareCtxRef; pt: BlockNumber): bool =
## Check whether point is contained
ctx.blk.unprocessed.covered(pt, pt) == 1
proc blocksUnprocTop*(ctx: FlareCtxRef): BlockNumber =
let iv = ctx.blk.unprocessed.le().valueOr:
return BlockNumber(0)
iv.maxPt
proc blocksUnprocBottom*(ctx: FlareCtxRef): BlockNumber =
let iv = ctx.blk.unprocessed.ge().valueOr:
return high(BlockNumber)
iv.minPt
proc blocksUnprocTotal*(ctx: FlareCtxRef): uint64 =
ctx.blk.unprocessed.total()
proc blocksUnprocBorrowed*(ctx: FlareCtxRef): uint64 =
ctx.blk.borrowed
proc blocksUnprocChunks*(ctx: FlareCtxRef): int =
ctx.blk.unprocessed.chunks()
proc blocksUnprocIsEmpty*(ctx: FlareCtxRef): bool =
ctx.blk.unprocessed.chunks() == 0
# ------------------
proc blocksUnprocInit*(ctx: FlareCtxRef) =
## Constructor
ctx.blk.unprocessed = BnRangeSet.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------