diff --git a/nimbus/sync/beacon/TODO.md b/nimbus/sync/beacon/TODO.md index 4c8326445..600ee7f31 100644 --- a/nimbus/sync/beacon/TODO.md +++ b/nimbus/sync/beacon/TODO.md @@ -1 +1,35 @@ +## General TODO items + * Update/resolve code fragments which are tagged FIXME + +## Open issues + +### 1. Weird behaviour of the RPC/engine API + +See issue [#2816](https://github.com/status-im/nimbus-eth1/issues/2816) + +### 2. Some assert + + Error: unhandled exception: key not found: 0x441a0f..027bc96a [AssertionDefect] + +which happened on several `holesky` tests immediately after logging something like + + NTC 2024-10-31 21:37:34.728 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=044d22843cbe baseNumber=2646764 baseHash=21ec11c1deac + +or from another machine with literally the same exception text (but the stack-trace differs) + + NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b + + +### 3. Some assert + +Seen on `holesky`, sometimes the header chain cannot be joined with its +lower end after completing due to different hashes, leading to an assert failure + + Error: unhandled exception: header chains C-D joining hashes do not match L=#2646126 lHash=3bc2beb1b565 C=#2646126 cHash=3bc2beb1b565 D=#2646127 dParent=671c7c6cb904 + +which was preceded somewhat earlier by log entries + + INF 2024-10-31 18:21:16.464 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=3bc2beb1b565 + [..] + INF 2024-10-31 18:21:25.872 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=671c7c6cb904 diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim index 9832363d3..a4c5aec1a 100644 --- a/nimbus/sync/beacon/worker.nim +++ b/nimbus/sync/beacon/worker.nim @@ -34,12 +34,14 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool = proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} = ## When idle, save cpu cycles waiting for something to do. - if buddy.ctx.pool.importRunningOk or - not (buddy.headersToFetchOk() or + if buddy.ctx.pool.blockImportOk or # currently importing blocks + buddy.ctx.hibernate or # not activated yet?
+ not (buddy.headersToFetchOk() or # something on TODO list buddy.bodiesToFetchOk()): await sleepAsync workerIdleWaitInterval return true - return false + else: + return false # ------------------------------------------------------------------------------ # Public start/stop and admin functions @@ -54,9 +56,6 @@ proc setup*(ctx: BeaconCtxRef; info: static[string]): bool = # Debugging stuff, might be an empty template ctx.setupTicker() - - # Enable background daemon - ctx.daemon = true true proc release*(ctx: BeaconCtxRef; info: static[string]) = @@ -70,20 +69,20 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool = let peer = buddy.peer if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies: - debug info & ": peers limit reached", peer + if not buddy.ctx.hibernate: debug info & ": peers limit reached", peer return false if not buddy.startBuddy(): - debug info & ": failed", peer + if not buddy.ctx.hibernate: debug info & ": failed", peer return false - debug info & ": new peer", peer + if not buddy.ctx.hibernate: debug info & ": new peer", peer true proc stop*(buddy: BeaconBuddyRef; info: static[string]) = ## Clean up this peer - debug info & ": release peer", peer=buddy.peer, - nInvocations=buddy.only.nMultiLoop, + if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer, + ctrl=buddy.ctrl.state, nInvocations=buddy.only.nMultiLoop, lastIdleGap=buddy.only.multiRunIdle.toStr buddy.stopBuddy() @@ -98,8 +97,14 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} = ## as `true` not before there is some activity on the `runPool()`, ## `runSingle()`, or `runMulti()` functions. ## + ## On a fresh start, the flag `ctx.daemon` will not be set `true` before the + ## first usable request from the CL (via RPC) stumbles in. + ## # Check for a possible header layout and body request changes ctx.updateSyncStateLayout info + if ctx.hibernate: + return + ctx.updateBlockRequests info # Execute staged block records. @@ -110,8 +115,8 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} = # place. So there might be some peers active. If they are waiting for # a message reply, this will most probably time out as all processing # power is usurped by the import task here. - ctx.pool.importRunningOk = true - defer: ctx.pool.importRunningOk = false + ctx.pool.blockImportOk = true + defer: ctx.pool.blockImportOk = false # Import from staged queue. 
while await ctx.blocksStagedImport(info): diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index 1fc8c01bb..79d3abb21 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -18,7 +18,14 @@ import ../worker_desc, ./blocks_staged/bodies, ./update/metrics, - "."/[blocks_unproc, db] + "."/[blocks_unproc, db, helpers] + +# ------------------------------------------------------------------------------ +# Private debugging & logging helpers +# ------------------------------------------------------------------------------ + +formatIt(Hash32): + it.data.short # ------------------------------------------------------------------------------ # Private functions @@ -50,11 +57,16 @@ proc fetchAndCheck( blk.blocks.setLen(offset + ivReq.len) var blockHash = newSeq[Hash32](ivReq.len) for n in 1u ..< ivReq.len: - let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header" + let header = ctx.dbPeekHeader(ivReq.minPt + n).valueOr: + # There is nothing one can do here + raiseAssert info & " stashed header missing: n=" & $n & + " ivReq=" & $ivReq & " nth=" & (ivReq.minPt + n).bnStr blockHash[n - 1] = header.parentHash blk.blocks[offset + n].header = header - blk.blocks[offset].header = - ctx.dbPeekHeader(ivReq.minPt).expect "stashed header" + blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).valueOr: + # There is nothing one can do here + raiseAssert info & " stashed header missing: n=0" & + " ivReq=" & $ivReq & " nth=" & ivReq.minPt.bnStr blockHash[ivReq.len - 1] = rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccak256 @@ -177,11 +189,17 @@ proc blocksStagedCollect*( # Fetch and extend staging record if not await buddy.fetchAndCheck(ivReq, blk, info): + + # Throw away first time block fetch data. Keep other data for a + # partially assembled list. if nBlkBlocks == 0: - if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped: + buddy.only.nBdyRespErrors.inc + + if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or + fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true - trace info & ": list completely failed", peer, iv, ivReq, + trace info & ": current block list discarded", peer, iv, ivReq, ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors ctx.blocksUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try @@ -238,7 +256,8 @@ proc blocksStagedImport*( let imported = ctx.chain.latestNumber() if qItem.key != imported + 1: trace info & ": there is a gap L vs. staged", - B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr + B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr, + C=ctx.layout.coupler.bnStr doAssert imported < qItem.key return false @@ -253,45 +272,49 @@ proc blocksStagedImport*( B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr var maxImport = iv.maxPt - for n in 0 ..< nBlocks: - let nBn = qItem.data.blocks[n].header.number - ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: - warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr, - L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error - # Restore what is left over below - maxImport = ctx.chain.latestNumber() - break - - # Allow pseudo/async thread switch. - await sleepAsync asyncThreadSwitchTimeSlot - if not ctx.daemon: - # Shutdown? 
- maxImport = ctx.chain.latestNumber() - break - - # Update, so it can be followed nicely - ctx.updateMetrics() - - # Occasionally mark the chain finalized - if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: - let - nthHash = qItem.data.getNthHash(n) - finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash - - doAssert nBn == ctx.chain.latestNumber() - ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr: - warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr, - L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash, - finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error + block importLoop: + for n in 0 ..< nBlocks: + let nBn = qItem.data.blocks[n].header.number + ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: + warn info & ": import block error", n, iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() - break + break importLoop # Allow pseudo/async thread switch. await sleepAsync asyncThreadSwitchTimeSlot if not ctx.daemon: + # Shutdown? maxImport = ctx.chain.latestNumber() - break + break importLoop + + # Update, so it can be followed nicely + ctx.updateMetrics() + + # Occasionally mark the chain finalized + if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: + let + nthHash = qItem.data.getNthHash(n) + finHash = if nBn < ctx.layout.final: nthHash + else: ctx.layout.finalHash + + doAssert nBn == ctx.chain.latestNumber() + ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr: + warn info & ": fork choice error", n, iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash, + finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error + # Restore what is left over below + maxImport = ctx.chain.latestNumber() + break importLoop + + # Allow pseudo/async thread switch. 
+ await sleepAsync asyncThreadSwitchTimeSlot + if not ctx.daemon: + maxImport = ctx.chain.latestNumber() + break importLoop # Import probably incomplete, so a partial roll back may be needed if maxImport < iv.maxPt: diff --git a/nimbus/sync/beacon/worker/db.nim b/nimbus/sync/beacon/worker/db.nim index 143ca418b..d8ae4c2f0 100644 --- a/nimbus/sync/beacon/worker/db.nim +++ b/nimbus/sync/beacon/worker/db.nim @@ -13,7 +13,7 @@ import pkg/[chronicles, chronos], pkg/eth/[common, rlp], - pkg/stew/[byteutils, interval_set, sorted_set], + pkg/stew/[interval_set, sorted_set], pkg/results, "../../.."/[common, core/chain, db/storage_types], ../worker_desc, @@ -22,13 +22,6 @@ import const LhcStateKey = 1.beaconStateKey -# ------------------------------------------------------------------------------ -# Private debugging & logging helpers -# ------------------------------------------------------------------------------ - -formatIt(Hash32): - it.data.toHex - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -42,6 +35,40 @@ proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] = discard err() + +proc deleteStaleHeadersAndState( + ctx: BeaconCtxRef; + upTo: BlockNumber; + info: static[string]; + ) = + ## Delete stale headers and state + let + kvt = ctx.db.ctx.getKvt() + stateNum = ctx.db.getSavedStateBlockNumber() # for persisting + + var bn = upTo + while 0 < bn and kvt.hasKey(beaconHeaderKey(bn).toOpenArray): + discard kvt.del(beaconHeaderKey(bn).toOpenArray) + bn.dec + + # Occasionally persist the deleted headers. This will succeed if + # this function is called early enough after restart when there is + # no database transaction pending. + if (upTo - bn) mod 8192 == 0: + ctx.db.persistent(stateNum).isOkOr: + debug info & ": cannot persist deleted sync headers", error=($$error) + # So be it, stop here. + return + + # Delete persistent state, there will be no use of it anymore + discard kvt.del(LhcStateKey.toOpenArray) + ctx.db.persistent(stateNum).isOkOr: + debug info & ": cannot persist deleted sync headers", error=($$error) + return + + if bn < upTo: + debug info & ": deleted stale sync headers", iv=BnRange.new(bn+1,upTo) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -57,30 +84,32 @@ proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = # While executing blocks there are frequent save cycles. Otherwise, an # extra save request might help to pick up an interrupted sync session. - let txLevel = ctx.db.level() - if txLevel == 0: + if ctx.db.level() == 0 and ctx.stash.len == 0: let number = ctx.db.getSavedStateBlockNumber() ctx.db.persistent(number).isOkOr: - debug info & ": failed to save sync state persistently", error=($$error) - return - else: - trace info & ": sync state not saved, tx pending", txLevel - return - - trace info & ": saved sync state persistently" + raiseAssert info & " persistent() failed: " & $$error -proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = - ## Restore chain layout from persistent db +proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool = + ## Restore chain layout from persistent db. It returns `true` if a previous + ## state could be loaded, and `false` if a new state was created.
let rc = ctx.fetchSyncStateLayout() latest = ctx.chain.latestNumber() - # See `dbLoadSyncStateAvailable()` for comments + # If there was a manual import after a previous sync, then saved state + # might be outdated. if rc.isOk and + # The base number is the least record of the FCU chains/tree. So the + # finalised entry must not be smaller. ctx.chain.baseNumber() <= rc.value.final and + # If the latest FCU number is not larger than the head, there is nothing + # to do (might also happen after a manual import.) latest < rc.value.head: + + # Assign saved sync state ctx.sst.layout = rc.value + ctx.sst.lastLayout = rc.value # Add interval of unprocessed block range `(L,C]` from `README.md` ctx.blocksUnprocSet(latest+1, ctx.layout.coupler) @@ -93,6 +122,8 @@ C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr, F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr + true + else: let latestHash = ctx.chain.latestHash() @@ -115,9 +146,25 @@ headHash: latestHash, headLocked: false) - trace info & ": new sync state", L="C", C="D", D="F", F="H", H=latest.bnStr + ctx.sst.lastLayout = ctx.layout - ctx.sst.lastLayout = ctx.layout + if rc.isOk: + # Some stored headers might have become stale, so delete them. Even + # though it is not critical, stale headers just stay in the database + # forever occupying space without purpose. Also, delete the state record. + # After deleting headers, the state record becomes stale as well. + if rc.value.head <= latest: + # After manual import, the `latest` state might be ahead of the old + # `head` which leaves a gap `(rc.value.head,latest)` of missing headers. + # So the `deleteStaleHeadersAndState()` clean up routine needs to start + # at the `head` and work backwards. + ctx.deleteStaleHeadersAndState(rc.value.head, info) + else: + # Delete stale headers with block numbers starting at `latest` while + # working backwards. + ctx.deleteStaleHeadersAndState(latest, info) + + false # ------------------ @@ -139,15 +186,28 @@ proc dbStashHeaders*( ## .. ## let - kvt = ctx.db.ctx.getKvt() + txLevel = ctx.db.level() last = first + revBlobs.len.uint64 - 1 - for n,data in revBlobs: - let key = beaconHeaderKey(last - n.uint64) - kvt.put(key.toOpenArray, data).isOkOr: - raiseAssert info & ": put() failed: " & $$error + if 0 < txLevel: + # Need to cache it because FCU has blocked writing through to disk. + for n,data in revBlobs: + ctx.stash[last - n.uint64] = data + else: + let kvt = ctx.db.ctx.getKvt() + for n,data in revBlobs: + let key = beaconHeaderKey(last - n.uint64) + kvt.put(key.toOpenArray, data).isOkOr: + raiseAssert info & ": put() failed: " & $$error proc dbPeekHeader*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] = ## Retrieve some stashed header.
+ # Try cache first + ctx.stash.withValue(num, val): + try: + return ok(rlp.decode(val[], Header)) + except RlpError: + discard + # Use persistent storage next let key = beaconHeaderKey(num) rc = ctx.db.ctx.getKvt().get(key.toOpenArray) @@ -164,6 +224,9 @@ proc dbPeekParentHash*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Hash32] = proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) = ## Remove header from temporary DB list + ctx.stash.withValue(bn, val): + ctx.stash.del bn + return discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index 83b850047..5c79169bc 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -43,7 +43,7 @@ proc fetchAndCheck( # While assembling a `LinkedHChainRef`, verify that the `revHeaders` list # was sound, i.e. contiguous, linked, etc. - if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info): + if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc): return false return true @@ -81,10 +81,18 @@ proc headerStagedUpdateTarget*( let final = rc.value[0].number if final < ctx.chain.baseNumber(): trace info & ": finalised number too low", peer, - B=ctx.chain.baseNumber.bnStr, finalised=rc.value[0].number.bnStr + B=ctx.chain.baseNumber.bnStr, finalised=final.bnStr, + delta=(ctx.chain.baseNumber - final) + ctx.target.reset else: ctx.target.final = final + # Activate running (unless already done) + if ctx.hibernate: + ctx.hibernate = false + trace info & ": activated syncer", peer, + finalised=final.bnStr, head=ctx.layout.head.bnStr + # Update, so it can be followed nicely ctx.updateMetrics() @@ -155,13 +163,17 @@ proc headersStagedCollect*( # Fetch and extend chain record if not await buddy.fetchAndCheck(ivReq, lhc, info): - # Throw away opportunistic data (or first time header fetch.) Turn back - # unused data. + # Throw away opportunistic data (or first time header fetch.) Keep + # other data for a partially assembled list. if isOpportunistic or nLhcHeaders == 0: - if 0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped: + buddy.only.nHdrRespErrors.inc + + if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or + fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true - trace info & ": completely failed", peer, iv, ivReq, isOpportunistic, + trace info & ": current header list discarded", peer, iv, ivReq, + isOpportunistic, ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors ctx.headersUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try @@ -197,7 +209,7 @@ raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = lhc[] - trace info & ": staged headers", peer, + trace info & ": staged header list", peer, topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state @@ -209,16 +221,15 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ## chains layout and the persistent tables. The function returns the number ## of records processed and saved.
while true: - # Fetch largest block + # Fetch list with largest block numbers let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: - trace info & ": no staged headers", error=error break # all done let dangling = ctx.layout.dangling iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key) if iv.maxPt+1 < dangling: - trace info & ": there is a gap", iv, D=dangling.bnStr, nSaved=result + trace info & ": there is a gap", iv, D=dangling.bnStr, nStashed=result break # there is a gap -- come back later # Overlap must not happen @@ -235,8 +246,8 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = if qItem.data.hash != ctx.layout.danglingParent: # Discard wrong chain and merge back the range into the `unproc` list. ctx.headersUnprocCommit(0,iv) - trace info & ": discarding staged record", - iv, D=dangling.bnStr, lap=result + trace info & ": discarding staged header list", iv, D=dangling.bnStr, + nStashed=result, nDiscarded=qItem.data.revHdrs.len break # Store headers on database @@ -245,10 +256,10 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ctx.layout.danglingParent = qItem.data.parentHash ctx.dbStoreSyncStateLayout info - result.inc # count records + result += qItem.data.revHdrs.len # count headers - trace info & ": staged header lists saved", - nStaged=ctx.hdr.staged.len, nSaved=result + trace info & ": consecutive headers stashed", + nListsLeft=ctx.hdr.staged.len, nStashed=result if headersStagedQueueLengthLwm < ctx.hdr.staged.len: ctx.poolMode = true diff --git a/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim b/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim index 2edcb5f94..1116abf20 100644 --- a/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim +++ b/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim @@ -12,7 +12,6 @@ import pkg/eth/[common, p2p, rlp], - pkg/stew/byteutils, ../../../../common, ../../worker_desc @@ -25,7 +24,6 @@ proc extendLinkedHChain*( buddy: BeaconBuddyRef; topNumber: BlockNumber; lhc: ref LinkedHChain; # update in place - info: static[string]; ): bool = ## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place. diff --git a/nimbus/sync/beacon/worker/helpers.nim b/nimbus/sync/beacon/worker/helpers.nim index 2ed702866..59b5e53e5 100644 --- a/nimbus/sync/beacon/worker/helpers.nim +++ b/nimbus/sync/beacon/worker/helpers.nim @@ -15,7 +15,11 @@ import pkg/chronos, pkg/eth/common, - pkg/stew/interval_set + pkg/stew/interval_set, + ../../../utils/utils + +export + short func bnStr*(w: BlockNumber): string = "#" & $w diff --git a/nimbus/sync/beacon/worker/start_stop.nim b/nimbus/sync/beacon/worker/start_stop.nim index 16dc878c8..9bda33677 100644 --- a/nimbus/sync/beacon/worker/start_stop.nim +++ b/nimbus/sync/beacon/worker/start_stop.nim @@ -57,16 +57,19 @@ when enableTicker: reorg: ctx.pool.nReorg, nBuddies: ctx.pool.nBuddies) + proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. 
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} = + # Check whether there is an update running (otherwise take next update) - if not ctx.target.locked: - # Rpc checks empty header against a zero hash rather than `emptyRoot` - if f != zeroHash32 and - ctx.layout.head < h.number and - ctx.target.consHead.number < h.number: + if not ctx.target.locked and # ignore if currently updating + ctx.target.final == 0 and # ignore if complete already + f != zeroHash32 and # finalised hash is set + ctx.layout.head < h.number and # update is advancing + ctx.target.consHead.number < h.number: # .. ditto + ctx.target.consHead = h ctx.target.final = BlockNumber(0) ctx.target.finalHash = f @@ -101,8 +104,13 @@ proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) = ctx.headersUnprocInit() ctx.blocksUnprocInit() - # Load initial state from database if there is any - ctx.dbLoadSyncStateLayout info + # Load initial state from database if there is any. If the loader returns + # `true`, then the syncer will resume from previous sync in which case the + # system becomes fully active. Otherwise there is some polling only waiting + # for a new target, so there is reduced service (aka `hibernate`). + ctx.hibernate = not ctx.dbLoadSyncStateLayout info + if ctx.hibernate: + trace info & ": hibernating", latest=ctx.chain.latestNumber.bnStr # Set blocks batch import value for block import if ctx.pool.nBodiesBatch < nFetchBodiesRequest: diff --git a/nimbus/sync/beacon/worker/update.nim b/nimbus/sync/beacon/worker/update.nim index 080509708..2ae6b4fa0 100644 --- a/nimbus/sync/beacon/worker/update.nim +++ b/nimbus/sync/beacon/worker/update.nim @@ -13,12 +13,12 @@ import pkg/[chronicles, chronos], pkg/eth/[common, rlp], - pkg/stew/sorted_set, + pkg/stew/[byteutils, sorted_set], ../../../core/chain, ../worker_desc, ./update/metrics, ./headers_staged/staged_queue, - "."/[blocks_unproc, db, headers_unproc] + "."/[blocks_unproc, db, headers_unproc, helpers] # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -88,7 +88,7 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = doAssert ctx.headersStagedQueueIsEmpty() ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1) - trace info & ": updated sync state", C=ctx.layout.coupler.bnStr, + trace info & ": updated sync state/new target", C=ctx.layout.coupler.bnStr, uTop=ctx.headersUnprocTop(), D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr @@ -109,10 +109,15 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = # Verify adjacent chains if ctx.layout.couplerHash != ctx.layout.danglingParent: # FIXME: Oops -- any better idea than to defect?
- raiseAssert info & ": hashes do not match" & - " C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr + raiseAssert info & ": header chains C-D joining hashes do not match" & + " L=" & ctx.chain.latestNumber().bnStr & + " lHash=" & ctx.chain.latestHash.short & + " C=" & ctx.layout.coupler.bnStr & + " cHash=" & ctx.layout.couplerHash.short & + " D=" & $ctx.layout.dangling.bnStr & + " dParent=" & ctx.layout.danglingParent.short - trace info & ": merging adjacent chains", C=ctx.layout.coupler.bnStr, + trace info & ": merging adjacent header chains", C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr # Merge adjacent linked chains @@ -133,6 +138,21 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = # Update, so it can be followed nicely ctx.updateMetrics() + +proc updateTargetReached(ctx: BeaconCtxRef; info: static[string]) = + # Open up layout for update + ctx.layout.headLocked = false + + # Clean up target bucket and await a new target. + ctx.target.reset + ctx.hibernate = true + + let + latest {.used.} = ctx.chain.latestNumber() + head {.used.} = ctx.layout.head + trace info & ": hibernating, awaiting new sync target", + L=(if head == latest: "H" else: latest.bnStr), H=head.bnStr + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -143,13 +163,11 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = # Check whether the target has been reached. In that case, unlock the # consensus head `H` from the current layout so that it can be updated # in time. - if ctx.layout.headLocked: - # So we have a session - let latest= ctx.chain.latestNumber() - if ctx.layout.head <= latest: - doAssert ctx.layout.head == latest - ctx.layout.headLocked = false - + if ctx.layout.headLocked and # there is an active session + ctx.layout.head <= ctx.chain.latestNumber(): # and target has been reached + # Note that `latest` might exceed the `head`. This will happen when the + # engine API got some request to execute and import subsequent blocks. + ctx.updateTargetReached info # Check whether there is something to do regarding beacon node change if not ctx.layout.headLocked and # there was an active import request diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index cd1aa80f7..f41029e65 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -92,8 +92,8 @@ const nFetchBodiesRequest* = 128 ## Similar to `nFetchHeadersRequest` - fetchBodiesReqThresholdZombie* = chronos.seconds(2) - fetchBodiesReqThresholdCount* = 3 + fetchBodiesReqThresholdZombie* = chronos.seconds(4) + fetchBodiesReqThresholdCount* = 5 ## Similar to `fetchHeadersReqThreshold*` fetchBodiesReqMinResponsePC* = 10 diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index 8251f3149..ca55ae1ef 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -52,6 +52,16 @@ type ## Block request item sorted by least block number (i.e. from `blocks[0]`.) blocks*: seq[EthBlock] ## List of blocks for import + KvtCache* = Table[BlockNumber,seq[byte]] + ## This cache type is intended for holding block headers that cannot be + ## reliably saved persistently. 
This is the situation after blocks are + ## imported as the FCU handlers always maintain a positive transaction + ## level and in some instances the current transaction is flushed and + ## re-opened. + ## + ## The number of block headers to hold in memory after block import has + ## started is the distance to the new `canonical execution head`. + # ------------------- SyncStateTarget* = object @@ -133,8 +143,9 @@ type # Blocks import/execution settings for importing with # `nBodiesBatch` blocks in each round (minimum value is # `nFetchBodiesRequest`.) - chain*: ForkedChainRef ## Database - importRunningOk*: bool ## Advisory lock, fetch vs. import + chain*: ForkedChainRef ## Core database, FCU support + stash*: KvtCache ## Temporary header and state table + blockImportOk*: bool ## Don't fetch data while block importing nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault` blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault` @@ -179,10 +190,30 @@ func chain*(ctx: BeaconCtxRef): ForkedChainRef = ## Getter ctx.pool.chain +func stash*(ctx: BeaconCtxRef): var KvtCache = + ## Getter + ctx.pool.stash + func db*(ctx: BeaconCtxRef): CoreDbRef = ## Getter ctx.pool.chain.db +# ----- + +func hibernate*(ctx: BeaconCtxRef): bool = + ## Getter, re-interpretation of the daemon flag for reduced service mode + # No need for running the daemon with reduced service mode. So it is + # convenient to use this flag for indicating this. + not ctx.daemon + +proc `hibernate=`*(ctx: BeaconCtxRef; val: bool) = + ## Setter + ctx.daemon = not val + + # Control some error messages on the scheduler (e.g. zombie/banned-peer + # reconnection attempts, LRU flushing out oldest peer etc.) + ctx.noisyLog = not val + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 236d795f0..c1c179b3d 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -38,6 +38,7 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) 
+ noisyLog*: bool ## Hold back `trace` and `debug` msgs if `false` poolMode*: bool ## Activate `runPool()` workers if set `true` daemon*: bool ## Enable global background job pool*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index d6a0011ea..41cb7c326 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -341,12 +341,12 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = now = Moment.now() ttz = zombie.value.zombified + zombieTimeToLinger if ttz < Moment.now(): - trace "Reconnecting zombie peer ignored", peer, + if dsc.ctx.noisyLog: trace "Reconnecting zombie peer ignored", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz) return # Zombie can be removed from the database dsc.buddies.del peer.key - trace "Zombie peer timeout, ready for requeing", peer, + if dsc.ctx.noisyLog: trace "Zombie peer timeout, ready for requeing", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers # Initialise worker for this peer @@ -357,7 +357,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = ctrl: BuddyCtrlRef(), peer: peer)) if not buddy.worker.runStart(): - trace "Ignoring useless peer", peer, nPeers, + if dsc.ctx.noisyLog: trace "Ignoring useless peer", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers buddy.worker.ctrl.zombie = true return @@ -373,7 +373,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = leastVal = dsc.buddies.shift.value # unqueue first/least item oldest = leastVal.data.worker if oldest.isNil: - trace "Dequeuing zombie peer", + if dsc.ctx.noisyLog: trace "Dequeuing zombie peer", # Fake `Peer` pretty print for `oldest` oldest=("Node[" & $leastVal.key.address & "]"), since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len, @@ -382,8 +382,8 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = else: # This could happen if there are idle entries in the table, i.e. # somehow hanging runners. - trace "Peer table full! Dequeuing least used entry", oldest, - nPeers, nWorkers=dsc.buddies.len, maxWorkers + if dsc.ctx.noisyLog: trace "Peer table full! Dequeuing least used entry", + oldest, nPeers, nWorkers=dsc.buddies.len, maxWorkers # Setting to `zombie` will trigger the worker to terminate (if any.) oldest.ctrl.zombie = true @@ -400,12 +400,12 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = nWorkers = dsc.buddies.len rc = dsc.buddies.eq peer.key if rc.isErr: - debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers - discard + if dsc.ctx.noisyLog: debug "Disconnected, unregistered peer", peer, + nPeers, nWorkers, maxWorkers elif rc.value.worker.isNil: # Re-visiting zombie - trace "Ignore zombie", peer, nPeers, nWorkers, maxWorkers - discard + if dsc.ctx.noisyLog: trace "Ignore zombie", peer, + nPeers, nWorkers, maxWorkers elif rc.value.worker.ctrl.zombie: # Don't disconnect, leave them fall out of the LRU cache. The effect is, # that reconnecting might be blocked, for a while. For few peers cases, @@ -414,7 +414,8 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = rc.value.worker = nil rc.value.dsc = nil rc.value.zombified = Moment.now() - trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers + if dsc.ctx.noisyLog: trace "Disconnected, zombie", peer, + nPeers, nWorkers, maxWorkers else: rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere dsc.buddies.del peer.key
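Note on the new header stash: the `stash` table added to `worker_desc.nim` and wired into `db.nim` amounts to a two-level header lookup. While the FCU handlers keep a database transaction open, freshly stashed headers are parked in memory, and reads fall back to the persistent KVT table only on a cache miss. Below is a minimal, self-contained sketch of that lookup order; `HeaderStore`, `stashHeader`, and `peekHeader` are hypothetical stand-ins with simplified types, not the actual `CoreDbRef`/KVT API used by the syncer.

```nim
import std/[options, tables]

type
  BlockNumber = uint64
  HeaderBlob = seq[byte]                    # RLP encoded header in the real code

  HeaderStore = object
    stash: Table[BlockNumber, HeaderBlob]   # volatile cache while a tx is open
    kvt: Table[BlockNumber, HeaderBlob]     # stand-in for the persistent KVT table
    txLevel: int                            # stand-in for `ctx.db.level()`

proc stashHeader(hs: var HeaderStore; bn: BlockNumber; blob: HeaderBlob) =
  ## Mirrors the `dbStashHeaders()` decision: cache while a transaction is
  ## pending, otherwise write through to the persistent table.
  if 0 < hs.txLevel:
    hs.stash[bn] = blob
  else:
    hs.kvt[bn] = blob

proc peekHeader(hs: HeaderStore; bn: BlockNumber): Option[HeaderBlob] =
  ## Cache first, persistent storage next (same order as `dbPeekHeader()`.)
  if bn in hs.stash:
    some(hs.stash[bn])
  elif bn in hs.kvt:
    some(hs.kvt[bn])
  else:
    none(HeaderBlob)

when isMainModule:
  var hs = HeaderStore(
    stash: initTable[BlockNumber, HeaderBlob](),
    kvt: initTable[BlockNumber, HeaderBlob](),
    txLevel: 1)
  hs.stashHeader(100u64, @[byte 1, 2, 3])   # lands in the volatile stash
  hs.txLevel = 0
  hs.stashHeader(99u64, @[byte 4, 5])       # written through to the "KVT"
  doAssert hs.peekHeader(100u64).isSome     # served from the stash
  doAssert hs.peekHeader(99u64).isSome      # served from the persistent table
  doAssert hs.peekHeader(42u64).isNone      # unknown block number
```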