From 143f2e99f52ca37beeb6103053320c9767ad8d50 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Fri, 3 May 2024 17:38:17 +0000 Subject: [PATCH] Core db+aristo fixes and tx handling updates (#2164) * Aristo: Rename journal related sources and functions why: Previously, the naming was hinged on the phrases `fifo`, `filter` etc. which reflect the inner workings of cascaded filters. This was unfortunate for reading/understanding the source code for actions where the focus is the journal as a whole. * Aristo: Fix buffer overflow (path length truncating error) * Aristo: Tighten `hikeUp()` stop check, update error code why: Detect dangling vertex links. These are legit with `snap` sync processing but not with regular processing. * Aristo: Raise assert in regular mode `merge()` at a dangling link/edge why: With `snap` sync processing, partial trees are ok and can be amended. Not so in regular mode. Previously there was only a debug message when a non-legit dangling edge was encountered. * Aristo: Make sure that vertices are copied before modification why: Otherwise vertices from lower layers might also be modified * Aristo: Fix relaxed mode for validity checker `check()` * Remove cruft * Aristo: Update API for transaction handling details: + Split `aristo_tx.nim` into sub-modules + Split `forkWith()` into `findTx()` + `forkTx()` + Removed `forkTop()`, `forkBase()` (now superseded by new `forkTx()`) * CoreDb+Aristo: Fix initialiser (missing methods) --- nimbus/db/aristo/aristo_api.nim | 168 ++++---- nimbus/db/aristo/aristo_check/check_be.nim | 23 +- .../db/aristo/aristo_check/check_journal.nim | 2 +- nimbus/db/aristo/aristo_debug.nim | 2 +- nimbus/db/aristo/aristo_delete.nim | 4 - nimbus/db/aristo/aristo_desc/desc_error.nim | 5 +- .../db/aristo/aristo_desc/desc_structural.nim | 10 +- nimbus/db/aristo/aristo_hike.nim | 17 +- .../{aristo_filter.nim => aristo_journal.nim} | 37 +- .../filter_merge.nim | 0 .../filter_reverse.nim | 0 .../filter_siblings.nim | 0 .../aristo_journal/filter_state_root.nim | 77 ++++ .../journal_get.nim} | 83 +--- .../journal_ops.nim} | 46 +- .../journal_scheduler.nim} | 97 ++--- nimbus/db/aristo/aristo_merge.nim | 55 +-- nimbus/db/aristo/aristo_path.nim | 2 +- nimbus/db/aristo/aristo_tx.nim | 402 ++++-------------- nimbus/db/aristo/aristo_tx/tx_fork.nim | 152 +++++++ nimbus/db/aristo/aristo_tx/tx_frame.nim | 209 +++++++++ nimbus/db/aristo/aristo_tx/tx_stow.nim | 91 ++++ .../backend/aristo_db/aristo_replicate.nim | 2 +- .../backend/aristo_db/handlers_aristo.nim | 17 +- .../backend/aristo_db/handlers_kvt.nim | 2 +- nimbus/sync/full/worker.nim | 2 +- tests/test_aristo/test_filter.nim | 20 +- tests/test_aristo/test_helpers.nim | 2 +- tests/test_aristo/test_misc.nim | 10 +- 29 files changed, 919 insertions(+), 618 deletions(-) rename nimbus/db/aristo/{aristo_filter.nim => aristo_journal.nim} (90%) rename nimbus/db/aristo/{aristo_filter => aristo_journal}/filter_merge.nim (100%) rename nimbus/db/aristo/{aristo_filter => aristo_journal}/filter_reverse.nim (100%) rename nimbus/db/aristo/{aristo_filter => aristo_journal}/filter_siblings.nim (100%) create mode 100644 nimbus/db/aristo/aristo_journal/filter_state_root.nim rename nimbus/db/aristo/{aristo_filter/filter_helpers.nim => aristo_journal/journal_get.nim} (57%) rename nimbus/db/aristo/{aristo_filter/filter_fifos.nim => aristo_journal/journal_ops.nim} (83%) rename nimbus/db/aristo/{aristo_filter/filter_scheduler.nim => aristo_journal/journal_scheduler.nim} (89%) create mode 100644 nimbus/db/aristo/aristo_tx/tx_fork.nim create mode 100644 nimbus/db/aristo/aristo_tx/tx_frame.nim create mode 100644 nimbus/db/aristo/aristo_tx/tx_stow.nim diff --git a/nimbus/db/aristo/aristo_api.nim b/nimbus/db/aristo/aristo_api.nim index 0d9dbebbd..8ff106eb0 100644 --- a/nimbus/db/aristo/aristo_api.nim +++ b/nimbus/db/aristo/aristo_api.nim @@ -17,8 +17,8 @@ import eth/[common, trie/nibbles], results, ./aristo_desc/desc_backend, - ./aristo_filter/filter_helpers, ./aristo_init/memory_db, + ./aristo_journal/journal_get, "."/[aristo_delete, aristo_desc, aristo_fetch, aristo_get, aristo_hashify, aristo_hike, aristo_init, aristo_merge, aristo_path, aristo_profile, aristo_serialise, aristo_tx, aristo_vid] @@ -95,6 +95,28 @@ type ## Cascaded attempt to traverse the `Aristo Trie` and fetch the value ## of a leaf vertex. This function is complementary to `mergePayload()`. + AristoApiFindTxFn* = + proc(db: AristoDbRef; + vid: VertexID; + key: HashKey; + ): Result[int,AristoError] + {.noRaise.} + ## Find the transaction where the vertex with ID `vid` exists and has + ## the Merkle hash key `key`. If there is no transaction available, + ## search in the filter and then in the backend. + ## + ## If the above procedure succeeds, an integer indicating the transaction + ## level is returned: + ## + ## * `0` -- top level, current layer + ## * `1`,`2`,`..` -- some transaction level further down the stack + ## * `-1` -- the filter between transaction stack and database backend + ## * `-2` -- the databse backend + ## + ## A successful return code might be used for the `forkTx()` call for + ## creating a forked descriptor that provides the pair `(vid,key)`. + ## + AristoApiFinishFn* = proc(db: AristoDbRef; flush = false; @@ -119,52 +141,35 @@ type ## A non centre descriptor should always be destructed after use (see ## also# comments on `fork()`.) - AristoApiForkTopFn* = + AristoApiForkTxFn* = proc(db: AristoDbRef; + backLevel: int; dontHashify = false; ): Result[AristoDbRef,AristoError] {.noRaise.} - ## Clone a descriptor in a way so that there is exactly one active - ## transaction. - ## - ## If the arguent flag `dontHashify` is passed `true`, the clone - ## descriptor will *NOT* be hashified right after construction. - ## - ## Use `aristo_desc.forget()` to clean up this descriptor. - - AristoApiForkWithFn* = - proc(db: AristoDbRef; - vid: VertexID; - key: HashKey; - dontHashify = false; - ): Result[AristoDbRef,AristoError] - {.noRaise.} - ## Find the transaction where the vertex with ID `vid` exists and has - ## the Merkle hash key `key`. If there is no transaction available, - ## search in the filter and then in the backend. - ## - ## If the above procedure succeeds, a new descriptor is forked with - ## exactly one transaction which contains the all the bottom layers up - ## until the layer where the `(vid,key)` pair is found. In case the - ## pair was found on the filter or the backend, this transaction is - ## empty. - - AristoApiGetFromJournalFn* = - proc(be: BackendRef; - fid: Option[FilterID]; - earlierOK = false; - ): Result[FilterIndexPair,AristoError] - {.noRaise.} - ## For a positive argument `fid`, find the filter on the journal with ID - ## not larger than `fid` (i e. the resulting filter might be older.) - ## - ## If the argument `earlierOK` is passed `false`, the function succeeds - ## only if the filter ID of the returned filter is equal to the argument - ## `fid`. - ## - ## In case that the argument `fid` is zera (i.e. `FilterID(0)`), the - ## filter with the smallest filter ID (i.e. the oldest filter) is - ## returned. In that case, the argument `earlierOK` is ignored. + ## Fork a new descriptor obtained from parts of the argument database + ## as described by arguments `db` and `backLevel`. + ## + ## If the argument `backLevel` is non-negative, the forked descriptor + ## will provide the database view where the first `backLevel` transaction + ## layers are stripped and the remaing layers are squashed into a single + ## transaction. + ## + ## If `backLevel` is `-1`, a database descriptor with empty transaction + ## layers will be provided where the `roFilter` between database and + ## transaction layers are kept in place. + ## + ## If `backLevel` is `-2`, a database descriptor with empty transaction + ## layers will be provided without an `roFilter`. + ## + ## The returned database descriptor will always have transaction level one. + ## If there were no transactions that could be squashed, an empty + ## transaction is added. + ## + ## If the arguent flag `dontHashify` is passed `true`, the forked descriptor + ## will *NOT* be hashified right after construction. + ## + ## Use `aristo_desc.forget()` to clean up this descriptor. AristoApiGetKeyRcFn* = proc(db: AristoDbRef; @@ -207,6 +212,23 @@ type ## Getter, returns `true` if the argument `tx` referes to the current ## top level transaction. + AristoApiJournalGetInxFn* = + proc(be: BackendRef; + fid: Option[FilterID]; + earlierOK = false; + ): Result[JournalInx,AristoError] + {.noRaise.} + ## For a positive argument `fid`, find the filter on the journal with ID + ## not larger than `fid` (i e. the resulting filter might be older.) + ## + ## If the argument `earlierOK` is passed `false`, the function succeeds + ## only if the filter ID of the returned filter is equal to the argument + ## `fid`. + ## + ## In case that the argument `fid` is zera (i.e. `FilterID(0)`), the + ## filter with the smallest filter ID (i.e. the oldest filter) is + ## returned. In that case, the argument `earlierOK` is ignored. + AristoApiLevelFn* = proc(db: AristoDbRef; ): int @@ -371,16 +393,16 @@ type delete*: AristoApiDeleteFn delTree*: AristoApiDelTreeFn fetchPayload*: AristoApiFetchPayloadFn + findTx*: AristoApiFindTxFn finish*: AristoApiFinishFn forget*: AristoApiForgetFn - forkTop*: AristoApiForkTopFn - forkWith*: AristoApiForkWithFn - getFromJournal*: AristoApiGetFromJournalFn + forkTx*: AristoApiForkTxFn getKeyRc*: AristoApiGetKeyRcFn hashify*: AristoApiHashifyFn hasPath*: AristoApiHasPathFn hikeUp*: AristoApiHikeUpFn isTop*: AristoApiIsTopFn + journalGetInx*: AristoApiJournalGetInxFn level*: AristoApiLevelFn nForked*: AristoApiNForkedFn merge*: AristoApiMergeFn @@ -404,16 +426,16 @@ type AristoApiProfDeleteFn = "delete" AristoApiProfDelTreeFn = "delTree" AristoApiProfFetchPayloadFn = "fetchPayload" + AristoApiProfFindTxFn = "findTx" AristoApiProfFinishFn = "finish" AristoApiProfForgetFn = "forget" - AristoApiProfForkTopFn = "forkTop" - AristoApiProfForkWithFn = "forkWith" - AristoApiProfGetFromJournalFn = "getFromJournal" + AristoApiProfForkTxFn = "forkTx" AristoApiProfGetKeyRcFn = "getKeyRc" AristoApiProfHashifyFn = "hashify" AristoApiProfHasPathFn = "hasPath" AristoApiProfHikeUpFn = "hikeUp" AristoApiProfIsTopFn = "isTop" + AristoApiProfJournalGetInxFn = "journalGetInx" AristoApiProfLevelFn = "level" AristoApiProfNForkedFn = "nForked" AristoApiProfMergeFn = "merge" @@ -455,16 +477,16 @@ when AutoValidateApiHooks: doAssert not api.delete.isNil doAssert not api.delTree.isNil doAssert not api.fetchPayload.isNil + doAssert not api.findTx.isNil doAssert not api.finish.isNil doAssert not api.forget.isNil - doAssert not api.forkTop.isNil - doAssert not api.forkWith.isNil - doAssert not api.getFromJournal.isNil + doAssert not api.forkTx.isNil doAssert not api.getKeyRc.isNil doAssert not api.hashify.isNil doAssert not api.hasPath.isNil doAssert not api.hikeUp.isNil doAssert not api.isTop.isNil + doAssert not api.journalGetInx.isNil doAssert not api.level.isNil doAssert not api.nForked.isNil doAssert not api.merge.isNil @@ -508,16 +530,16 @@ func init*(api: var AristoApiObj) = api.delete = delete api.delTree = delTree api.fetchPayload = fetchPayload + api.findTx = findTx api.finish = finish api.forget = forget - api.forkTop = forkTop - api.forkWith = forkWith - api.getFromJournal = getFromJournal + api.forkTx = forkTx api.getKeyRc = getKeyRc api.hashify = hashify api.hasPath = hasPath api.hikeUp = hikeUp - api.isTop = isTop + api.isTop = isTop + api.journalGetInx = journalGetInx api.level = level api.nForked = nForked api.merge = merge @@ -544,16 +566,16 @@ func dup*(api: AristoApiRef): AristoApiRef = delete: api.delete, delTree: api.delTree, fetchPayload: api.fetchPayload, + findTx: api.findTx, finish: api.finish, forget: api.forget, - forkTop: api.forkTop, - forkWith: api.forkWith, - getFromJournal: api.getFromJournal, + forkTx: api.forkTx, getKeyRc: api.getKeyRc, hashify: api.hashify, hasPath: api.hasPath, hikeUp: api.hikeUp, isTop: api.isTop, + journalGetInx: api.journalGetInx, level: api.level, nForked: api.nForked, merge: api.merge, @@ -617,6 +639,11 @@ func init*( AristoApiProfFetchPayloadFn.profileRunner: result = api.fetchPayload(a, b, c) + profApi.findTx = + proc(a: AristoDbRef; b: VertexID; c: HashKey): auto = + AristoApiProfFindTxFn.profileRunner: + result = api.findTx(a, b, c) + profApi.finish = proc(a: AristoDbRef; b = false) = AristoApiProfFinishFn.profileRunner: @@ -627,20 +654,10 @@ func init*( AristoApiProfForgetFn.profileRunner: result = api.forget(a) - profApi.forkTop = - proc(a: AristoDbRef; b = false): auto = - AristoApiProfForkTopFn.profileRunner: - result = api.forkTop(a, b) - - profApi.forkWith = - proc(a: AristoDbRef; b: VertexID; c: HashKey; d = false): auto = - AristoApiProfForkWithFn.profileRunner: - result = api.forkWith(a, b, c, d) - - profApi.getFromJournal = - proc(a: BackendRef; b: Option[FilterID]; c = false): auto = - AristoApiProfGetFromJournalFn.profileRunner: - result = api.getFromJournal(a, b, c) + profApi.forkTx = + proc(a: AristoDbRef; b: int; c = false): auto = + AristoApiProfForkTxFn.profileRunner: + result = api.forkTx(a, b, c) profApi.getKeyRc = proc(a: AristoDbRef; b: VertexID): auto = @@ -667,6 +684,11 @@ func init*( AristoApiProfIsTopFn.profileRunner: result = api.isTop(a) + profApi.journalGetInx = + proc(a: BackendRef; b: Option[FilterID]; c = false): auto = + AristoApiProfJournalGetInxFn.profileRunner: + result = api.journalGetInx(a, b, c) + profApi.level = proc(a: AristoDbRef): auto = AristoApiProfLevelFn.profileRunner: diff --git a/nimbus/db/aristo/aristo_check/check_be.nim b/nimbus/db/aristo/aristo_check/check_be.nim index da00ad1a7..7a9e30dc2 100644 --- a/nimbus/db/aristo/aristo_check/check_be.nim +++ b/nimbus/db/aristo/aristo_check/check_be.nim @@ -160,20 +160,31 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef]( # Check top layer cache against backend if cache: - if 0 < db.dirty.len: - return err((VertexID(0),CheckBeCacheIsDirty)) + let checkKeysOk = block: + if db.dirty.len == 0: + true + elif relax: + false + else: + return err((VertexID(0),CheckBeCacheIsDirty)) # Check structural table for (vid,vtx) in db.layersWalkVtx: - let key = db.layersGetKey(vid).valueOr: - # A `kMap[]` entry must exist. - return err((vid,CheckBeCacheKeyMissing)) + let key = block: + let rc = db.layersGetKey(vid) + if rc.isOk: + rc.value + elif checkKeysOk: + # A `kMap[]` entry must exist. + return err((vid,CheckBeCacheKeyMissing)) + else: + VOID_HASH_KEY if vtx.isValid: # Register existing vid against backend generator state discard vids.reduce Interval[VertexID,uint64].new(vid,vid) else: # Some vertex is to be deleted, the key must be empty - if key.isValid: + if checkKeysOk and key.isValid: return err((vid,CheckBeCacheKeyNonEmpty)) # There must be a representation on the backend DB unless in a TX if db.getVtxBE(vid).isErr and db.stack.len == 0: diff --git a/nimbus/db/aristo/aristo_check/check_journal.nim b/nimbus/db/aristo/aristo_check/check_journal.nim index 86c9bbdc5..5ae352eb1 100644 --- a/nimbus/db/aristo/aristo_check/check_journal.nim +++ b/nimbus/db/aristo/aristo_check/check_journal.nim @@ -14,7 +14,7 @@ import std/[algorithm, sequtils, sets, tables], eth/common, results, - ../aristo_filter/filter_scheduler, + ../aristo_journal/journal_scheduler, ../aristo_walk/persistent, ".."/[aristo_desc, aristo_blobify] diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 84c1d2935..47110810b 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -17,7 +17,7 @@ import stew/[byteutils, interval_set], ./aristo_desc/desc_backend, ./aristo_init/[memory_db, memory_only, rocks_db], - ./aristo_filter/filter_scheduler, + ./aristo_journal/journal_scheduler, "."/[aristo_constants, aristo_desc, aristo_hike, aristo_layers] # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_delete.nim b/nimbus/db/aristo/aristo_delete.nim index 75cb9cd3a..90b47ce31 100644 --- a/nimbus/db/aristo/aristo_delete.nim +++ b/nimbus/db/aristo/aristo_delete.nim @@ -17,15 +17,11 @@ import std/[sets, typetraits], - chronicles, eth/[common, trie/nibbles], results, "."/[aristo_desc, aristo_get, aristo_hike, aristo_layers, aristo_path, aristo_utils, aristo_vid] -logScope: - topics = "aristo-delete" - type SaveToVaeVidFn = proc(err: AristoError): (VertexID,AristoError) {.gcsafe, raises: [].} diff --git a/nimbus/db/aristo/aristo_desc/desc_error.nim b/nimbus/db/aristo/aristo_desc/desc_error.nim index 9ee1cf65d..2949e8c11 100644 --- a/nimbus/db/aristo/aristo_desc/desc_error.nim +++ b/nimbus/db/aristo/aristo_desc/desc_error.nim @@ -68,7 +68,9 @@ type # Path function `hikeUp()` HikeBranchMissingEdge HikeBranchTailEmpty + HikeDanglingEdge HikeEmptyPath + HikeExtMissingEdge HikeExtTailEmpty HikeExtTailMismatch HikeLeafUnexpected @@ -261,7 +263,8 @@ type TxArgStaleTx TxArgsUseless TxBackendNotWritable - TxGarbledSpan + TxLevelTooDeep + TxLevelUseless TxNoPendingTx TxNotFound TxNotTopTx diff --git a/nimbus/db/aristo/aristo_desc/desc_structural.nim b/nimbus/db/aristo/aristo_desc/desc_structural.nim index 8e38d5b0e..ff8ce1b5e 100644 --- a/nimbus/db/aristo/aristo_desc/desc_structural.nim +++ b/nimbus/db/aristo/aristo_desc/desc_structural.nim @@ -128,11 +128,6 @@ type final*: LayerFinalRef ## Stored as latest version txUid*: uint ## Transaction identifier if positive - FilterIndexPair* = object - ## Helper structure for fetching fiters from journal. - inx*: int ## Non negative journal index. latest=`0` - fil*: FilterRef ## Valid filter - # ---------------------- QidLayoutRef* = ref object @@ -151,6 +146,11 @@ type ctx*: QidLayoutRef ## Organisation of the FIFO state*: seq[(QueueID,QueueID)] ## Current fill state + JournalInx* = tuple + ## Helper structure for fetching fiters from the journal. + inx: int ## Non negative journal index. latest=`0` + fil: FilterRef ## Valid filter + const DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64) diff --git a/nimbus/db/aristo/aristo_hike.nim b/nimbus/db/aristo/aristo_hike.nim index cfe2cc35d..ef4b08603 100644 --- a/nimbus/db/aristo/aristo_hike.nim +++ b/nimbus/db/aristo/aristo_hike.nim @@ -94,7 +94,7 @@ proc hikeUp*( return err((VertexID(0),HikeEmptyPath,hike)) var vid = root - while vid.isValid: + while true: var leg = Leg(wp: VidVtxPair(vid: vid), nibble: -1) # Fetch next vertex @@ -103,19 +103,25 @@ proc hikeUp*( return err((vid,error,hike)) if hike.legs.len == 0: return err((vid,HikeNoLegs,hike)) - break + # The vertex ID `vid` was a follow up from a parent vertex, but there is + # no child vertex on the database. So `vid` is a dangling link which is + # allowed only if there is a partial trie (e.g. with `snap` sync.) + return err((vid,HikeDanglingEdge,hike)) case leg.wp.vtx.vType: of Leaf: + # This must be the last vertex, so there cannot be any `tail` left. if hike.tail.len == hike.tail.sharedPrefixLen(leg.wp.vtx.lPfx): # Bingo, got full path hike.legs.add leg hike.tail = EmptyNibbleSeq + # This is the only loop exit break return err((vid,HikeLeafUnexpected,hike)) of Branch: + # There must be some more data (aka `tail`) after a `Branch` vertex. if hike.tail.len == 0: hike.legs.add leg return err((vid,HikeBranchTailEmpty,hike)) @@ -133,6 +139,7 @@ proc hikeUp*( vid = nextVid of Extension: + # There must be some more data (aka `tail`) after an `Extension` vertex. if hike.tail.len == 0: hike.legs.add leg hike.tail = EmptyNibbleSeq @@ -141,9 +148,13 @@ proc hikeUp*( if leg.wp.vtx.ePfx.len != hike.tail.sharedPrefixLen(leg.wp.vtx.ePfx): return err((vid,HikeExtTailMismatch,hike)) # Need to branch from here + let nextVid = leg.wp.vtx.eVid + if not nextVid.isValid: + return err((vid,HikeExtMissingEdge,hike)) + hike.legs.add leg hike.tail = hike.tail.slice(leg.wp.vtx.ePfx.len) - vid = leg.wp.vtx.eVid + vid = nextVid ok hike diff --git a/nimbus/db/aristo/aristo_filter.nim b/nimbus/db/aristo/aristo_journal.nim similarity index 90% rename from nimbus/db/aristo/aristo_filter.nim rename to nimbus/db/aristo/aristo_journal.nim index 9c2966a67..a5259757e 100644 --- a/nimbus/db/aristo/aristo_filter.nim +++ b/nimbus/db/aristo/aristo_journal.nim @@ -8,8 +8,8 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -## Aristo DB -- Patricia Trie filter management -## ============================================= +## Aristo DB -- Filter and journal management +## ========================================== ## import @@ -18,14 +18,15 @@ import results, "."/[aristo_desc, aristo_get, aristo_vid], ./aristo_desc/desc_backend, - ./aristo_filter/[ - filter_fifos, filter_helpers, filter_merge, filter_reverse, filter_siblings] + ./aristo_journal/[ + filter_state_root, filter_merge, filter_reverse, filter_siblings, + journal_get, journal_ops] # ------------------------------------------------------------------------------ # Public functions, construct filters # ------------------------------------------------------------------------------ -proc fwdFilter*( +proc journalFwdFilter*( db: AristoDbRef; # Database layer: LayerRef; # Layer to derive filter from chunkedMpt = false; # Relax for snap/proof scenario @@ -64,7 +65,7 @@ proc fwdFilter*( # Public functions, apply/install filters # ------------------------------------------------------------------------------ -proc merge*( +proc journalMerge*( db: AristoDbRef; # Database filter: FilterRef; # Filter to apply to database ): Result[void,(VertexID,AristoError)] = @@ -93,12 +94,12 @@ proc merge*( ok() -proc canResolveBackendFilter*(db: AristoDbRef): bool = +proc journalUpdateOk*(db: AristoDbRef): bool = ## Check whether the read-only filter can be merged into the backend not db.backend.isNil and db.isCentre -proc resolveBackendFilter*( +proc journalUpdate*( db: AristoDbRef; # Database nxtFid = none(FilterID); # Next filter ID (if any) reCentreOk = false; @@ -143,16 +144,16 @@ proc resolveBackendFilter*( defer: updateSiblings.rollback() # Figure out how to save the reverse filter on a cascades slots queue - var instr: FifoInstr + var instr: JournalOpsMod if not be.journal.isNil: # Otherwise ignore block getInstr: # Compile instruction for updating filters on the cascaded fifos if db.roFilter.isValid: - let ovLap = be.getJournalOverlap db.roFilter + let ovLap = be.journalGetOverlap db.roFilter if 0 < ovLap: - instr = ? be.fifosDelete ovLap # Revert redundant entries + instr = ? be.journalOpsDeleteSlots ovLap # Revert redundant entries break getInstr - instr = ? be.fifosStore( + instr = ? be.journalOpsPushSlot( updateSiblings.rev, # Store reverse filter nxtFid) # Set filter ID (if any) @@ -178,7 +179,7 @@ proc resolveBackendFilter*( ok() -proc forkByJournal*( +proc journalFork*( db: AristoDbRef; episode: int; ): Result[AristoDbRef,AristoError] = @@ -195,19 +196,19 @@ proc forkByJournal*( if episode < 0: return err(FilNegativeEpisode) let - instr = ? be.fifosFetch(backSteps = episode+1) + instr = ? be.journalOpsFetchSlots(backSteps = episode+1) clone = ? db.fork(noToplayer = true) clone.top = LayerRef.init() clone.top.final.vGen = instr.fil.vGen clone.roFilter = instr.fil ok clone -proc forkByJournal*( +proc journalFork*( db: AristoDbRef; fid: Option[FilterID]; earlierOK = false; ): Result[AristoDbRef,AristoError] = - ## Variant of `forkByJounal()` for forking to a particular filter ID (or the + ## Variant of `journalFork()` for forking to a particular filter ID (or the ## nearest predecessot if `earlierOK` is passed `true`.) if there is some ## filter ID `fid`. ## @@ -218,8 +219,8 @@ proc forkByJournal*( if be.isNil: return err(FilBackendMissing) - let fip = ? be.getFromJournal(fid, earlierOK) - db.forkByJournal fip.inx + let fip = ? be.journalGetInx(fid, earlierOK) + db.journalFork fip.inx # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_filter/filter_merge.nim b/nimbus/db/aristo/aristo_journal/filter_merge.nim similarity index 100% rename from nimbus/db/aristo/aristo_filter/filter_merge.nim rename to nimbus/db/aristo/aristo_journal/filter_merge.nim diff --git a/nimbus/db/aristo/aristo_filter/filter_reverse.nim b/nimbus/db/aristo/aristo_journal/filter_reverse.nim similarity index 100% rename from nimbus/db/aristo/aristo_filter/filter_reverse.nim rename to nimbus/db/aristo/aristo_journal/filter_reverse.nim diff --git a/nimbus/db/aristo/aristo_filter/filter_siblings.nim b/nimbus/db/aristo/aristo_journal/filter_siblings.nim similarity index 100% rename from nimbus/db/aristo/aristo_filter/filter_siblings.nim rename to nimbus/db/aristo/aristo_journal/filter_siblings.nim diff --git a/nimbus/db/aristo/aristo_journal/filter_state_root.nim b/nimbus/db/aristo/aristo_journal/filter_state_root.nim new file mode 100644 index 000000000..632f43387 --- /dev/null +++ b/nimbus/db/aristo/aristo_journal/filter_state_root.nim @@ -0,0 +1,77 @@ +# nimbus-eth1 +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/tables, + eth/common, + results, + ".."/[aristo_desc, aristo_get] + +type + LayerStateRoot* = tuple + ## Helper structure for analysing state roots. + be: Hash256 ## Backend state root + fg: Hash256 ## Layer or filter implied state root + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getLayerStateRoots*( + db: AristoDbRef; + delta: LayerDeltaRef; + chunkedMpt: bool; + ): Result[LayerStateRoot,AristoError] = + ## Get the Merkle hash key for target state root to arrive at after this + ## reverse filter was applied. + ## + var spr: LayerStateRoot + + let sprBeKey = block: + let rc = db.getKeyBE VertexID(1) + if rc.isOk: + rc.value + elif rc.error == GetKeyNotFound: + VOID_HASH_KEY + else: + return err(rc.error) + spr.be = sprBeKey.to(Hash256) + + spr.fg = block: + let key = delta.kMap.getOrVoid VertexID(1) + if key.isValid: + key.to(Hash256) + else: + EMPTY_ROOT_HASH + if spr.fg.isValid: + return ok(spr) + + if not delta.kMap.hasKey(VertexID(1)) and + not delta.sTab.hasKey(VertexID(1)): + # This layer is unusable, need both: vertex and key + return err(FilPrettyPointlessLayer) + elif not delta.sTab.getOrVoid(VertexID(1)).isValid: + # Root key and vertex has been deleted + return ok(spr) + + if chunkedMpt: + if sprBeKey == delta.kMap.getOrVoid VertexID(1): + spr.fg = spr.be + return ok(spr) + + if delta.sTab.len == 0 and + delta.kMap.len == 0: + return err(FilPrettyPointlessLayer) + + err(FilStateRootMismatch) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_helpers.nim b/nimbus/db/aristo/aristo_journal/journal_get.nim similarity index 57% rename from nimbus/db/aristo/aristo_filter/filter_helpers.nim rename to nimbus/db/aristo/aristo_journal/journal_get.nim index d1a1b7cf8..6ecc3e34c 100644 --- a/nimbus/db/aristo/aristo_filter/filter_helpers.nim +++ b/nimbus/db/aristo/aristo_journal/journal_get.nim @@ -9,76 +9,21 @@ # except according to those terms. import - std/[options, tables], + std/options, eth/common, results, - ".."/[aristo_desc, aristo_desc/desc_backend, aristo_get], - ./filter_scheduler - -type - StateRootPair* = object - ## Helper structure for analysing state roots. - be*: Hash256 ## Backend state root - fg*: Hash256 ## Layer or filter implied state root + ".."/[aristo_desc, aristo_desc/desc_backend], + ./journal_scheduler # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc getLayerStateRoots*( - db: AristoDbRef; - delta: LayerDeltaRef; - chunkedMpt: bool; - ): Result[StateRootPair,AristoError] = - ## Get the Merkle hash key for target state root to arrive at after this - ## reverse filter was applied. - ## - var spr: StateRootPair - - let sprBeKey = block: - let rc = db.getKeyBE VertexID(1) - if rc.isOk: - rc.value - elif rc.error == GetKeyNotFound: - VOID_HASH_KEY - else: - return err(rc.error) - spr.be = sprBeKey.to(Hash256) - - spr.fg = block: - let key = delta.kMap.getOrVoid VertexID(1) - if key.isValid: - key.to(Hash256) - else: - EMPTY_ROOT_HASH - if spr.fg.isValid: - return ok(spr) - - if not delta.kMap.hasKey(VertexID(1)) and - not delta.sTab.hasKey(VertexID(1)): - # This layer is unusable, need both: vertex and key - return err(FilPrettyPointlessLayer) - elif not delta.sTab.getOrVoid(VertexID(1)).isValid: - # Root key and vertex has been deleted - return ok(spr) - - if chunkedMpt: - if sprBeKey == delta.kMap.getOrVoid VertexID(1): - spr.fg = spr.be - return ok(spr) - - if delta.sTab.len == 0 and - delta.kMap.len == 0: - return err(FilPrettyPointlessLayer) - - err(FilStateRootMismatch) - - -proc getFromJournal*( +proc journalGetInx*( be: BackendRef; fid = none(FilterID); earlierOK = false; - ): Result[FilterIndexPair,AristoError] = + ): Result[JournalInx,AristoError] = ## If there is some argument `fid`, find the filter on the journal with ID ## not larger than `fid` (i e. the resulting filter must not be more recent.) ## @@ -112,7 +57,7 @@ proc getFromJournal*( if not qid.isValid: return err(FilFilterNotFound) - var fip = FilterIndexPair() + var fip: JournalInx fip.fil = block: if cache[0] == qid: cache[1] @@ -127,15 +72,17 @@ proc getFromJournal*( ok fip -proc getJournalOverlap*( +proc journalGetOverlap*( be: BackendRef; filter: FilterRef; ): int = - ## Return the number of journal filters in the leading chain that is - ## reverted by the argument `filter`. A heuristc approach is used here - ## for an argument `filter` with a valid filter ID when the chain is - ## longer than one items. Only single step filter overlaps are guaranteed - ## to be found. + ## This function will find the overlap of an argument `filter` which is + ## composed by some recent filter slots from the journal. + ## + ## The function returns the number of most recent journal filters that are + ## reverted by the argument `filter`. This requires that `src`, `trg`, and + ## `fid` of the argument `filter` is properly calculated (e.g. using + ## `journalOpsFetchSlots()`.) ## # Check against the top-fifo entry. let qid = be.journal[0] @@ -155,7 +102,7 @@ proc getJournalOverlap*( # Check against some stored filter IDs if filter.isValid: - let fp = be.getFromJournal(some(filter.fid), earlierOK=true).valueOr: + let fp = be.journalGetInx(some(filter.fid), earlierOK=true).valueOr: return 0 if filter.trg == fp.fil.trg: return 1 + fp.inx diff --git a/nimbus/db/aristo/aristo_filter/filter_fifos.nim b/nimbus/db/aristo/aristo_journal/journal_ops.nim similarity index 83% rename from nimbus/db/aristo/aristo_filter/filter_fifos.nim rename to nimbus/db/aristo/aristo_journal/journal_ops.nim index ea7f9a313..f0ad4c6d1 100644 --- a/nimbus/db/aristo/aristo_filter/filter_fifos.nim +++ b/nimbus/db/aristo/aristo_journal/journal_ops.nim @@ -12,17 +12,18 @@ import std/[options, tables], results, ".."/[aristo_desc, aristo_desc/desc_backend], - "."/[filter_merge, filter_scheduler] + "."/[filter_merge, journal_scheduler] type - FifoInstr* = object - ## Database backend instructions for storing or deleting filters. + JournalOpsMod* = object + ## Database journal instructions for storing or deleting filters. put*: seq[(QueueID,FilterRef)] scd*: QidSchedRef - FetchInstr* = object + JournalOpsFetch* = object + ## Database journal instructions for merge-fetching slots. fil*: FilterRef - del*: FifoInstr + del*: JournalOpsMod # ------------------------------------------------------------------------------ # Private functions @@ -63,14 +64,16 @@ template getNextFidOrReturn(be: BackendRef; fid: Option[FilterID]): FilterID = # Public functions # ------------------------------------------------------------------------------ -proc fifosStore*( +proc journalOpsPushSlot*( be: BackendRef; # Database backend - filter: FilterRef; # Filter to save + filter: FilterRef; # Filter to store fid: Option[FilterID]; # Next filter ID (if any) - ): Result[FifoInstr,AristoError] = + ): Result[JournalOpsMod,AristoError] = ## Calculate backend instructions for storing the arguent `filter` on the ## argument backend `be`. ## + ## The journal is not modified by this function. + ## if be.journal.isNil: return err(FilQuSchedDisabled) @@ -81,7 +84,7 @@ proc fifosStore*( # Update journal filters and calculate database update var - instr = FifoInstr(scd: upd.fifo) + instr = JournalOpsMod(scd: upd.journal) dbClear: seq[QueueID] hold: seq[FilterRef] saved = false @@ -138,14 +141,16 @@ proc fifosStore*( ok instr -proc fifosFetch*( +proc journalOpsFetchSlots*( be: BackendRef; # Database backend backSteps: int; # Backstep this many filters - ): Result[FetchInstr,AristoError] = + ): Result[JournalOpsFetch,AristoError] = ## This function returns the single filter obtained by squash merging the ## topmost `backSteps` filters on the backend journal fifo. Also, backend - ## instructions are calculated and returned for deleting the merged journal - ## filters on the fifo. + ## instructions are calculated and returned for deleting the extracted + ## journal slots. + ## + ## The journal is not modified by this function. ## if be.journal.isNil: return err(FilQuSchedDisabled) @@ -154,7 +159,7 @@ proc fifosFetch*( # Get instructions let fetch = be.journal.fetchItems backSteps - var instr = FetchInstr(del: FifoInstr(scd: fetch.fifo)) + var instr = JournalOpsFetch(del: JournalOpsMod(scd: fetch.journal)) # Follow `HoldQid` instructions and combine journal filters for sub-queues # and push intermediate results on the `hold` stack @@ -186,11 +191,16 @@ proc fifosFetch*( ok instr -proc fifosDelete*( +proc journalOpsDeleteSlots*( be: BackendRef; # Database backend backSteps: int; # Backstep this many filters - ): Result[FifoInstr,AristoError] = - ## Variant of `fetch()` for calculating the deletion part only. + ): Result[JournalOpsMod,AristoError] = + ## Calculate backend instructions for deleting the most recent `backSteps` + ## slots on the journal. This is basically the deletion calculator part + ## from `journalOpsFetchSlots()`. + ## + ## The journal is not modified by this function. + ## if be.journal.isNil: return err(FilQuSchedDisabled) if backSteps <= 0: @@ -198,7 +208,7 @@ proc fifosDelete*( # Get instructions let fetch = be.journal.fetchItems backSteps - var instr = FifoInstr(scd: fetch.fifo) + var instr = JournalOpsMod(scd: fetch.journal) # Follow `HoldQid` instructions for producing the list of entries that # need to be deleted diff --git a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim b/nimbus/db/aristo/aristo_journal/journal_scheduler.nim similarity index 89% rename from nimbus/db/aristo/aristo_filter/filter_scheduler.nim rename to nimbus/db/aristo/aristo_journal/journal_scheduler.nim index 3278077f2..23f65888b 100644 --- a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim +++ b/nimbus/db/aristo/aristo_journal/journal_scheduler.nim @@ -279,12 +279,12 @@ func capacity*( func addItem*( - fifo: QidSchedRef; # Cascaded fifos descriptor - ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = + journal: QidSchedRef; # Cascaded fifos descriptor + ): tuple[exec: seq[QidAction], journal: QidSchedRef] = ## Get the instructions for adding a new slot to the cascades queues. The - ## argument `fifo` is a complete state of the addresses of a cascaded *FIFO* - ## when applied to a database. Only the *FIFO* queue addresses are needed - ## in order to describe how to add another item. + ## argument `journal` is a complete state of the addresses of a cascaded + ## *FIFO* when applied to a database. Only the *FIFO* queue addresses are + ## needed in order to describe how to add another item. ## ## The function returns a list of instructions what to do when adding a new ## item and the new state of the cascaded *FIFO*. The following instructions @@ -310,9 +310,9 @@ func addItem*( ## -- another item. ## let - ctx = fifo.ctx.q + ctx = journal.ctx.q var - state = fifo.state + state = journal.state deferred: seq[QidAction] # carry over to next sub-queue revActions: seq[QidAction] # instructions in reverse order @@ -370,15 +370,15 @@ func addItem*( op: DelQid, qid: deferred[0].qid) - (revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state)) + (revActions.reversed, QidSchedRef(ctx: journal.ctx, state: state)) func fetchItems*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor size: int; # Leading items to merge - ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = + ): tuple[exec: seq[QidAction], journal: QidSchedRef] = ## Get the instructions for extracting the latest `size` items from the - ## cascaded queues. argument `fifo` is a complete state of the addresses of + ## cascaded queues. argument `journal` is a complete state of the addresses of ## a cascaded *FIFO* when applied to a database. Only the *FIFO* queue ## addresses are used in order to describe how to add another item. ## @@ -395,13 +395,13 @@ func fetchItems*( ## The extracted items will then be available from the hold queue. var actions: seq[QidAction] - state = fifo.state + state = journal.state if 0 < size: var size = size.uint64 - for n in 0 ..< fifo.state.len: - let q = fifo.state[n] + for n in 0 ..< journal.state.len: + let q = journal.state[n] if q[0] == 0: discard @@ -447,7 +447,7 @@ func fetchItems*( # | : # | wrap let - wrap = fifo.ctx.q[n].wrap + wrap = journal.ctx.q[n].wrap qSize1 = q[1] - QueueID(0) if size <= qSize1: @@ -490,35 +490,35 @@ func fetchItems*( state[n] = (QueueID(0), QueueID(0)) - (actions, QidSchedRef(ctx: fifo.ctx, state: state)) + (actions, QidSchedRef(ctx: journal.ctx, state: state)) func lengths*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor ): seq[int] = ## Return the list of lengths for all cascaded sub-fifos. - for n in 0 ..< fifo.state.len: - result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int + for n in 0 ..< journal.state.len: + result.add journal.state[n].fifoLen(journal.ctx.q[n].wrap).int func len*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor ): int = - ## Size of the fifo - fifo.lengths.foldl(a + b, 0) + ## Size of the journal + journal.lengths.foldl(a + b, 0) func `[]`*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor inx: int; # Index into latest items ): QueueID = - ## Get the queue ID of the `inx`-th `fifo` entry where index `0` refers to + ## Get the queue ID of the `inx`-th `journal` entry where index `0` refers to ## the entry most recently added, `1` the one before, etc. If there is no ## such entry `QueueID(0)` is returned. if 0 <= inx: var inx = inx.uint64 - for n in 0 ..< fifo.state.len: - let q = fifo.state[n] + for n in 0 ..< journal.state.len: + let q = journal.state[n] if q[0] == 0: discard @@ -551,22 +551,22 @@ func `[]`*( inx -= qInxMax1 + 1 # Otherwise continue let - wrap = fifo.ctx.q[n].wrap + wrap = journal.ctx.q[n].wrap qInxMax0 = wrap - q[0] if inx <= qInxMax0: return n.globalQid(wrap - inx) inx -= qInxMax0 + 1 # Otherwise continue func `[]`*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor bix: BackwardsIndex; # Index into latest items ): QueueID = ## Variant of `[]` for provifing `[^bix]`. - fifo[fifo.len - bix.distinctBase] + journal[journal.len - bix.distinctBase] func `[]`*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor qid: QueueID; # Index into latest items ): int = ## .. @@ -575,12 +575,12 @@ func `[]`*( chn = (qid.uint64 shr 62).int qid = (qid.uint64 and 0x3fff_ffff_ffff_ffffu64).QueueID - if chn < fifo.state.len: + if chn < journal.state.len: var offs = 0 for n in 0 ..< chn: - offs += fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int + offs += journal.state[n].fifoLen(journal.ctx.q[n].wrap).int - let q = fifo.state[chn] + let q = journal.state[chn] if q[0] <= q[1]: # Single file # :: @@ -606,14 +606,14 @@ func `[]`*( return offs + (q[1] - qid).int if q[0] <= qid: - let wrap = fifo.ctx.q[chn].wrap + let wrap = journal.ctx.q[chn].wrap if qid <= wrap: return offs + (q[1] - QueueID(0)).int + (wrap - qid).int -1 proc le*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor fid: FilterID; # Upper (or right) bound fn: QuFilMap; # QueueID/FilterID mapping forceEQ = false; # Check for strict equality @@ -627,7 +627,7 @@ proc le*( ## var left = 0 - right = fifo.len - 1 + right = journal.len - 1 template toFid(qid: QueueID): FilterID = fn(qid).valueOr: @@ -639,30 +639,30 @@ proc le*( if 0 <= right: # Check left fringe let - maxQid = fifo[left] + maxQid = journal[left] maxFid = maxQid.toFid if maxFid <= fid: if forceEQ and maxFid != fid: return QueueID(0) return maxQid - # So `fid < fifo[left]` + # So `fid < journal[left]` # Check right fringe let - minQid = fifo[right] + minQid = journal[right] minFid = minQid.toFid if fid <= minFid: if minFid == fid: return minQid return QueueID(0) - # So `fifo[right] < fid` + # So `journal[right] < fid` # Bisection var rightQid = minQid # Might be used as end result while 1 < right - left: let pivot = (left + right) div 2 - pivQid = fifo[pivot] + pivQid = journal[pivot] pivFid = pivQid.toFid # # Example: @@ -671,19 +671,20 @@ proc le*( # inx: left ... pivot ... right # fid: 77 # - # with `fifo[left].toFid > fid > fifo[right].toFid` + # with `journal[left].toFid > fid > journal[right].toFid` # - if pivFid < fid: # fid >= fifo[half].toFid: + if pivFid < fid: # fid >= journal[half].toFid: right = pivot rightQid = pivQid - elif fid < pivFid: # fifo[half].toFid > fid + elif fid < pivFid: # journal[half].toFid > fid left = pivot else: return pivQid - # Now: `fifo[right].toFid < fid < fifo[left].toFid` (and `right == left+1`). + # Now: `journal[right].toFid < fid < journal[left].toFid` + # (and `right == left+1`). if not forceEQ: - # Make sure that `fifo[right].toFid` exists + # Make sure that `journal[right].toFid` exists if fn(rightQid).isOk: return rightQid @@ -691,12 +692,12 @@ proc le*( proc eq*( - fifo: QidSchedRef; # Cascaded fifos descriptor + journal: QidSchedRef; # Cascaded fifos descriptor fid: FilterID; # Filter ID to search for fn: QuFilMap; # QueueID/FilterID mapping ): QueueID = ## Variant of `le()` for strict equality. - fifo.le(fid, fn, forceEQ = true) + journal.le(fid, fn, forceEQ = true) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_merge.nim b/nimbus/db/aristo/aristo_merge.nim index e6e93d836..3e66b32c6 100644 --- a/nimbus/db/aristo/aristo_merge.nim +++ b/nimbus/db/aristo/aristo_merge.nim @@ -26,7 +26,6 @@ import std/[algorithm, sequtils, strutils, sets, tables, typetraits], - chronicles, eth/[common, trie/nibbles], results, stew/keyed_queue, @@ -34,9 +33,6 @@ import "."/[aristo_desc, aristo_get, aristo_hike, aristo_layers, aristo_path, aristo_serialise, aristo_utils, aristo_vid] -logScope: - topics = "aristo-merge" - type LeafTiePayload* = object ## Generalised key-value pair for a sub-trie. The main trie is the @@ -186,17 +182,15 @@ proc insertBranch( return err(MergeNonBranchProofModeLock) if linkVtx.vType == Leaf: - # Update vertex path lookup - let - path = hike.legsTo(NibblesSeq) & linkVtx.lPfx - rc = path.pathToTag() - if rc.isErr: - debug "Branch link leaf path garbled", linkID, path + # Double check path prefix + if 64 < hike.legsTo(NibblesSeq).len + linkVtx.lPfx.len: return err(MergeBranchLinkLeafGarbled) - let local = db.vidFetch(pristine = true) - db.setVtxAndKey(hike.root, local, linkVtx) - linkVtx.lPfx = linkVtx.lPfx.slice(1+n) + let + local = db.vidFetch(pristine = true) + linkDup = linkVtx.dup + db.setVtxAndKey(hike.root, local, linkDup) + linkDup.lPfx = linkDup.lPfx.slice(1+n) forkVtx.bVid[linkInx] = local elif linkVtx.ePfx.len == n + 1: @@ -204,9 +198,11 @@ proc insertBranch( forkVtx.bVid[linkInx] = linkVtx.eVid else: - let local = db.vidFetch - db.setVtxAndKey(hike.root, local, linkVtx) - linkVtx.ePfx = linkVtx.ePfx.slice(1+n) + let + local = db.vidFetch + linkDup = linkVtx.dup + db.setVtxAndKey(hike.root, local, linkDup) + linkDup.ePfx = linkDup.ePfx.slice(1+n) forkVtx.bVid[linkInx] = local block: @@ -285,13 +281,14 @@ proc concatBranchAndLeaf( # Append leaf vertex let + brDup = brVtx.dup vid = db.vidFetch(pristine = true) vtx = VertexRef( vType: Leaf, lPfx: hike.tail.slice(1), lData: payload) - brVtx.bVid[nibble] = vid - db.setVtxAndKey(hike.root, brVid, brVtx) + brDup.bVid[nibble] = vid + db.setVtxAndKey(hike.root, brVid, brDup) db.setVtxAndKey(hike.root, vid, vtx) okHike.legs.add Leg(wp: VidVtxPair(vtx: vtx, vid: vid), nibble: -1) @@ -330,8 +327,12 @@ proc topIsBranchAddLeaf( # if db.pPrf.len == 0: # Not much else that can be done here - debug "Dangling leaf link, reused", branch=hike.legs[^1].wp.vid, - nibble, linkID, leafPfx=hike.tail + raiseAssert "Dangling edge:" & + " pfx=" & $hike.legsTo(hike.legs.len-1,NibblesSeq) & + " branch=" & $parent & + " nibble=" & $nibble & + " edge=" & $linkID & + " tail=" & $hike.tail # Reuse placeholder entry in table let vtx = VertexRef( @@ -413,15 +414,16 @@ proc topIsExtAddLeaf( return err(MergeBranchProofModeLock) let + brDup = brVtx.dup vid = db.vidFetch(pristine = true) vtx = VertexRef( vType: Leaf, lPfx: hike.tail.slice(1), lData: payload) - brVtx.bVid[nibble] = vid - db.setVtxAndKey(hike.root, brVid, brVtx) + brDup.bVid[nibble] = vid + db.setVtxAndKey(hike.root, brVid, brDup) db.setVtxAndKey(hike.root, vid, vtx) - okHike.legs.add Leg(wp: VidVtxPair(vtx: brVtx, vid: brVid), nibble: nibble) + okHike.legs.add Leg(wp: VidVtxPair(vtx: brDup, vid: brVid), nibble: nibble) okHike.legs.add Leg(wp: VidVtxPair(vtx: vtx, vid: vid), nibble: -1) ok okHike @@ -447,17 +449,18 @@ proc topIsEmptyAddLeaf( return err(MergeBranchProofModeLock) let + rootDup = rootVtx.dup leafVid = db.vidFetch(pristine = true) leafVtx = VertexRef( vType: Leaf, lPfx: hike.tail.slice(1), lData: payload) - rootVtx.bVid[nibble] = leafVid - db.setVtxAndKey(hike.root, hike.root, rootVtx) + rootDup.bVid[nibble] = leafVid + db.setVtxAndKey(hike.root, hike.root, rootDup) db.setVtxAndKey(hike.root, leafVid, leafVtx) return ok Hike( root: hike.root, - legs: @[Leg(wp: VidVtxPair(vtx: rootVtx, vid: hike.root), nibble: nibble), + legs: @[Leg(wp: VidVtxPair(vtx: rootDup, vid: hike.root), nibble: nibble), Leg(wp: VidVtxPair(vtx: leafVtx, vid: leafVid), nibble: -1)]) db.insertBranch(hike, hike.root, rootVtx, payload) diff --git a/nimbus/db/aristo/aristo_path.nim b/nimbus/db/aristo/aristo_path.nim index 3e50b805d..07c03f047 100644 --- a/nimbus/db/aristo/aristo_path.nim +++ b/nimbus/db/aristo/aristo_path.nim @@ -49,7 +49,7 @@ func pathAsBlob*(tag: PathID): Blob = if 64 <= tag.length: return key else: - return key[0 .. (tag.length + 1) div 2] + return key[0 .. (tag.length - 1) div 2] func pathAsHEP*(tag: PathID; isLeaf = false): Blob = ## Convert the `tag` argument to a hex encoded partial path as used in `eth` diff --git a/nimbus/db/aristo/aristo_tx.nim b/nimbus/db/aristo/aristo_tx.nim index 731d942a3..072d69408 100644 --- a/nimbus/db/aristo/aristo_tx.nim +++ b/nimbus/db/aristo/aristo_tx.nim @@ -14,126 +14,10 @@ {.push raises: [].} import - std/[options, tables], + std/options, results, - "."/[aristo_desc, aristo_filter, aristo_get, aristo_layers, aristo_hashify] - -func isTop*(tx: AristoTxRef): bool {.gcsafe.} -func level*(db: AristoDbRef): int {.gcsafe.} -proc txBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] {.gcsafe.} - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] = - if not tx.isTop(): - return err(TxNotTopTx) - let db = tx.db - if tx.level != db.stack.len: - return err(TxStackGarbled) - ok db - -proc getTxUid(db: AristoDbRef): uint = - if db.txUidGen == high(uint): - db.txUidGen = 0 - db.txUidGen.inc - db.txUidGen - -iterator txWalk(tx: AristoTxRef): (AristoTxRef,LayerRef,AristoError) = - ## Walk down the transaction chain. - let db = tx.db - var tx = tx - - block body: - # Start at top layer if tx refers to that - if tx.level == db.stack.len: - if tx.txUid != db.top.txUid: - yield (tx,db.top,TxStackGarbled) - break body - - # Yield the top level - yield (tx,db.top,AristoError(0)) - - # Walk down the transaction stack - for level in (tx.level-1).countDown(1): - tx = tx.parent - if tx.isNil or tx.level != level: - yield (tx,LayerRef(nil),TxStackGarbled) - break body - - var layer = db.stack[level] - if tx.txUid != layer.txUid: - yield (tx,layer,TxStackGarbled) - break body - - yield (tx,layer,AristoError(0)) - -# --------- - -proc stowImpl( - db: AristoDbRef; # Database - nxtFid: Option[FilterID]; # Next filter ID (zero is OK) - persistent: bool; # Stage only unless `true` - chunkedMpt: bool; # Partial data (e.g. from `snap`) - ): Result[void,AristoError] = - ## Worker for `stow()` variants. - ## - if not db.txRef.isNil: - return err(TxPendingTx) - if 0 < db.stack.len: - return err(TxStackGarbled) - if persistent and not db.canResolveBackendFilter(): - return err(TxBackendNotWritable) - - # Update Merkle hashes (unless disabled) - db.hashify().isOkOr: - return err(error[1]) - - let fwd = db.fwdFilter(db.top, chunkedMpt).valueOr: - return err(error[1]) - - if fwd.isValid: - # Merge `top` layer into `roFilter` - db.merge(fwd).isOkOr: - return err(error[1]) - - # Special treatment for `snap` proofs (aka `chunkedMpt`) - let final = - if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp) - else: LayerFinalRef() - - # New empty top layer (probably with `snap` proofs and `vGen` carry over) - db.top = LayerRef( - delta: LayerDeltaRef(), - final: final) - if db.roFilter.isValid: - db.top.final.vGen = db.roFilter.vGen - else: - let rc = db.getIdgUbe() - if rc.isOk: - db.top.final.vGen = rc.value - else: - # It is OK if there was no `Idg`. Otherwise something serious happened - # and there is no way to recover easily. - doAssert rc.error == GetIdgNotFound - - if persistent: - # Merge `roFiler` into persistent tables - ? db.resolveBackendFilter nxtFid - db.roFilter = FilterRef(nil) - - # Special treatment for `snap` proofs (aka `chunkedMpt`) - let final = - if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp) - else: LayerFinalRef(vGen: db.vGen) - - # New empty top layer (probably with `snap` proofs carry over) - db.top = LayerRef( - delta: LayerDeltaRef(), - final: final, - txUid: db.top.txUid) - ok() + ./aristo_tx/[tx_fork, tx_frame, tx_stow], + "."/[aristo_desc, aristo_get] # ------------------------------------------------------------------------------ # Public functions, getters @@ -141,24 +25,20 @@ proc stowImpl( func txTop*(db: AristoDbRef): Result[AristoTxRef,AristoError] = ## Getter, returns top level transaction if there is any. - if db.txRef.isNil: - err(TxNoPendingTx) - else: - ok(db.txRef) + db.txFrameTop() func isTop*(tx: AristoTxRef): bool = ## Getter, returns `true` if the argument `tx` referes to the current top ## level transaction. - tx.db.txRef == tx and tx.db.top.txUid == tx.txUid + tx.txFrameIsTop() func level*(tx: AristoTxRef): int = ## Getter, positive nesting level of transaction argument `tx` - tx.level + tx.txFrameLevel() func level*(db: AristoDbRef): int = ## Getter, non-negative nesting level (i.e. number of pending transactions) - if not db.txRef.isNil: - result = db.txRef.level + db.txFrameLevel() # ------------------------------------------------------------------------------ # Public functions @@ -170,146 +50,88 @@ func to*(tx: AristoTxRef; T: type[AristoDbRef]): T = proc forkTx*( - tx: AristoTxRef; # Transaction descriptor + db: AristoDbRef; + backLevel: int; # Backward location of transaction dontHashify = false; # Process/fix MPT hashes ): Result[AristoDbRef,AristoError] = - ## Clone a transaction into a new DB descriptor accessing the same backend - ## database (if any) as the argument `db`. The new descriptor is linked to - ## the transaction parent and is fully functional as a forked instance (see - ## comments on `aristo_desc.reCentre()` for details.) + ## Fork a new descriptor obtained from parts of the argument database + ## as described by arguments `db` and `backLevel`. ## - ## Input situation: - ## :: - ## tx -> db0 with tx is top transaction, tx.level > 0 + ## If the argument `backLevel` is non-negative, the forked descriptor will + ## provide the database view where the first `backLevel` transaction layers + ## are stripped and the remaing layers are squashed into a single transaction. ## - ## Output situation: - ## :: - ## tx -> db0 \ - ## > share the same backend - ## tx1 -> db1 / + ## If `backLevel` is `-1`, a database descriptor with empty transaction + ## layers will be provided where the `roFilter` between database and + ## transaction layers are kept in place. ## - ## where `tx.level > 0`, `db1.level == 1` and `db1` is returned. The - ## transaction `tx1` can be retrieved via `db1.txTop()`. + ## If `backLevel` is `-2`, a database descriptor with empty transaction + ## layers will be provided without an `roFilter`. ## - ## The new DB descriptor will contain a copy of the argument transaction - ## `tx` as top layer of level 1 (i.e. this is he only transaction.) Rolling - ## back will end up at the backend layer (incl. backend filter.) + ## The returned database descriptor will always have transaction level one. + ## If there were no transactions that could be squashed, an empty + ## transaction is added. ## - ## If the arguent flag `dontHashify` is passed `true`, the clone descriptor + ## If the arguent flag `dontHashify` is passed `true`, the forked descriptor ## will *NOT* be hashified right after construction. ## ## Use `aristo_desc.forget()` to clean up this descriptor. ## - let db = tx.db + # Fork top layer (with or without pending transaction)? + if backLevel == 0: + return db.txForkTop dontHashify - # Verify `tx` argument - if db.txRef == tx: - if db.top.txUid != tx.txUid: - return err(TxArgStaleTx) - elif db.stack.len <= tx.level: - return err(TxArgStaleTx) - elif db.stack[tx.level].txUid != tx.txUid: - return err(TxArgStaleTx) + # Fork bottom layer (=> 0 < db.stack.len) + if backLevel == db.stack.len: + return db.txForkBase dontHashify - # Provide new empty stack layer - let stackLayer = block: - let rc = db.getIdgBE() - if rc.isOk: - LayerRef( - delta: LayerDeltaRef(), - final: LayerFinalRef(vGen: rc.value)) - elif rc.error == GetIdgNotFound: - LayerRef.init() - else: - return err(rc.error) + # Inspect transaction stack + if 0 < backLevel: + var tx = db.txRef + if tx.isNil or db.stack.len < backLevel: + return err(TxLevelTooDeep) - # Set up clone associated to `db` - let txClone = ? db.fork(noToplayer = true, noFilter = false) - txClone.top = db.layersCc tx.level # Provide tx level 1 stack - txClone.stack = @[stackLayer] # Zero level stack - txClone.top.txUid = 1 - txClone.txUidGen = 1 + # Fetch tx of level `backLevel` (seed to skip some items) + for _ in 0 ..< backLevel: + tx = tx.parent + if tx.isNil: + return err(TxStackGarbled) + return tx.txFork dontHashify - # Install transaction similar to `tx` on clone - txClone.txRef = AristoTxRef( - db: txClone, - txUid: 1, - level: 1) + # Plain fork, include `roFilter` + if backLevel == -1: + let xb = ? db.fork(noFilter=false) + discard xb.txFrameBegin() + return ok(xb) - if not dontHashify: - txClone.hashify().isOkOr: - discard txClone.forget() - return err(error[1]) + # Plain fork, unfiltered backend + if backLevel == -2: + let xb = ? db.fork(noFilter=true) + discard xb.txFrameBegin() + return ok(xb) - ok(txClone) + err(TxLevelUseless) -proc forkTop*( - db: AristoDbRef; - dontHashify = false; # Process/fix MPT hashes - ): Result[AristoDbRef,AristoError] = - ## Variant of `forkTx()` for the top transaction if there is any. Otherwise - ## the top layer is cloned, and an empty transaction is set up. After - ## successful fork the returned descriptor has transaction level 1. - ## - ## Use `aristo_desc.forget()` to clean up this descriptor. - ## - if db.txRef.isNil: - let dbClone = ? db.fork(noToplayer=true, noFilter=false) - dbClone.top = db.layersCc # Is a deep copy - - if not dontHashify: - dbClone.hashify().isOkOr: - discard dbClone.forget() - return err(error[1]) - - discard dbClone.txBegin - return ok(dbClone) - # End if() - - db.txRef.forkTx dontHashify - - -proc forkBase*( - db: AristoDbRef; - dontHashify = false; # Process/fix MPT hashes - ): Result[AristoDbRef,AristoError] = - ## Variant of `forkTx()`, sort of the opposite of `forkTop()`. This is the - ## equivalent of top layer forking after all tranactions have been rolled - ## back. - ## - ## Use `aristo_desc.forget()` to clean up this descriptor. - ## - if not db.txRef.isNil: - let dbClone = ? db.fork(noToplayer=true, noFilter=false) - dbClone.top = db.layersCc 0 - - if not dontHashify: - dbClone.hashify().isOkOr: - discard dbClone.forget() - return err(error[1]) - - discard dbClone.txBegin - return ok(dbClone) - # End if() - - db.forkTop dontHashify - - -proc forkWith*( +proc findTx*( db: AristoDbRef; vid: VertexID; # Pivot vertex (typically `VertexID(1)`) - key: HashKey; # Hash key of pivot verte - dontHashify = true; # Process/fix MPT hashes - ): Result[AristoDbRef,AristoError] = + key: HashKey; # Hash key of pivot vertex + ): Result[int,AristoError] = ## Find the transaction where the vertex with ID `vid` exists and has the ## Merkle hash key `key`. If there is no transaction available, search in ## the filter and then in the backend. ## - ## If the above procedure succeeds, a new descriptor is forked with exactly - ## one transaction which contains the all the bottom layers up until the - ## layer where the `(vid,key)` pair is found. In case the pair was found on - ## the filter or the backend, this transaction is empty. + ## If the above procedure succeeds, an integer indicating the transaction + ## level integer is returned: + ## + ## * `0` -- top level, current layer + ## * `1`, `2`, ... -- some transaction level further down the stack + ## * `-1` -- the filter between transaction stack and database backend + ## * `-2` -- the databse backend + ## + ## A successful return code might be used for the `forkTx()` call for + ## creating a forked descriptor that provides the pair `(vid,key)`. ## if not vid.isValid or not key.isValid: @@ -319,38 +141,33 @@ proc forkWith*( # Try `(vid,key)` on top layer let topKey = db.top.delta.kMap.getOrVoid vid if topKey == key: - return db.forkTop dontHashify + return ok(0) else: # Find `(vid,key)` on transaction layers - for (tx,layer,error) in db.txRef.txWalk: + var n = 0 + for (n,tx,layer,error) in db.txRef.txFrameWalk: if error != AristoError(0): return err(error) if layer.delta.kMap.getOrVoid(vid) == key: - return tx.forkTx dontHashify + return ok(n) # Try bottom layer let botKey = db.stack[0].delta.kMap.getOrVoid vid if botKey == key: - return db.forkBase dontHashify + return ok(db.stack.len) - # Try `(vid,key)` on filter + # Try `(vid,key)` on roFilter if not db.roFilter.isNil: let roKey = db.roFilter.kMap.getOrVoid vid if roKey == key: - let rc = db.fork(noFilter = false) - if rc.isOk: - discard rc.value.txBegin - return rc + return ok(-1) # Try `(vid,key)` on unfiltered backend block: let beKey = db.getKeyUbe(vid).valueOr: VOID_HASH_KEY if beKey == key: - let rc = db.fork(noFilter = true) - if rc.isOk: - discard rc.value.txBegin - return rc + return ok(-2) err(TxNotFound) @@ -369,23 +186,7 @@ proc txBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] = ## ... continue using db ... ## tx.commit() ## - if db.level != db.stack.len: - return err(TxStackGarbled) - - db.stack.add db.top - db.top = LayerRef( - delta: LayerDeltaRef(), - final: db.top.final.dup, - txUid: db.getTxUid) - - db.txRef = AristoTxRef( - db: db, - txUid: db.top.txUid, - parent: db.txRef, - level: db.stack.len) - - ok db.txRef - + db.txFrameBegin() proc rollback*( tx: AristoTxRef; # Top transaction on database @@ -394,15 +195,7 @@ proc rollback*( ## performed for this transactio. The previous transaction is returned if ## there was any. ## - let db = ? tx.getDbDescFromTopTx() - - # Roll back to previous layer. - db.top = db.stack[^1] - db.stack.setLen(db.stack.len-1) - - db.txRef = db.txRef.parent - ok() - + tx.txFrameRollback() proc commit*( tx: AristoTxRef; # Top transaction on database @@ -411,32 +204,7 @@ proc commit*( ## performed through this handle and merges it to the previous layer. The ## previous transaction is returned if there was any. ## - let db = ? tx.getDbDescFromTopTx() - db.hashify().isOkOr: - return err(error[1]) - - # Pop layer from stack and merge database top layer onto it - let merged = block: - if db.top.delta.sTab.len == 0 and - db.top.delta.kMap.len == 0: - # Avoid `layersMergeOnto()` - db.top.delta = db.stack[^1].delta - db.stack.setLen(db.stack.len-1) - db.top - else: - let layer = db.stack[^1] - db.stack.setLen(db.stack.len-1) - db.top.layersMergeOnto layer[] - layer - - # Install `merged` stack top layer and update stack - db.top = merged - db.txRef = tx.parent - if 0 < db.stack.len: - db.txRef.txUid = db.getTxUid - db.top.txUid = db.txRef.txUid - ok() - + tx.txFrameCommit() proc collapse*( tx: AristoTxRef; # Top transaction on database @@ -447,20 +215,10 @@ proc collapse*( ## :: ## while true: ## discard tx.commit() # ditto for rollback() - ## if db.topTx.isErr: break - ## tx = db.topTx.value + ## if db.txTop.isErr: break + ## tx = db.txTop.value ## - let db = ? tx.getDbDescFromTopTx() - - if commit: - # For commit, hashify the current layer if requested and install it - db.hashify().isOkOr: - return err(error[1]) - - db.top.txUid = 0 - db.stack.setLen(0) - db.txRef = AristoTxRef(nil) - ok() + tx.txFrameCollapse commit # ------------------------------------------------------------------------------ # Public functions: save to database @@ -491,7 +249,7 @@ proc persist*( ## In this case, the `chunkedMpt` argument must be set `true` (see alse ## `fwdFilter()`.) ## - db.stowImpl(nxtFid, persistent=true, chunkedMpt=chunkedMpt) + db.txStow(nxtFid, persistent=true, chunkedMpt=chunkedMpt) proc stow*( db: AristoDbRef; # Database @@ -510,7 +268,7 @@ proc stow*( ## In this case, the `chunkedMpt` argument must be set `true` (see alse ## `fwdFilter()`.) ## - db.stowImpl(nxtFid=none(FilterID), persistent=false, chunkedMpt=chunkedMpt) + db.txStow(nxtFid=none(FilterID), persistent=false, chunkedMpt=chunkedMpt) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_tx/tx_fork.nim b/nimbus/db/aristo/aristo_tx/tx_fork.nim new file mode 100644 index 000000000..326a571a6 --- /dev/null +++ b/nimbus/db/aristo/aristo_tx/tx_fork.nim @@ -0,0 +1,152 @@ +# nimbus-eth1 +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Aristo DB -- Transaction fork helpers +## ===================================== +## +{.push raises: [].} + +import + results, + ./tx_frame, + ".."/[aristo_desc, aristo_get, aristo_layers, aristo_hashify] + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc txFork*( + tx: AristoTxRef; # Transaction descriptor + dontHashify = false; # Process/fix MPT hashes + ): Result[AristoDbRef,AristoError] = + ## Clone a transaction into a new DB descriptor accessing the same backend + ## database (if any) as the argument `db`. The new descriptor is linked to + ## the transaction parent and is fully functional as a forked instance (see + ## comments on `aristo_desc.reCentre()` for details.) + ## + ## Input situation: + ## :: + ## tx -> db0 with tx is top transaction, tx.level > 0 + ## + ## Output situation: + ## :: + ## tx -> db0 \ + ## > share the same backend + ## tx1 -> db1 / + ## + ## where `tx.level > 0`, `db1.level == 1` and `db1` is returned. The + ## transaction `tx1` can be retrieved via `db1.txTop()`. + ## + ## The new DB descriptor will contain a copy of the argument transaction + ## `tx` as top layer of level 1 (i.e. this is he only transaction.) Rolling + ## back will end up at the backend layer (incl. backend filter.) + ## + ## If the arguent flag `dontHashify` is passed `true`, the clone descriptor + ## will *NOT* be hashified right after construction. + ## + ## Use `aristo_desc.forget()` to clean up this descriptor. + ## + let db = tx.db + + # Verify `tx` argument + if db.txRef == tx: + if db.top.txUid != tx.txUid: + return err(TxArgStaleTx) + elif db.stack.len <= tx.level: + return err(TxArgStaleTx) + elif db.stack[tx.level].txUid != tx.txUid: + return err(TxArgStaleTx) + + # Provide new empty stack layer + let stackLayer = block: + let rc = db.getIdgBE() + if rc.isOk: + LayerRef( + delta: LayerDeltaRef(), + final: LayerFinalRef(vGen: rc.value)) + elif rc.error == GetIdgNotFound: + LayerRef.init() + else: + return err(rc.error) + + # Set up clone associated to `db` + let txClone = ? db.fork(noToplayer = true, noFilter = false) + txClone.top = db.layersCc tx.level # Provide tx level 1 stack + txClone.stack = @[stackLayer] # Zero level stack + txClone.top.txUid = 1 + txClone.txUidGen = 1 + + # Install transaction similar to `tx` on clone + txClone.txRef = AristoTxRef( + db: txClone, + txUid: 1, + level: 1) + + if not dontHashify: + txClone.hashify().isOkOr: + discard txClone.forget() + return err(error[1]) + + ok(txClone) + + +proc txForkTop*( + db: AristoDbRef; + dontHashify = false; # Process/fix MPT hashes + ): Result[AristoDbRef,AristoError] = + ## Variant of `forkTx()` for the top transaction if there is any. Otherwise + ## the top layer is cloned, and an empty transaction is set up. After + ## successful fork the returned descriptor has transaction level 1. + ## + ## Use `aristo_desc.forget()` to clean up this descriptor. + ## + if db.txRef.isNil: + let txClone = ? db.fork(noToplayer=true, noFilter=false) + txClone.top = db.layersCc # Is a deep copy + + if not dontHashify: + txClone.hashify().isOkOr: + discard txClone.forget() + return err(error[1]) + + discard txClone.txFrameBegin() + return ok(txClone) + # End if() + + db.txRef.txFork dontHashify + + +proc txForkBase*( + db: AristoDbRef; + dontHashify = false; # Process/fix MPT hashes + ): Result[AristoDbRef,AristoError] = + ## Variant of `forkTx()`, sort of the opposite of `forkTop()`. This is the + ## equivalent of top layer forking after all tranactions have been rolled + ## back. + ## + ## Use `aristo_desc.forget()` to clean up this descriptor. + ## + if db.txRef.isNil: + return db.txForkTop dontHashify + + let txClone = ? db.fork(noToplayer=true, noFilter=false) + txClone.top = db.layersCc 0 + + if not dontHashify: + txClone.hashify().isOkOr: + discard txClone.forget() + return err(error[1]) + + discard txClone.txFrameBegin() + ok(txClone) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_tx/tx_frame.nim b/nimbus/db/aristo/aristo_tx/tx_frame.nim new file mode 100644 index 000000000..318dfd226 --- /dev/null +++ b/nimbus/db/aristo/aristo_tx/tx_frame.nim @@ -0,0 +1,209 @@ +# nimbus-eth1 +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Aristo DB -- Transaction frames helper +## ====================================== +## +{.push raises: [].} + +import + std/tables, + results, + ".."/[aristo_desc, aristo_layers, aristo_hashify] + +func txFrameIsTop*(tx: AristoTxRef): bool + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] = + if not tx.txFrameIsTop(): + return err(TxNotTopTx) + let db = tx.db + if tx.level != db.stack.len: + return err(TxStackGarbled) + ok db + +proc getTxUid(db: AristoDbRef): uint = + if db.txUidGen == high(uint): + db.txUidGen = 0 + db.txUidGen.inc + db.txUidGen + +# ------------------------------------------------------------------------------ +# Public functions, getters +# ------------------------------------------------------------------------------ + +func txFrameTop*(db: AristoDbRef): Result[AristoTxRef,AristoError] = + ## Getter, returns top level transaction if there is any. + if db.txRef.isNil: + err(TxNoPendingTx) + else: + ok(db.txRef) + +func txFrameIsTop*(tx: AristoTxRef): bool = + ## Getter, returns `true` if the argument `tx` referes to the current top + ## level transaction. + tx.db.txRef == tx and tx.db.top.txUid == tx.txUid + +func txFrameLevel*(tx: AristoTxRef): int = + ## Getter, positive nesting level of transaction argument `tx` + tx.level + +func txFrameLevel*(db: AristoDbRef): int = + ## Getter, non-negative nesting level (i.e. number of pending transactions) + if not db.txRef.isNil: + result = db.txRef.level + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc txFrameBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] = + ## Starts a new transaction. + ## + ## Example: + ## :: + ## proc doSomething(db: AristoDbRef) = + ## let tx = db.begin + ## defer: tx.rollback() + ## ... continue using db ... + ## tx.commit() + ## + if db.txFrameLevel != db.stack.len: + return err(TxStackGarbled) + + db.stack.add db.top + db.top = LayerRef( + delta: LayerDeltaRef(), + final: db.top.final.dup, + txUid: db.getTxUid) + + db.txRef = AristoTxRef( + db: db, + txUid: db.top.txUid, + parent: db.txRef, + level: db.stack.len) + + ok db.txRef + + +proc txFrameRollback*( + tx: AristoTxRef; # Top transaction on database + ): Result[void,AristoError] = + ## Given a *top level* handle, this function discards all database operations + ## performed for this transactio. The previous transaction is returned if + ## there was any. + ## + let db = ? tx.getDbDescFromTopTx() + + # Roll back to previous layer. + db.top = db.stack[^1] + db.stack.setLen(db.stack.len-1) + + db.txRef = db.txRef.parent + ok() + + +proc txFrameCommit*( + tx: AristoTxRef; # Top transaction on database + ): Result[void,AristoError] = + ## Given a *top level* handle, this function accepts all database operations + ## performed through this handle and merges it to the previous layer. The + ## previous transaction is returned if there was any. + ## + let db = ? tx.getDbDescFromTopTx() + db.hashify().isOkOr: + return err(error[1]) + + # Pop layer from stack and merge database top layer onto it + let merged = block: + if db.top.delta.sTab.len == 0 and + db.top.delta.kMap.len == 0: + # Avoid `layersMergeOnto()` + db.top.delta = db.stack[^1].delta + db.stack.setLen(db.stack.len-1) + db.top + else: + let layer = db.stack[^1] + db.stack.setLen(db.stack.len-1) + db.top.layersMergeOnto layer[] + layer + + # Install `merged` stack top layer and update stack + db.top = merged + db.txRef = tx.parent + if 0 < db.stack.len: + db.txRef.txUid = db.getTxUid + db.top.txUid = db.txRef.txUid + ok() + + +proc txFrameCollapse*( + tx: AristoTxRef; # Top transaction on database + commit: bool; # Commit if `true`, otherwise roll back + ): Result[void,AristoError] = + ## Iterated application of `commit()` or `rollback()` performing the + ## something similar to + ## :: + ## while true: + ## discard tx.commit() # ditto for rollback() + ## if db.txTop.isErr: break + ## tx = db.txTop.value + ## + let db = ? tx.getDbDescFromTopTx() + + if commit: + # For commit, hashify the current layer if requested and install it + db.hashify().isOkOr: + return err(error[1]) + + db.top.txUid = 0 + db.stack.setLen(0) + db.txRef = AristoTxRef(nil) + ok() + +# ------------------------------------------------------------------------------ +# Public iterators +# ------------------------------------------------------------------------------ + +iterator txFrameWalk*(tx: AristoTxRef): (int,AristoTxRef,LayerRef,AristoError) = + ## Walk down the transaction stack chain. + let db = tx.db + var tx = tx + + block body: + # Start at top layer if tx refers to that + if tx.level == db.stack.len: + if tx.txUid != db.top.txUid: + yield (-1,tx,db.top,TxStackGarbled) + break body + + # Yield the top level + yield (0,tx,db.top,AristoError(0)) + + # Walk down the transaction stack + for level in (tx.level-1).countDown(1): + tx = tx.parent + if tx.isNil or tx.level != level: + yield (-1,tx,LayerRef(nil),TxStackGarbled) + break body + + var layer = db.stack[level] + if tx.txUid != layer.txUid: + yield (-1,tx,layer,TxStackGarbled) + break body + + yield (db.stack.len-level,tx,layer,AristoError(0)) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_tx/tx_stow.nim b/nimbus/db/aristo/aristo_tx/tx_stow.nim new file mode 100644 index 000000000..9254b572a --- /dev/null +++ b/nimbus/db/aristo/aristo_tx/tx_stow.nim @@ -0,0 +1,91 @@ +# nimbus-eth1 +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Aristo DB -- Transaction stow/save helper +## ========================================= +## +{.push raises: [].} + +import + std/options, + results, + ".."/[aristo_desc, aristo_get, aristo_journal, aristo_layers, aristo_hashify] + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc txStow*( + db: AristoDbRef; # Database + nxtFid: Option[FilterID]; # Next filter ID (zero is OK) + persistent: bool; # Stage only unless `true` + chunkedMpt: bool; # Partial data (e.g. from `snap`) + ): Result[void,AristoError] = + ## Worker for `stow()` and `persist()` variants. + ## + if not db.txRef.isNil: + return err(TxPendingTx) + if 0 < db.stack.len: + return err(TxStackGarbled) + if persistent and not db.journalUpdateOk(): + return err(TxBackendNotWritable) + + # Update Merkle hashes (unless disabled) + db.hashify().isOkOr: + return err(error[1]) + + let fwd = db.journalFwdFilter(db.top, chunkedMpt).valueOr: + return err(error[1]) + + if fwd.isValid: + # Merge `top` layer into `roFilter` + db.journalMerge(fwd).isOkOr: + return err(error[1]) + + # Special treatment for `snap` proofs (aka `chunkedMpt`) + let final = + if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp) + else: LayerFinalRef() + + # New empty top layer (probably with `snap` proofs and `vGen` carry over) + db.top = LayerRef( + delta: LayerDeltaRef(), + final: final) + if db.roFilter.isValid: + db.top.final.vGen = db.roFilter.vGen + else: + let rc = db.getIdgUbe() + if rc.isOk: + db.top.final.vGen = rc.value + else: + # It is OK if there was no `Idg`. Otherwise something serious happened + # and there is no way to recover easily. + doAssert rc.error == GetIdgNotFound + + if persistent: + # Merge `roFiler` into persistent tables + ? db.journalUpdate nxtFid + db.roFilter = FilterRef(nil) + + # Special treatment for `snap` proofs (aka `chunkedMpt`) + let final = + if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp) + else: LayerFinalRef(vGen: db.vGen) + + # New empty top layer (probably with `snap` proofs carry over) + db.top = LayerRef( + delta: LayerDeltaRef(), + final: final, + txUid: db.top.txUid) + ok() + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/core_db/backend/aristo_db/aristo_replicate.nim b/nimbus/db/core_db/backend/aristo_db/aristo_replicate.nim index ae20fb3b9..da4468409 100644 --- a/nimbus/db/core_db/backend/aristo_db/aristo_replicate.nim +++ b/nimbus/db/core_db/backend/aristo_db/aristo_replicate.nim @@ -29,7 +29,7 @@ iterator aristoReplicate[T]( root = dsc.rootID mpt = dsc.to(AristoDbRef) api = dsc.to(AristoApiRef) - p = api.forkTop(mpt).valueOrApiError "aristoReplicate()" + p = api.forkTx(mpt,0).valueOrApiError "aristoReplicate()" defer: discard api.forget(p) for (vid,key,vtx,node) in T.replicate(p): if key.len == 32: diff --git a/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim b/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim index d8b373221..d07dd0cfd 100644 --- a/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim +++ b/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim @@ -314,9 +314,11 @@ proc accMethods(cAcc: AristoCoreDxAccRef): CoreDbAccFns = colType: CtAccounts) proc accCloneMpt(): CoreDbRc[CoreDxMptRef] = - ok(AristoCoreDxMptRef( + var xpt = AristoCoreDxMptRef( base: base, - mptRoot: AccountsVID)) + mptRoot: AccountsVID) + xpt.methods = xpt.mptMethods + ok(db.bless xpt) proc accFetch(address: EthAddress): CoreDbRc[CoreDbAccount] = const info = "acc/fetchFn()" @@ -570,7 +572,7 @@ func toVoidRc*[T]( proc getFromJournal*(base: AristoBaseRef; fid: Option[FilterID]): FilterRef = let be = base.ctx.mpt.backend if not be.isNil: - let fp = base.api.getFromJournal(be, fid, earlierOK=true).valueOr: + let fp = base.api.journalGetInx(be, fid, earlierOK=true).valueOr: return FilterRef(nil) return fp.fil @@ -730,9 +732,16 @@ proc init*( vid = VertexID(colType) key = colState.to(HashKey) + # Find `(vid,key)` on transaction stack + inx = block: + let rc = api.findTx(base.ctx.mpt, vid, key) + if rc.isErr: + return err(rc.error.toError(base, info)) + rc.value + # Fork MPT descriptor that provides `(vid,key)` newMpt = block: - let rc = api.forkWith(base.ctx.mpt, vid, key) + let rc = api.forkTx(base.ctx.mpt, inx) if rc.isErr: return err(rc.error.toError(base, info)) rc.value diff --git a/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim b/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim index ef6e71a6f..f5f36ee5e 100644 --- a/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim +++ b/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim @@ -279,7 +279,7 @@ func init*(T: type KvtBaseRef; db: CoreDbRef; kdb: KvtDbRef): T = base: result, kvt: kdb) dsc.methods = dsc.kvtMethods() - result.cache = KvtCoreDxKvtRef(db.bless dsc) + result.cache = db.bless dsc when CoreDbEnableApiProfiling: let profApi = KvtApiProfRef.init(result.api, kdb.backend) diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index b4da43c68..2fdd1136a 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -15,7 +15,7 @@ import chronos, eth/p2p, ../../db/aristo/aristo_desc, - ../../db/aristo/aristo_filter/filter_scheduler, + ../../db/aristo/aristo_journal/journal_scheduler, ".."/[protocol, sync_desc], ../handlers/eth, ../misc/[best_pivot, block_queue, sync_ctrl, ticker], diff --git a/tests/test_aristo/test_filter.nim b/tests/test_aristo/test_filter.nim index f5b7e96b7..7f5a30fdb 100644 --- a/tests/test_aristo/test_filter.nim +++ b/tests/test_aristo/test_filter.nim @@ -22,10 +22,10 @@ import aristo_debug, aristo_desc, aristo_desc/desc_backend, - aristo_filter, - aristo_filter/filter_fifos, - aristo_filter/filter_scheduler, aristo_get, + aristo_journal, + aristo_journal/journal_ops, + aristo_journal/journal_scheduler, aristo_layers, aristo_merge, aristo_persistent, @@ -191,7 +191,7 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] = check rc.error == 0 return - let dx = [db, db.forkTop.value, db.forkTop.value] + let dx = [db, db.forkTx(0).value, db.forkTx(0).value] xCheck dx[0].nForked == 2 # Reduce unwanted tx layers @@ -417,7 +417,7 @@ proc storeFilter( ): bool = ## .. let instr = block: - let rc = be.fifosStore(filter, none(FilterID)) + let rc = be.journalOpsPushSlot(filter, none(FilterID)) xCheckRc rc.error == 0 rc.value @@ -450,11 +450,11 @@ proc fetchDelete( ## ... let vfyInst = block: - let rc = be.fifosDelete(backSteps = backSteps) + let rc = be.journalOpsDeleteSlots(backSteps = backSteps) xCheckRc rc.error == 0 rc.value instr = block: - let rc = be.fifosFetch(backSteps = backSteps) + let rc = be.journalOpsFetchSlots(backSteps = backSteps) xCheckRc rc.error == 0 rc.value qid = be.journal.le(instr.fil.fid, be.qid2fidFn) @@ -842,7 +842,7 @@ proc testFilterBacklog*( # Realign to earlier state xb = block: - let rc = db.forkByJournal(episode = episode) + let rc = db.journalFork(episode = episode) xCheckRc rc.error == 0 rc.value block: @@ -851,7 +851,7 @@ proc testFilterBacklog*( # Store this state backend database (temporarily re-centre) block: - let rc = xb.resolveBackendFilter(reCentreOk = true) + let rc = xb.journalUpdate(reCentreOk = true) xCheckRc rc.error == 0 xCheck db.isCentre block: @@ -863,7 +863,7 @@ proc testFilterBacklog*( # Restore previous database state block: - let rc = db.resolveBackendFilter() + let rc = db.journalUpdate() xCheckRc rc.error == 0 block: let rc = db.check(relax=false) diff --git a/tests/test_aristo/test_helpers.nim b/tests/test_aristo/test_helpers.nim index e7e22f22c..a49ebea36 100644 --- a/tests/test_aristo/test_helpers.nim +++ b/tests/test_aristo/test_helpers.nim @@ -13,7 +13,7 @@ import eth/common, rocksdb, ../../nimbus/db/aristo/[ - aristo_debug, aristo_desc, aristo_delete, aristo_filter/filter_scheduler, + aristo_debug, aristo_desc, aristo_delete, aristo_journal/journal_scheduler, aristo_hashify, aristo_hike, aristo_merge], ../../nimbus/db/kvstore_rocksdb, ../../nimbus/sync/protocol/snap/snap_types, diff --git a/tests/test_aristo/test_misc.nim b/tests/test_aristo/test_misc.nim index 6f9a6e7a1..689a48e27 100644 --- a/tests/test_aristo/test_misc.nim +++ b/tests/test_aristo/test_misc.nim @@ -21,7 +21,7 @@ import ../../nimbus/db/aristo/[ aristo_check, aristo_debug, aristo_desc, aristo_blobify, aristo_layers, aristo_vid], - ../../nimbus/db/aristo/aristo_filter/filter_scheduler, + ../../nimbus/db/aristo/aristo_journal/journal_scheduler, ../replay/xcheck, ./test_helpers @@ -396,7 +396,7 @@ proc testQidScheduler*( let w = scd.addItem() let execOk = list.exec(serial=n, instr=w.exec, relax=false) xCheck execOk - scd[] = w.fifo[] + scd[] = w.journal[] let validateOk = list.validate(scd, serial=n, relax=false) xCheck validateOk: show(serial=n, exec=w.exec) @@ -436,7 +436,7 @@ proc testQidScheduler*( list.del qid xCheck delIDs.len == 0 - scd[] = fetch.fifo[] + scd[] = fetch.journal[] # ------------------- @@ -445,7 +445,7 @@ proc testQidScheduler*( let w = scd.addItem() let execOk = list.exec(serial=n, instr=w.exec, relax=true) xCheck execOk - scd[] = w.fifo[] + scd[] = w.journal[] let validateOk = list.validate(scd, serial=n, relax=true) xCheck validateOk: show(serial=n, exec=w.exec) @@ -455,7 +455,7 @@ proc testQidScheduler*( let w = scd.addItem() let execOk = list.exec(serial=n, instr=w.exec, relax=false) xCheck execOk - scd[] = w.fifo[] + scd[] = w.journal[] let validateOk = list.validate(scd, serial=n, relax=false) xCheck validateOk