Beacon sync updates tbc (#2818)

* Clear rejected sync target so that it would not be processed again

* Use in-memory table to stash headers after FCU import has started

why:
  After block imported has started, there is no way to save/stash block
  headers persistently. The FCU handlers always maintain a positive
  transaction level and in some instances the current transaction is
  flushed and re-opened.

  This patch fixes an exception thrown when a block header has gone
  missing.

* When resuming sync, delete stale headers and state

why:
  Deleting headers saves some persistent space that would get lost
  otherwise. Deleting the state after resuming prevents from race
  conditions.

* On clean start hibernate sync `deamon` entity before first update from CL

details:
  Only reduces services are running
  * accept FCU from CL
  * fetch finalised header after accepting FCY (provides hash only)

* Improve text/meaning of some log messages

* Revisit error handling for useless peers

why:
  A peer is abandoned from if the error score is too high. This was not
  properly handled for some fringe case when the error was detected at
  staging time but fetching via eth/xx was ok.

* Clarify `break` meaning by using labelled `break` statements

* Fix action how to commit when sync target has been reached

why:
  The sync target block number might precede than latest FCU block number.
  This happens when the engine API squeezes in some request to execute
  and import subsequent blocks.

  This patch fixes and assert thrown when after reaching target the latest
  FCU block number is higher than the expected target block number.

* Update TODO list
This commit is contained in:
Jordan Hrycaj 2024-11-01 19:18:41 +00:00 committed by GitHub
parent 73661fd8a4
commit 430611d3bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 329 additions and 132 deletions

View File

@ -1 +1,35 @@
## General TODO items
* Update/resolve code fragments which are tagged FIXME
## Open issues
### 1. Weird behaviour of the RPC/engine API
See issue [#2816](https://github.com/status-im/nimbus-eth1/issues/2816)
### 2. Some assert
Error: unhandled exception: key not found: 0x441a0f..027bc96a [AssertionDefect]
which happened on several `holesky` tests immediately after loging somehing like
NTC 2024-10-31 21:37:34.728 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=044d22843cbe baseNumber=2646764 baseHash=21ec11c1deac
or from another machine with literally the same exception text (but the stack-trace differs)
NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b
### 3. Some assert
Seen on `holesky`, sometimes the header chain cannot not be joined with its
lower end after completing due to different hashes leading to an assert failure
Error: unhandled exception: header chains C-D joining hashes do not match L=#2646126 lHash=3bc2beb1b565 C=#2646126 cHash=3bc2beb1b565 D=#2646127 dParent=671c7c6cb904
which was preceeded somewhat earlier by log entries
INF 2024-10-31 18:21:16.464 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=3bc2beb1b565
[..]
INF 2024-10-31 18:21:25.872 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=671c7c6cb904

View File

@ -34,11 +34,13 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool =
proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} =
## When idle, save cpu cycles waiting for something to do.
if buddy.ctx.pool.importRunningOk or
not (buddy.headersToFetchOk() or
if buddy.ctx.pool.blockImportOk or # currently importing blocks
buddy.ctx.hibernate or # not activated yet?
not (buddy.headersToFetchOk() or # something on TODO list
buddy.bodiesToFetchOk()):
await sleepAsync workerIdleWaitInterval
return true
else:
return false
# ------------------------------------------------------------------------------
@ -54,9 +56,6 @@ proc setup*(ctx: BeaconCtxRef; info: static[string]): bool =
# Debugging stuff, might be an empty template
ctx.setupTicker()
# Enable background daemon
ctx.daemon = true
true
proc release*(ctx: BeaconCtxRef; info: static[string]) =
@ -70,20 +69,20 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
let peer = buddy.peer
if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies:
debug info & ": peers limit reached", peer
if not buddy.ctx.hibernate: debug info & ": peers limit reached", peer
return false
if not buddy.startBuddy():
debug info & ": failed", peer
if not buddy.ctx.hibernate: debug info & ": failed", peer
return false
debug info & ": new peer", peer
if not buddy.ctx.hibernate: debug info & ": new peer", peer
true
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
## Clean up this peer
debug info & ": release peer", peer=buddy.peer,
nInvocations=buddy.only.nMultiLoop,
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
ctrl=buddy.ctrl.state, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr
buddy.stopBuddy()
@ -98,8 +97,14 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions.
##
## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
## first usable request from the CL (via RPC) stumbles in.
##
# Check for a possible header layout and body request changes
ctx.updateSyncStateLayout info
if ctx.hibernate:
return
ctx.updateBlockRequests info
# Execute staged block records.
@ -110,8 +115,8 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
# place. So there might be some peers active. If they are waiting for
# a message reply, this will most probably time out as all processing
# power is usurped by the import task here.
ctx.pool.importRunningOk = true
defer: ctx.pool.importRunningOk = false
ctx.pool.blockImportOk = true
defer: ctx.pool.blockImportOk = false
# Import from staged queue.
while await ctx.blocksStagedImport(info):

View File

@ -18,7 +18,14 @@ import
../worker_desc,
./blocks_staged/bodies,
./update/metrics,
"."/[blocks_unproc, db]
"."/[blocks_unproc, db, helpers]
# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------
formatIt(Hash32):
it.data.short
# ------------------------------------------------------------------------------
# Private functions
@ -50,11 +57,16 @@ proc fetchAndCheck(
blk.blocks.setLen(offset + ivReq.len)
var blockHash = newSeq[Hash32](ivReq.len)
for n in 1u ..< ivReq.len:
let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header"
let header = ctx.dbPeekHeader(ivReq.minPt + n).valueOr:
# There is nothing one can do here
raiseAssert info & " stashed header missing: n=" & $n &
" ivReq=" & $ivReq & " nth=" & (ivReq.minPt + n).bnStr
blockHash[n - 1] = header.parentHash
blk.blocks[offset + n].header = header
blk.blocks[offset].header =
ctx.dbPeekHeader(ivReq.minPt).expect "stashed header"
blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).valueOr:
# There is nothing one can do here
raiseAssert info & " stashed header missing: n=0" &
" ivReq=" & $ivReq & " nth=" & ivReq.minPt.bnStr
blockHash[ivReq.len - 1] =
rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccak256
@ -177,11 +189,17 @@ proc blocksStagedCollect*(
# Fetch and extend staging record
if not await buddy.fetchAndCheck(ivReq, blk, info):
# Throw away first time block fetch data. Keep other data for a
# partially assembled list.
if nBlkBlocks == 0:
if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped:
buddy.only.nBdyRespErrors.inc
if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or
fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": list completely failed", peer, iv, ivReq,
trace info & ": current block list discarded", peer, iv, ivReq,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors
ctx.blocksUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
@ -238,7 +256,8 @@ proc blocksStagedImport*(
let imported = ctx.chain.latestNumber()
if qItem.key != imported + 1:
trace info & ": there is a gap L vs. staged",
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr,
C=ctx.layout.coupler.bnStr
doAssert imported < qItem.key
return false
@ -253,21 +272,23 @@ proc blocksStagedImport*(
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr
var maxImport = iv.maxPt
block importLoop:
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error
warn info & ": import block error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break
break importLoop
# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
# Shutdown?
maxImport = ctx.chain.latestNumber()
break
break importLoop
# Update, so it can be followed nicely
ctx.updateMetrics()
@ -276,22 +297,24 @@ proc blocksStagedImport*(
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash
finHash = if nBn < ctx.layout.final: nthHash
else: ctx.layout.finalHash
doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash,
finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
warn info & ": fork choice error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash,
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break
break importLoop
# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
maxImport = ctx.chain.latestNumber()
break
break importLoop
# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:

View File

@ -13,7 +13,7 @@
import
pkg/[chronicles, chronos],
pkg/eth/[common, rlp],
pkg/stew/[byteutils, interval_set, sorted_set],
pkg/stew/[interval_set, sorted_set],
pkg/results,
"../../.."/[common, core/chain, db/storage_types],
../worker_desc,
@ -22,13 +22,6 @@ import
const
LhcStateKey = 1.beaconStateKey
# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------
formatIt(Hash32):
it.data.toHex
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -42,6 +35,40 @@ proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] =
discard
err()
proc deleteStaleHeadersAndState(
ctx: BeaconCtxRef;
upTo: BlockNumber;
info: static[string];
) =
## Delete stale headers and state
let
kvt = ctx.db.ctx.getKvt()
stateNum = ctx.db.getSavedStateBlockNumber() # for persisting
var bn = upTo
while 0 < bn and kvt.hasKey(beaconHeaderKey(bn).toOpenArray):
discard kvt.del(beaconHeaderKey(bn).toOpenArray)
bn.dec
# Occasionallly persist the deleted headers. This will succeed if
# this function is called early enough after restart when there is
# no database transaction pending.
if (upTo - bn) mod 8192 == 0:
ctx.db.persistent(stateNum).isOkOr:
debug info & ": cannot persist deleted sync headers", error=($$error)
# So be it, stop here.
return
# Delete persistent state, there will be no use of it anymore
discard kvt.del(LhcStateKey.toOpenArray)
ctx.db.persistent(stateNum).isOkOr:
debug info & ": cannot persist deleted sync headers", error=($$error)
return
if bn < upTo:
debug info & ": deleted stale sync headers", iv=BnRange.new(bn+1,upTo)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -57,30 +84,32 @@ proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
# While executing blocks there are frequent save cycles. Otherwise, an
# extra save request might help to pick up an interrupted sync session.
let txLevel = ctx.db.level()
if txLevel == 0:
if ctx.db.level() == 0 and ctx.stash.len == 0:
let number = ctx.db.getSavedStateBlockNumber()
ctx.db.persistent(number).isOkOr:
debug info & ": failed to save sync state persistently", error=($$error)
return
else:
trace info & ": sync state not saved, tx pending", txLevel
return
trace info & ": saved sync state persistently"
raiseAssert info & " persistent() failed: " & $$error
proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
## Restore chain layout from persistent db
proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
## Restore chain layout from persistent db. It returns `true` if a previous
## state could be loaded, and `false` if a new state was created.
let
rc = ctx.fetchSyncStateLayout()
latest = ctx.chain.latestNumber()
# See `dbLoadSyncStateAvailable()` for comments
# If there was a manual import after a previous sync, then saved state
# might be outdated.
if rc.isOk and
# The base number is the least record of the FCU chains/tree. So the
# finalised entry must not be smaller.
ctx.chain.baseNumber() <= rc.value.final and
# If the latest FCU number is not larger than the head, there is nothing
# to do (might also happen after a manual import.)
latest < rc.value.head:
# Assign saved sync state
ctx.sst.layout = rc.value
ctx.sst.lastLayout = rc.value
# Add interval of unprocessed block range `(L,C]` from `README.md`
ctx.blocksUnprocSet(latest+1, ctx.layout.coupler)
@ -93,6 +122,8 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr,
F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr
true
else:
let
latestHash = ctx.chain.latestHash()
@ -115,10 +146,26 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
headHash: latestHash,
headLocked: false)
trace info & ": new sync state", L="C", C="D", D="F", F="H", H=latest.bnStr
ctx.sst.lastLayout = ctx.layout
if rc.isOk:
# Some stored headers might have become stale, so delete them. Even
# though it is not critical, stale headers just stay on the database
# forever occupying space without purpose. Also, delete the state record.
# After deleting headers, the state record becomes stale as well.
if rc.value.head <= latest:
# After manual import, the `latest` state might be ahead of the old
# `head` which leaves a gap `(rc.value.head,latest)` of missing headers.
# So the `deleteStaleHeadersAndState()` clean up routine needs to start
# at the `head` and work backwards.
ctx.deleteStaleHeadersAndState(rc.value.head, info)
else:
# Delete stale headers with block numbers starting at to `latest` wile
# working backwards.
ctx.deleteStaleHeadersAndState(latest, info)
false
# ------------------
proc dbStashHeaders*(
@ -139,8 +186,14 @@ proc dbStashHeaders*(
## ..
##
let
kvt = ctx.db.ctx.getKvt()
txLevel = ctx.db.level()
last = first + revBlobs.len.uint64 - 1
if 0 < txLevel:
# Need to cache it because FCU has blocked writing through to disk.
for n,data in revBlobs:
ctx.stash[last - n.uint64] = data
else:
let kvt = ctx.db.ctx.getKvt()
for n,data in revBlobs:
let key = beaconHeaderKey(last - n.uint64)
kvt.put(key.toOpenArray, data).isOkOr:
@ -148,6 +201,13 @@ proc dbStashHeaders*(
proc dbPeekHeader*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
## Retrieve some stashed header.
# Try cache first
ctx.stash.withValue(num, val):
try:
return ok(rlp.decode(val[], Header))
except RlpError:
discard
# Use persistent storage next
let
key = beaconHeaderKey(num)
rc = ctx.db.ctx.getKvt().get(key.toOpenArray)
@ -164,6 +224,9 @@ proc dbPeekParentHash*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Hash32] =
proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) =
## Remove header from temporary DB list
ctx.stash.withValue(bn, val):
ctx.stash.del bn
return
discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray)
# ------------------------------------------------------------------------------

View File

@ -43,7 +43,7 @@ proc fetchAndCheck(
# While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
# was sound, i.e. contiguous, linked, etc.
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info):
if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc):
return false
return true
@ -81,10 +81,18 @@ proc headerStagedUpdateTarget*(
let final = rc.value[0].number
if final < ctx.chain.baseNumber():
trace info & ": finalised number too low", peer,
B=ctx.chain.baseNumber.bnStr, finalised=rc.value[0].number.bnStr
B=ctx.chain.baseNumber.bnStr, finalised=final.bnStr,
delta=(ctx.chain.baseNumber - final)
ctx.target.reset
else:
ctx.target.final = final
# Activate running (unless done yet)
if ctx.hibernate:
ctx.hibernate = false
trace info & ": activated syncer", peer,
finalised=final.bnStr, head=ctx.layout.head.bnStr
# Update, so it can be followed nicely
ctx.updateMetrics()
@ -155,13 +163,17 @@ proc headersStagedCollect*(
# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
# Throw away opportunistic data (or first time header fetch.) Turn back
# unused data.
# Throw away opportunistic data (or first time header fetch.) Keep
# other data for a partially assembled list.
if isOpportunistic or nLhcHeaders == 0:
if 0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped:
buddy.only.nHdrRespErrors.inc
if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or
fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": completely failed", peer, iv, ivReq, isOpportunistic,
trace info & ": current header list discarded", peer, iv, ivReq,
isOpportunistic,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors
ctx.headersUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
@ -197,7 +209,7 @@ proc headersStagedCollect*(
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
qItem.data = lhc[]
trace info & ": staged headers", peer,
trace info & ": staged header list", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
@ -209,16 +221,15 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
## chains layout and the persistent tables. The function returns the number
## of records processed and saved.
while true:
# Fetch largest block
# Fetch list with largest block numbers
let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
trace info & ": no staged headers", error=error
break # all done
let
dangling = ctx.layout.dangling
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key)
if iv.maxPt+1 < dangling:
trace info & ": there is a gap", iv, D=dangling.bnStr, nSaved=result
trace info & ": there is a gap", iv, D=dangling.bnStr, nStashed=result
break # there is a gap -- come back later
# Overlap must not happen
@ -235,8 +246,8 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
if qItem.data.hash != ctx.layout.danglingParent:
# Discard wrong chain and merge back the range into the `unproc` list.
ctx.headersUnprocCommit(0,iv)
trace info & ": discarding staged record",
iv, D=dangling.bnStr, lap=result
trace info & ": discarding staged header list", iv, D=dangling.bnStr,
nStashed=result, nDiscarded=qItem.data.revHdrs.len
break
# Store headers on database
@ -245,10 +256,10 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
ctx.layout.danglingParent = qItem.data.parentHash
ctx.dbStoreSyncStateLayout info
result.inc # count records
result += qItem.data.revHdrs.len # count headers
trace info & ": staged header lists saved",
nStaged=ctx.hdr.staged.len, nSaved=result
trace info & ": consecutive headers stashed",
nListsLeft=ctx.hdr.staged.len, nStashed=result
if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
ctx.poolMode = true

View File

@ -12,7 +12,6 @@
import
pkg/eth/[common, p2p, rlp],
pkg/stew/byteutils,
../../../../common,
../../worker_desc
@ -25,7 +24,6 @@ proc extendLinkedHChain*(
buddy: BeaconBuddyRef;
topNumber: BlockNumber;
lhc: ref LinkedHChain; # update in place
info: static[string];
): bool =
## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place.

View File

@ -15,7 +15,11 @@
import
pkg/chronos,
pkg/eth/common,
pkg/stew/interval_set
pkg/stew/interval_set,
../../../utils/utils
export
short
func bnStr*(w: BlockNumber): string =
"#" & $w

View File

@ -57,16 +57,19 @@ when enableTicker:
reorg: ctx.pool.nReorg,
nBuddies: ctx.pool.nBuddies)
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
# Check whether there is an update running (otherwise take next upate)
if not ctx.target.locked:
# Rpc checks empty header against a zero hash rather than `emptyRoot`
if f != zeroHash32 and
ctx.layout.head < h.number and
ctx.target.consHead.number < h.number:
if not ctx.target.locked and # ignore if currently updating
ctx.target.final == 0 and # ignore if complete already
f != zeroHash32 and # finalised hash is set
ctx.layout.head < h.number and # update is advancing
ctx.target.consHead.number < h.number: # .. ditto
ctx.target.consHead = h
ctx.target.final = BlockNumber(0)
ctx.target.finalHash = f
@ -101,8 +104,13 @@ proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) =
ctx.headersUnprocInit()
ctx.blocksUnprocInit()
# Load initial state from database if there is any
ctx.dbLoadSyncStateLayout info
# Load initial state from database if there is any. If the loader returns
# `true`, then the syncer will resume from previous sync in which case the
# system becomes fully active. Otherwise there is some polling only waiting
# for a new target so there is reduced service (aka `hibernate`.).
ctx.hibernate = not ctx.dbLoadSyncStateLayout info
if ctx.hibernate:
trace info & ": hibernating", latest=ctx.chain.latestNumber.bnStr
# Set blocks batch import value for block import
if ctx.pool.nBodiesBatch < nFetchBodiesRequest:

View File

@ -13,12 +13,12 @@
import
pkg/[chronicles, chronos],
pkg/eth/[common, rlp],
pkg/stew/sorted_set,
pkg/stew/[byteutils, sorted_set],
../../../core/chain,
../worker_desc,
./update/metrics,
./headers_staged/staged_queue,
"."/[blocks_unproc, db, headers_unproc]
"."/[blocks_unproc, db, headers_unproc, helpers]
# ------------------------------------------------------------------------------
# Private functions
@ -88,7 +88,7 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
doAssert ctx.headersStagedQueueIsEmpty()
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
trace info & ": updated sync state", C=ctx.layout.coupler.bnStr,
trace info & ": updated sync state/new target", C=ctx.layout.coupler.bnStr,
uTop=ctx.headersUnprocTop(),
D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr
@ -109,10 +109,15 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
# Verify adjacent chains
if ctx.layout.couplerHash != ctx.layout.danglingParent:
# FIXME: Oops -- any better idea than to defect?
raiseAssert info & ": hashes do not match" &
" C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr
raiseAssert info & ": header chains C-D joining hashes do not match" &
" L=" & ctx.chain.latestNumber().bnStr &
" lHash=" & ctx.chain.latestHash.short &
" C=" & ctx.layout.coupler.bnStr &
" cHash=" & ctx.layout.couplerHash.short &
" D=" & $ctx.layout.dangling.bnStr &
" dParent=" & ctx.layout.danglingParent.short
trace info & ": merging adjacent chains", C=ctx.layout.coupler.bnStr,
trace info & ": merging adjacent header chains", C=ctx.layout.coupler.bnStr,
D=ctx.layout.dangling.bnStr
# Merge adjacent linked chains
@ -133,6 +138,21 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
# Update, so it can be followed nicely
ctx.updateMetrics()
proc updateTargetReached(ctx: BeaconCtxRef; info: static[string]) =
# Open up layout for update
ctx.layout.headLocked = false
# Clean up target bucket and await a new target.
ctx.target.reset
ctx.hibernate = true
let
latest {.used.} = ctx.chain.latestNumber()
head {.used.} = ctx.layout.head
trace info & ": hibernating, awaiting new sync target",
L=(if head == latest: "H" else: latest.bnStr), H=head.bnStr
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -143,13 +163,11 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
# Check whether the target has been reached. In that case, unlock the
# consensus head `H` from the current layout so that it can be updated
# in time.
if ctx.layout.headLocked:
# So we have a session
let latest= ctx.chain.latestNumber()
if ctx.layout.head <= latest:
doAssert ctx.layout.head == latest
ctx.layout.headLocked = false
if ctx.layout.headLocked and # there is an active session
ctx.layout.head <= ctx.chain.latestNumber(): # and target has been reached
# Note that `latest` might exceed the `head`. This will happen when the
# engine API got some request to execute and import subsequent blocks.
ctx.updateTargetReached info
# Check whether there is something to do regarding beacon node change
if not ctx.layout.headLocked and # there was an active import request

View File

@ -92,8 +92,8 @@ const
nFetchBodiesRequest* = 128
## Similar to `nFetchHeadersRequest`
fetchBodiesReqThresholdZombie* = chronos.seconds(2)
fetchBodiesReqThresholdCount* = 3
fetchBodiesReqThresholdZombie* = chronos.seconds(4)
fetchBodiesReqThresholdCount* = 5
## Similar to `fetchHeadersReqThreshold*`
fetchBodiesReqMinResponsePC* = 10

View File

@ -52,6 +52,16 @@ type
## Block request item sorted by least block number (i.e. from `blocks[0]`.)
blocks*: seq[EthBlock] ## List of blocks for import
KvtCache* = Table[BlockNumber,seq[byte]]
## This cache type is intended for holding block headers that cannot be
## reliably saved persistently. This is the situation after blocks are
## imported as the FCU handlers always maintain a positive transaction
## level and in some instances the current transaction is flushed and
## re-opened.
##
## The number of block headers to hold in memory after block import has
## started is the distance to the new `canonical execution head`.
# -------------------
SyncStateTarget* = object
@ -133,8 +143,9 @@ type
# Blocks import/execution settings for importing with
# `nBodiesBatch` blocks in each round (minimum value is
# `nFetchBodiesRequest`.)
chain*: ForkedChainRef ## Database
importRunningOk*: bool ## Advisory lock, fetch vs. import
chain*: ForkedChainRef ## Core database, FCU support
stash*: KvtCache ## Temporary header and state table
blockImportOk*: bool ## Don't fetch data while block importing
nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault`
blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault`
@ -179,10 +190,30 @@ func chain*(ctx: BeaconCtxRef): ForkedChainRef =
## Getter
ctx.pool.chain
func stash*(ctx: BeaconCtxRef): var KvtCache =
## Getter
ctx.pool.stash
func db*(ctx: BeaconCtxRef): CoreDbRef =
## Getter
ctx.pool.chain.db
# -----
func hibernate*(ctx: BeaconCtxRef): bool =
## Getter, re-interpretation of the daemon flag for reduced service mode
# No need for running the daemon with reduced service mode. So it is
# convenient to use this flag for indicating this.
not ctx.daemon
proc `hibernate=`*(ctx: BeaconCtxRef; val: bool) =
## Setter
ctx.daemon = not val
# Control some error messages on the scheduler (e.g. zombie/banned-peer
# reconnection attempts, LRU flushing out oldest peer etc.)
ctx.noisyLog = not val
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -38,6 +38,7 @@ type
CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.)
noisyLog*: bool ## Hold back `trace` and `debug` msgs if `false`
poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job
pool*: S ## Shared context for all worker peers

View File

@ -341,12 +341,12 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
now = Moment.now()
ttz = zombie.value.zombified + zombieTimeToLinger
if ttz < Moment.now():
trace "Reconnecting zombie peer ignored", peer,
if dsc.ctx.noisyLog: trace "Reconnecting zombie peer ignored", peer,
nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz)
return
# Zombie can be removed from the database
dsc.buddies.del peer.key
trace "Zombie peer timeout, ready for requeing", peer,
if dsc.ctx.noisyLog: trace "Zombie peer timeout, ready for requeing", peer,
nPeers, nWorkers=dsc.buddies.len, maxWorkers
# Initialise worker for this peer
@ -357,7 +357,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
ctrl: BuddyCtrlRef(),
peer: peer))
if not buddy.worker.runStart():
trace "Ignoring useless peer", peer, nPeers,
if dsc.ctx.noisyLog: trace "Ignoring useless peer", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers
buddy.worker.ctrl.zombie = true
return
@ -373,7 +373,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
leastVal = dsc.buddies.shift.value # unqueue first/least item
oldest = leastVal.data.worker
if oldest.isNil:
trace "Dequeuing zombie peer",
if dsc.ctx.noisyLog: trace "Dequeuing zombie peer",
# Fake `Peer` pretty print for `oldest`
oldest=("Node[" & $leastVal.key.address & "]"),
since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len,
@ -382,8 +382,8 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
else:
# This could happen if there are idle entries in the table, i.e.
# somehow hanging runners.
trace "Peer table full! Dequeuing least used entry", oldest,
nPeers, nWorkers=dsc.buddies.len, maxWorkers
if dsc.ctx.noisyLog: trace "Peer table full! Dequeuing least used entry",
oldest, nPeers, nWorkers=dsc.buddies.len, maxWorkers
# Setting to `zombie` will trigger the worker to terminate (if any.)
oldest.ctrl.zombie = true
@ -400,12 +400,12 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
nWorkers = dsc.buddies.len
rc = dsc.buddies.eq peer.key
if rc.isErr:
debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers
discard
if dsc.ctx.noisyLog: debug "Disconnected, unregistered peer", peer,
nPeers, nWorkers, maxWorkers
elif rc.value.worker.isNil:
# Re-visiting zombie
trace "Ignore zombie", peer, nPeers, nWorkers, maxWorkers
discard
if dsc.ctx.noisyLog: trace "Ignore zombie", peer,
nPeers, nWorkers, maxWorkers
elif rc.value.worker.ctrl.zombie:
# Don't disconnect, leave them fall out of the LRU cache. The effect is,
# that reconnecting might be blocked, for a while. For few peers cases,
@ -414,7 +414,8 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
rc.value.worker = nil
rc.value.dsc = nil
rc.value.zombified = Moment.now()
trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers
if dsc.ctx.noisyLog: trace "Disconnected, zombie", peer,
nPeers, nWorkers, maxWorkers
else:
rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere
dsc.buddies.del peer.key