Jordan Hrycaj 430611d3bc
Beacon sync updates tbc (#2818)
* Clear rejected sync target so that it would not be processed again

* Use in-memory table to stash headers after FCU import has started

why:
  After block import has started, there is no way to save/stash block
  headers persistently. The FCU handlers always maintain a positive
  transaction level and in some instances the current transaction is
  flushed and re-opened.

  This patch fixes an exception thrown when a block header has gone
  missing.

* When resuming sync, delete stale headers and state

why:
  Deleting headers saves some persistent space that would get lost
  otherwise. Deleting the state after resuming prevents race
  conditions.

* On clean start hibernate sync `deamon` entity before first update from CL

details:
  Only reduced services are running
  * accept FCU from CL
  * fetch finalised header after accepting FCU (provides hash only)

* Improve text/meaning of some log messages

* Revisit error handling for useless peers

why:
  A peer is abandoned if the error score is too high. This was not
  properly handled for some fringe case when the error was detected at
  staging time but fetching via eth/xx was ok.

* Clarify `break` meaning by using labelled `break` statements

* Fix action how to commit when sync target has been reached

why:
  The sync target block number might precede the latest FCU block number.
  This happens when the engine API squeezes in some request to execute
  and import subsequent blocks.

  This patch fixes an assert thrown when, after reaching the target, the
  latest FCU block number is higher than the expected target block number.

* Update TODO list
2024-11-01 19:18:41 +00:00

156 lines
5.7 KiB
Nim

# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/[common, p2p],
../../../core/chain,
../../protocol,
../worker_desc,
./blocks_staged/staged_queue,
./headers_staged/staged_queue,
"."/[blocks_unproc, db, headers_unproc]
when enableTicker:
import ./start_stop/ticker
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
when enableTicker:
  proc tickerUpdater(ctx: BeaconCtxRef): TickerStatsUpdater =
    ## Create the statistics callback for the ticker. Each invocation of the
    ## returned closure assembles a fresh `TickerStats` record from the
    ## current contents of the descriptor `ctx`.
    ##
    ## Legacy stuff, will probably be superseded by `metrics`.
    result = proc(): TickerStats =
      TickerStats(
        # Values read from the database and the chain object
        stored:          ctx.db.getSavedStateBlockNumber(),
        base:            ctx.chain.baseNumber(),
        latest:          ctx.chain.latestNumber(),

        # Values read from the sync layout
        coupler:         ctx.layout.coupler,
        dangling:        ctx.layout.dangling,
        final:           ctx.layout.final,
        head:            ctx.layout.head,
        headOk:          ctx.layout.headLocked,

        # Values read from the sync target; `targetOk` is set when the
        # target's `final` entry is non-zero
        target:          ctx.target.consHead.number,
        targetOk:        ctx.target.final != 0,

        # Header queue figures (staged and unprocessed)
        nHdrStaged:      ctx.headersStagedQueueLen(),
        hdrStagedTop:    ctx.headersStagedQueueTopKey(),
        hdrUnprocTop:    ctx.headersUnprocTop(),
        nHdrUnprocessed: ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed(),
        nHdrUnprocFragm: ctx.headersUnprocChunks(),

        # Block queue figures (staged and unprocessed)
        nBlkStaged:      ctx.blocksStagedQueueLen(),
        blkStagedBottom: ctx.blocksStagedQueueBottomKey(),
        blkUnprocTop:    ctx.blk.topRequest,
        nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
        nBlkUnprocFragm: ctx.blocksUnprocChunks(),

        # Counters kept in the pool
        reorg:           ctx.pool.nReorg,
        nBuddies:        ctx.pool.nBuddies)
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
  ## Produce the call back function that the RPC module uses for passing on
  ## a new beacon header `h` together with the finalised hash `f`.
  ##
  ## The request is silently dropped unless all of the checks below pass.
  ## Otherwise it is recorded in `ctx.target` and flagged as `changed`.
  result = proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
    # Ignore while an update is running (the next update will be taken
    # instead), or if the target is complete already.
    if ctx.target.locked or ctx.target.final != 0:
      return

    # The finalised hash must be set
    if f == zeroHash32:
      return

    # The update must advance beyond both, the current layout head and the
    # consensus head registered last.
    if h.number <= ctx.layout.head or h.number <= ctx.target.consHead.number:
      return

    ctx.target.consHead = h
    ctx.target.final = BlockNumber(0)
    ctx.target.finalHash = f
    ctx.target.changed = true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
when enableTicker:
  proc setupTicker*(ctx: BeaconCtxRef) =
    ## Helper for `setup()`: create the ticker, fed by the statistics
    ## callback from `tickerUpdater()`, and store it in the pool.
    ctx.pool.ticker = init(TickerRef, tickerUpdater(ctx))

  proc destroyTicker*(ctx: BeaconCtxRef) =
    ## Helper for `release()`: destroy the ticker and reset the pool entry
    ## to `nil`.
    destroy(ctx.pool.ticker)
    ctx.pool.ticker = nil

else:
  # Without ticker support, both helpers compile to nothing.
  template setupTicker*(ctx: BeaconCtxRef) = discard
  template destroyTicker*(ctx: BeaconCtxRef) = discard
# ---------
proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) =
  ## Initialise database related stuff: set up the work queues and lists,
  ## try to load a previous sync state, and settle the block batch and
  ## staged queue sizes kept in `ctx.pool`.
  ##
  ## The argument `info` is used as prefix for the log message.

  # Set up queues and lists
  ctx.headersStagedQueueInit()
  ctx.blocksStagedQueueInit()
  ctx.headersUnprocInit()
  ctx.blocksUnprocInit()

  # Load initial state from database if there is any. If the loader returns
  # `true`, then the syncer will resume from a previous sync in which case
  # the system becomes fully active. Otherwise there is only some polling
  # while waiting for a new target, so there is reduced service (aka
  # `hibernate`.)
  ctx.hibernate = not ctx.dbLoadSyncStateLayout info
  if ctx.hibernate:
    trace info & ": hibernating", latest=ctx.chain.latestNumber.bnStr

  # Set the batch size for block import. Only values smaller than the
  # request size are adjusted: zero selects the default, anything else is
  # replaced by the request size.
  let belowRequestSize = ctx.pool.nBodiesBatch < nFetchBodiesRequest
  if belowRequestSize and ctx.pool.nBodiesBatch == 0:
    ctx.pool.nBodiesBatch = nFetchBodiesBatchDefault
  elif belowRequestSize:
    ctx.pool.nBodiesBatch = nFetchBodiesRequest

  # Set length of `staged` queue. For batches smaller than the default, the
  # queue length is the default total number of blocks divided by the batch
  # size, rounded up.
  if nFetchBodiesBatchDefault <= ctx.pool.nBodiesBatch:
    ctx.pool.blocksStagedQuLenMax = blocksStagedQueueLenMaxDefault
  else:
    const totalBlocks =
      blocksStagedQueueLenMaxDefault * nFetchBodiesBatchDefault
    let batchSize = ctx.pool.nBodiesBatch
    ctx.pool.blocksStagedQuLenMax =
      (totalBlocks + batchSize - 1) div batchSize
