diff --git a/nimbus/core/chain/forked_chain.nim b/nimbus/core/chain/forked_chain.nim
index 184cf99c2..bf0b9588c 100644
--- a/nimbus/core/chain/forked_chain.nim
+++ b/nimbus/core/chain/forked_chain.nim
@@ -19,11 +19,12 @@ import
   ../../evm/state,
   ../validate,
   ../executor/process_block,
-  ./forked_chain/chain_desc
+  ./forked_chain/[chain_desc, chain_kvt]
 
 export
   BlockDesc,
   ForkedChainRef,
+  chain_kvt,
   common,
   core_db
 
diff --git a/nimbus/core/chain/forked_chain/chain_kvt.nim b/nimbus/core/chain/forked_chain/chain_kvt.nim
new file mode 100644
index 000000000..07c4e2fa4
--- /dev/null
+++ b/nimbus/core/chain/forked_chain/chain_kvt.nim
@@ -0,0 +1,72 @@
+# Nimbus
+# Copyright (c) 2024 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or distributed except
+# according to those terms.
+
+## Persistent kvt that ideally bypasses the `FC` logic. This can be used as a
+## cache where in-memory storage would be too much of a burden (e.g. all
+## `mainnet` headers.)
+##
+## Currently, it is not always possible to store persistently. Fortunately
+## (for the syncer application) it works for some time at the beginning, when
+## the `FC` module is initialised and no block operation has been performed yet.
+
+{.push raises: [].}
+
+import
+  pkg/results,
+  ../../../common,
+  ../../../db/core_db,
+  ./chain_desc
+
+proc fcKvtAvailable*(c: ForkedChainRef): bool =
+  ## Returns `true` if `kvt` data can be saved persistently.
+  c.db.txFrameLevel() == 0
+
+proc fcKvtPersistent*(c: ForkedChainRef): bool =
+  ## Save cached `kvt` data if possible. This function has the side effect
+  ## that it saves all cached db data including `Aristo` data (although there
+  ## should not be any.)
+  ##
+  if c.fcKvtAvailable():
+    c.db.persistent(c.db.getSavedStateBlockNumber()).isOkOr:
+      raiseAssert "fcKvtPersistent: persistent() failed: " & $$error
+    return true
+
+proc fcKvtHasKey*(c: ForkedChainRef, key: openArray[byte]): bool =
+  ## Check whether the argument `key` exists on the `kvt` table (i.e. `get()`
+  ## would succeed.)
+  ##
+  c.db.ctx.getKvt().hasKey(key)
+
+proc fcKvtGet*(c: ForkedChainRef, key: openArray[byte]): Opt[seq[byte]] =
+  ## Fetch data entry from `kvt` table.
+  ##
+  var w = c.db.ctx.getKvt().get(key).valueOr:
+    return err()
+  ok(move w)
+
+proc fcKvtPut*(c: ForkedChainRef, key, data: openArray[byte]): bool =
+  ## Cache data on the `kvt` table marked for saving persistently. If the `kvt`
+  ## table is unavailable, this function does nothing and returns `false`.
+  ##
+  if c.fcKvtAvailable():
+    c.db.ctx.getKvt().put(key, data).isOkOr:
+      raiseAssert "fcKvtPut: put() failed: " & $$error
+    return true
+
+proc fcKvtDel*(c: ForkedChainRef, key: openArray[byte]): bool =
+  ## Cache key for deletion on the `kvt` table. If the `kvt` table is
+  ## unavailable, this function does nothing and returns `false`.
+  ##
+  if c.fcKvtAvailable():
+    c.db.ctx.getKvt().del(key).isOkOr:
+      raiseAssert "fcKvtDel: del() failed: " & $$error
+    return true
+
+# End
diff --git a/nimbus/db/aristo/aristo_api.nim b/nimbus/db/aristo/aristo_api.nim
index d2490263c..2809d2e60 100644
--- a/nimbus/db/aristo/aristo_api.nim
+++ b/nimbus/db/aristo/aristo_api.nim
@@ -450,7 +450,7 @@ when AutoValidateApiHooks:
 
     doAssert not api.hasStorageData.isNil
     doAssert not api.isTop.isNil
-    doAssert not api.level.isNil
+    doAssert not api.txFrameLevel.isNil
 
     doAssert not api.mergeAccountRecord.isNil
     doAssert not api.mergeStorageData.isNil
diff --git a/nimbus/db/storage_types.nim b/nimbus/db/storage_types.nim
index 392048c67..e6eb41145 100644
--- a/nimbus/db/storage_types.nim
+++ b/nimbus/db/storage_types.nim
@@ -24,8 +24,7 @@ type
     dataDirId = 7
     safeHash = 8
     finalizedHash = 9
-    beaconState = 10
-    beaconHeader = 11
+    beaconHeader = 10
 
   DbKey* = object
     # The first byte stores the key type. The rest are key-specific values
@@ -87,11 +86,6 @@ func hashIndexKey*(hash: Hash32, index: uint16): HashIndexKey =
   result[32] = byte(index and 0xFF)
   result[33] = byte((index shr 8) and 0xFF)
 
-func beaconStateKey*(u: uint8): DbKey =
-  result.data[0] = byte ord(beaconState)
-  result.data[1] = u
-  result.dataEndPos = 1
-
 func beaconHeaderKey*(u: BlockNumber): DbKey =
   result.data[0] = byte ord(beaconHeader)
   doAssert sizeof(u) <= 32
diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim
index 510ca7ad8..d18eef58f 100644
--- a/nimbus/sync/beacon/worker/blocks_staged.nim
+++ b/nimbus/sync/beacon/worker/blocks_staged.nim
@@ -104,7 +104,11 @@ proc fetchAndCheck(
       blk.blocks[offset + n].uncles = bodies[n].uncles
       blk.blocks[offset + n].withdrawals = bodies[n].withdrawals
-  return offset < blk.blocks.len.uint64
+  if offset < blk.blocks.len.uint64:
+    return true
+
+  buddy.only.nBdyProcErrors.inc
+  return false
 
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
@@ -195,13 +199,11 @@
 
       # Fetch and extend staging record
       if not await buddy.fetchAndCheck(ivReq, blk, info):
+        haveError = true
 
         # Throw away first time block fetch data. Keep other data for a
        # partially assembled list.
         if nBlkBlocks == 0:
-          buddy.only.nBdyProcErrors.inc
-          haveError = true
-
           if ((0 < buddy.only.nBdyRespErrors or
                0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or
               fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or
@@ -288,7 +290,7 @@ proc blocksStagedImport*(
     nBlocks = qItem.data.blocks.len
     iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)
 
-  trace info & ": import blocks ..", iv, nBlocks,
+  debug info & ": import blocks ..", iv, nBlocks,
     B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr
 
   var maxImport = iv.maxPt
@@ -317,6 +319,9 @@
         maxImport = ctx.chain.latestNumber()
         break importLoop
 
+      # Update, so it can be followed nicely
+      ctx.updateMetrics()
+
       # Allow pseudo/async thread switch.
       try: await sleepAsync asyncThreadSwitchTimeSlot
       except CancelledError: discard
@@ -325,9 +330,6 @@
         maxImport = ctx.chain.latestNumber()
         break importLoop
 
-      # Update, so it can be followed nicely
-      ctx.updateMetrics()
-
       # Occasionally mark the chain finalized
       if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
         let
@@ -363,7 +365,7 @@
   # Update, so it can be followed nicely
   ctx.updateMetrics()
 
-  trace info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr,
+  debug info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr,
     L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr
 
   return true
diff --git a/nimbus/sync/beacon/worker/db.nim b/nimbus/sync/beacon/worker/db.nim
index 60dfff07d..c165b1421 100644
--- a/nimbus/sync/beacon/worker/db.nim
+++ b/nimbus/sync/beacon/worker/db.nim
@@ -19,15 +19,19 @@ import
   ../worker_desc,
   ./headers_unproc
 
-const
-  LhcStateKey = 1.beaconStateKey
+let
+  LhcStateKey = 0.beaconHeaderKey
 
 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------
 
+template kvtNotAvailable(info: static[string]): string =
+  info & ": kvt table not available (locked by FC module)"
+
+
 proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] =
-  let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr:
+  let data = ctx.pool.chain.fcKvtGet(LhcStateKey.toOpenArray).valueOr:
     return err()
   try:
     return ok(rlp.decode(data, SyncStateLayout))
@@ -42,29 +46,25 @@ proc deleteStaleHeadersAndState(
     info: static[string];
   ) =
   ## Delete stale headers and state
-  let
-    kvt = ctx.db.ctx.getKvt()
-    stateNum = ctx.db.getSavedStateBlockNumber() # for persisting
+  let c = ctx.pool.chain
+  if not c.fcKvtAvailable():
+    trace kvtNotAvailable(info)
+    return
 
   var bn = upTo
-  while 0 < bn and kvt.hasKey(beaconHeaderKey(bn).toOpenArray):
-    discard kvt.del(beaconHeaderKey(bn).toOpenArray)
+  while 0 < bn and c.fcKvtHasKey(beaconHeaderKey(bn).toOpenArray):
+    discard c.fcKvtDel(beaconHeaderKey(bn).toOpenArray)
     bn.dec
 
-    # Occasionallly persist the deleted headers. This will succeed if
-    # this function is called early enough after restart when there is
-    # no database transaction pending.
+    # Occasionally persist the deleted headers (so that the internal DB cache
+    # does not grow too large.) This will succeed if this function is called
+    # early enough after restart when there is no database transaction pending.
     if (upTo - bn) mod 8192 == 0:
-      ctx.db.persistent(stateNum).isOkOr:
-        debug info & ": cannot persist deleted sync headers", error=($$error)
-        # So be it, stop here.
-        return
+      discard c.fcKvtPersistent()
 
-  # Delete persistent state, there will be no use of it anymore
-  discard kvt.del(LhcStateKey.toOpenArray)
-  ctx.db.persistent(stateNum).isOkOr:
-    debug info & ": cannot persist deleted sync headers", error=($$error)
-    return
+  # Delete the persistent state record; it will not be needed anymore
+  discard c.fcKvtDel(LhcStateKey.toOpenArray)
+  discard c.fcKvtPersistent()
 
   if bn < upTo:
     debug info & ": deleted stale sync headers", iv=BnRange.new(bn+1,upTo)
@@ -75,17 +75,12 @@
 
 proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
   ## Save chain layout to persistent db
-  let data = rlp.encode(ctx.layout)
-  ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr:
-    raiseAssert info & " put() failed: " & $$error
-
-  # While executing blocks there are frequent save cycles. Otherwise, an
-  # extra save request might help to pick up an interrupted sync session.
-  if ctx.db.txFrameLevel() == 0 and ctx.stash.len == 0:
-    let number = ctx.db.getSavedStateBlockNumber()
-    ctx.db.persistent(number).isOkOr:
-      raiseAssert info & " persistent() failed: " & $$error
-
+  let c = ctx.pool.chain
+  if c.fcKvtAvailable():
+    discard c.fcKvtPut(LhcStateKey.toOpenArray, rlp.encode(ctx.layout))
+    discard c.fcKvtPersistent()
+  else:
+    trace kvtNotAvailable(info)
 
 proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
   ## Restore chain layout from persistent db. It returns `true` if a previous
@@ -165,18 +160,16 @@ proc dbHeadersStash*(
   ##   ..
   ##
   let
-    txFrameLevel = ctx.db.txFrameLevel()
+    c = ctx.pool.chain
     last = first + revBlobs.len.uint64 - 1
-  if 0 < txFrameLevel:
+  if not c.fcKvtAvailable():
     # Need to cache it because FCU has blocked writing through to disk.
     for n,data in revBlobs:
       ctx.stash[last - n.uint64] = data
   else:
-    let kvt = ctx.db.ctx.getKvt()
     for n,data in revBlobs:
       let key = beaconHeaderKey(last - n.uint64)
-      kvt.put(key.toOpenArray, data).isOkOr:
-        raiseAssert info & ": put() failed: " & $$error
+      discard c.fcKvtPut(key.toOpenArray, data)
 
 proc dbHeaderPeek*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
   ## Retrieve some stashed header.
@@ -189,7 +182,7 @@ proc dbHeaderPeek*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
   # Use persistent storage next
   let
     key = beaconHeaderKey(num)
-    rc = ctx.db.ctx.getKvt().get(key.toOpenArray)
+    rc = ctx.pool.chain.fcKvtGet(key.toOpenArray)
   if rc.isOk:
     try:
       return ok(rlp.decode(rc.value, Header))
@@ -206,7 +199,7 @@
   ctx.stash.withValue(bn, _):
     ctx.stash.del bn
     return
-  discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray)
+  discard ctx.pool.chain.fcKvtDel(beaconHeaderKey(bn).toOpenArray)
 
 # ------------------------------------------------------------------------------
 # End
diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim
index 7fb934cfb..775db4a6b 100644
--- a/nimbus/sync/beacon/worker/headers_staged.nim
+++ b/nimbus/sync/beacon/worker/headers_staged.nim
@@ -44,6 +44,7 @@ proc fetchAndCheck(
   # While assembling a `LinkedHChainRef`, verify that the `revHeaders` list
   # was sound, i.e. contiguous, linked, etc.
   if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc):
+    buddy.only.nHdrProcErrors.inc
     return false
 
   return true
@@ -150,13 +151,11 @@
 
       # Fetch and extend chain record
      if not await buddy.fetchAndCheck(ivReq, lhc, info):
+        haveError = true
 
         # Throw away opportunistic data (or first time header fetch.) Keep
         # other data for a partially assembled list.
         if isOpportunistic or nLhcHeaders == 0:
-          buddy.only.nHdrProcErrors.inc
-          haveError = true
-
           if ((0 < buddy.only.nHdrRespErrors or
                0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or
               fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or
@@ -252,7 +251,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
 
     result += qItem.data.revHdrs.len # count headers
 
-    trace info & ": stashed consecutive headers",
+    debug info & ": stashed consecutive headers",
       nListsLeft=ctx.hdr.staged.len, nStashed=result
 
     if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
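
# ----------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the patch): how a caller such
# as the beacon syncer can drive the chain_kvt API introduced above. The
# `storeHeader` helper and the `stash` fallback table are hypothetical names;
# `fcKvtAvailable`, `fcKvtPut`, `fcKvtPersistent` and `beaconHeaderKey` are
# the procs from the diff.
# ----------------------------------------------------------------------------

import
  std/tables,
  ./nimbus/core/chain/forked_chain,    # re-exports chain_kvt (first hunk above)
  ./nimbus/db/storage_types

var stash: Table[BlockNumber, seq[byte]]  # in-memory fallback, cf. `ctx.stash`

proc storeHeader(c: ForkedChainRef; bn: BlockNumber; data: seq[byte]) =
  ## Write through to the `kvt` table when possible, cache in memory otherwise.
  if c.fcKvtAvailable():
    # No transaction frame is open (txFrameLevel() == 0), so the entry can be
    # put on the `kvt` table and flushed to disk right away.
    discard c.fcKvtPut(beaconHeaderKey(bn).toOpenArray, data)
    discard c.fcKvtPersistent()
  else:
    # The `FC` module has blocked write-through. Keep the entry in memory
    # until the table becomes available again (mirrors `dbHeadersStash`.)
    stash[bn] = data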
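
# A second small sketch, same assumptions as above: the `DbKey` scheme from
# storage_types.nim stores the key-type tag in the first byte, so after the
# renumbering a freshly built `beaconHeader` key starts with byte 10 (it was
# 11 before this patch).
let k = beaconHeaderKey(123'u64)       # 123 is an arbitrary block number
doAssert k.toOpenArray[0] == byte 10   # == byte ord(beaconHeader)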