proc setupRpcMagic*(ctx: BeaconCtxRef) =
  ## Helper for `setup()`: enable external pivot update via RPC by
  ## registering the call back function from `updateBeaconHeaderCB()`.
  ctx.pool.chain.com.reqBeaconSyncTarget = updateBeaconHeaderCB(ctx)
proc destroyRpcMagic*(ctx: BeaconCtxRef) =
  ## Helper for `release()`: unregister the sync target call back function
  ## by replacing it with `nil`.
  let noCallBack = ReqBeaconSyncTargetCB(nil)
  ctx.pool.chain.com.reqBeaconSyncTarget = noCallBack
# ---------
proc startBuddy*(buddy: BeaconBuddyRef): bool =
  ## Convenience setting for starting a new worker. Returns `true` (and
  ## increments the worker counter) only if the peer supports the `eth`
  ## protocol and its `eth` state is initialised.
  let peer = buddy.peer

  if not peer.supports(protocol.eth):
    return false
  if not peer.state(protocol.eth).initialized:
    return false

  inc buddy.ctx.pool.nBuddies # for metrics
  return true
proc stopBuddy*(buddy: BeaconBuddyRef) =
  ## Counterpart of `startBuddy()`: decrement the worker counter.
  dec buddy.ctx.pool.nBuddies # for metrics
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------