From 5ac362fe6f85effa0708b9b94c06405604b7d7a5 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Thu, 18 Jul 2024 21:32:32 +0000 Subject: [PATCH] Aristo and kvt balancer management update (#2504) * Aristo: Merge `delta_siblings` module into `deltaPersistent()` * Aristo: Add `isEmpty()` for canonical checking whether a layer is empty * Aristo: Merge `LayerDeltaRef` into `LayerObj` why: No need to maintain nested object refs anymore. Previously the `LayerDeltaRef` object had a companion `LayerFinalRef` which held non-delta layer information. * Kvt: Merge `LayerDeltaRef` into `LayerRef` why: No need to maintain nested object refs (as with `Aristo`) * Kvt: Re-write balancer logic similar to `Aristo` why: Although `Kvt` was a cheap copy of `Aristo` it sort of got out of sync and the balancer code was wrong. * Update iterator over forked peers why: Yield additional field `isLast` indicating that the last iteration cycle was approached. * Optimise balancer calculation. why: One can often avoid providing a new object containing the merge of two layers for the balancer. This avoids copying tables. In some cases this is replaced by `hasKey()` look ups though. One uses one of the two to combine and merges the other into the first. Of course, this needs some checks for making sure that none of the components to merge is eventually shared with something else. * Fix copyright year --- nimbus/db/aristo/aristo_debug.nim | 85 ++++--------- nimbus/db/aristo/aristo_delta.nim | 42 +++--- nimbus/db/aristo/aristo_delta/delta_merge.nim | 94 +++++++------- .../db/aristo/aristo_delta/delta_reverse.nim | 7 +- .../db/aristo/aristo_delta/delta_siblings.nim | 120 ------------------ nimbus/db/aristo/aristo_desc.nim | 28 ++-- .../db/aristo/aristo_desc/desc_structural.nim | 12 +- nimbus/db/aristo/aristo_init/persistent.nim | 3 +- nimbus/db/aristo/aristo_layers.nim | 93 +++++++------- nimbus/db/aristo/aristo_tx.nim | 6 +- nimbus/db/aristo/aristo_tx/tx_fork.nim | 3 +- nimbus/db/aristo/aristo_tx/tx_frame.nim | 22 +--- nimbus/db/aristo/aristo_tx/tx_stow.nim | 43 ++++--- nimbus/db/aristo/aristo_vid.nim | 13 +- nimbus/db/aristo/aristo_walk/walk_private.nim | 8 +- nimbus/db/kvt/kvt_debug.nim | 8 +- nimbus/db/kvt/kvt_delta.nim | 38 +++--- nimbus/db/kvt/kvt_delta/delta_merge.nim | 70 ++++++---- nimbus/db/kvt/kvt_delta/delta_reverse.nim | 39 +++--- nimbus/db/kvt/kvt_desc.nim | 14 +- nimbus/db/kvt/kvt_desc/desc_structural.nim | 15 +-- nimbus/db/kvt/kvt_layers.nim | 22 ++-- nimbus/db/kvt/kvt_tx/tx_frame.nim | 5 +- nimbus/db/kvt/kvt_tx/tx_stow.nim | 20 ++- tests/test_aristo/test_balancer.nim | 14 +- 25 files changed, 362 insertions(+), 462 deletions(-) delete mode 100644 nimbus/db/aristo/aristo_delta/delta_siblings.nim diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 799f7ed6b..6255d128e 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -291,8 +291,8 @@ proc ppXMap*( result.setLen(result.len - 1) result &= "}" -proc ppFilter( - fl: LayerDeltaRef; +proc ppBalancer( + fl: LayerRef; db: AristoDbRef; indent: int; ): string = @@ -398,18 +398,18 @@ proc ppLayer( if 2 < nOKs: result &= "".doPrefix(false) if vTopOk: - result &= "".doPrefix(true) & "vTop=" & layer.delta.vTop.ppVid + result &= "".doPrefix(true) & "vTop=" & layer.vTop.ppVid if sTabOk: let - tLen = layer.delta.sTab.len + tLen = layer.sTab.len info = "sTab(" & $tLen & ")" - result &= info.doPrefix(0 < tLen) & layer.delta.sTab.ppSTab(db,indent+2) + result &= info.doPrefix(0 < tLen) & layer.sTab.ppSTab(db,indent+2) if kMapOk: let - tLen = layer.delta.kMap.len + tLen = layer.kMap.len info = "kMap(" & $tLen & ")" result &= info.doPrefix(0 < tLen) - result &= db.ppXMap(layer.delta.kMap, indent+2) + result &= db.ppXMap(layer.kMap, indent+2) # ------------------------------------------------------------------------------ # Public functions @@ -528,53 +528,17 @@ proc pp*( layer: LayerRef; db: AristoDbRef; indent = 4; + balancerOk = false; + sTabOk = true, + kMapOk = true, + other = true, ): string = - layer.ppLayer( - db, vTopOk=true, sTabOk=true, kMapOk=true) - -proc pp*( - layer: LayerRef; - db: AristoDbRef; - xTabOk: bool; - indent = 4; - ): string = - layer.ppLayer( - db, vTopOk=true, sTabOk=xTabOk, kMapOk=true) - -proc pp*( - layer: LayerRef; - db: AristoDbRef; - xTabOk: bool; - kMapOk: bool; - other = false; - indent = 4; - ): string = - layer.ppLayer( - db, vTopOk=other, sTabOk=xTabOk, kMapOk=kMapOk) - - -proc pp*( - db: AristoDbRef; - xTabOk: bool; - indent = 4; - ): string = - db.layersCc.pp(db, xTabOk=xTabOk, indent=indent) - -proc pp*( - db: AristoDbRef; - xTabOk: bool; - kMapOk: bool; - other = false; - indent = 4; - ): string = - db.layersCc.pp(db, xTabOk=xTabOk, kMapOk=kMapOk, other=other, indent=indent) - -proc pp*( - filter: LayerDeltaRef; - db = AristoDbRef(nil); - indent = 4; - ): string = - filter.ppFilter(db.orDefault(), indent) + if balancerOk: + layer.ppLayer( + db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk) + else: + layer.ppLayer( + db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk) proc pp*( be: BackendRef; @@ -582,7 +546,7 @@ proc pp*( limit = 100; indent = 4; ): string = - result = db.balancer.ppFilter(db, indent+1) & indent.toPfx + result = db.balancer.ppBalancer(db, indent+1) & indent.toPfx case be.kind: of BackendMemory: result &= be.MemBackendRef.ppBe(db, limit, indent+1) @@ -599,11 +563,12 @@ proc pp*( topOk = true; stackOk = true; kMapOk = true; + sTabOk = true; limit = 100; ): string = if topOk: result = db.layersCc.pp( - db, xTabOk=true, kMapOk=kMapOk, other=true, indent=indent) + db, sTabOk=sTabOk, kMapOk=kMapOk, other=true, indent=indent) let stackOnlyOk = stackOk and not (topOk or balancerOk or backendOk) if not stackOnlyOk: result &= indent.toPfx & "level=" & $db.stack.len @@ -614,15 +579,15 @@ proc pp*( let m = layers.len - n - 1 l = db.layersCc m - a = w.delta.kMap.values.toSeq.filterIt(not it.isValid).len - c = l.delta.kMap.values.toSeq.filterIt(not it.isValid).len - result &= "(" & $(w.delta.kMap.len - a) & "," & $a & ")" - lStr &= " " & $m & "=(" & $(l.delta.kMap.len - c) & "," & $c & ")" + a = w.kMap.values.toSeq.filterIt(not it.isValid).len + c = l.kMap.values.toSeq.filterIt(not it.isValid).len + result &= "(" & $(w.kMap.len - a) & "," & $a & ")" + lStr &= " " & $m & "=(" & $(l.kMap.len - c) & "," & $c & ")" result &= " =>" & lStr if backendOk: result &= indent.toPfx & db.backend.pp(db, limit=limit, indent) elif balancerOk: - result &= indent.toPfx & db.balancer.ppFilter(db, indent+1) + result &= indent.toPfx & db.balancer.ppBalancer(db, indent+1) proc pp*(sdb: MerkleSignRef; indent = 4): string = result = "" & diff --git a/nimbus/db/aristo/aristo_delta.nim b/nimbus/db/aristo/aristo_delta.nim index b10cc6a5e..571425c36 100644 --- a/nimbus/db/aristo/aristo_delta.nim +++ b/nimbus/db/aristo/aristo_delta.nim @@ -16,9 +16,9 @@ import std/tables, eth/common, results, - ./aristo_desc, + "."/[aristo_desc, aristo_layers], ./aristo_desc/desc_backend, - ./aristo_delta/delta_siblings + ./aristo_delta/[delta_merge, delta_reverse] # ------------------------------------------------------------------------------ # Public functions, save to backend @@ -34,17 +34,16 @@ proc deltaPersistent*( nxtFid = 0u64; # Next filter ID (if any) reCentreOk = false; ): Result[void,AristoError] = - ## Resolve (i.e. move) the backend filter into the physical backend database. + ## Resolve (i.e. move) the balancer into the physical backend database. ## - ## This needs write permission on the backend DB for the argument `db` - ## descriptor (see the function `aristo_desc.isCentre()`.) With the argument - ## flag `reCentreOk` passed `true`, write permission will be temporarily + ## This needs write permission on the backend DB for the descriptor argument + ## `db` (see the function `aristo_desc.isCentre()`.) If the argument flag + ## `reCentreOk` is passed `true`, write permission will be temporarily ## acquired when needed. ## - ## When merging the current backend filter, its reverse will be is stored as - ## back log on the filter fifos (so the current state can be retrieved.) - ## Also, other non-centre descriptors are updated so there is no visible - ## database change for these descriptors. + ## When merging the current backend filter, its reverse will be is stored + ## on other non-centre descriptors so there is no visible database change + ## for these. ## let be = db.backend if be.isNil: @@ -64,9 +63,21 @@ proc deltaPersistent*( # Always re-centre to `parent` (in case `reCentreOk` was set) defer: discard parent.reCentre() - # Initialise peer filter balancer. - let updateSiblings = ? UpdateSiblingsRef.init db - defer: updateSiblings.rollback() + # Update forked balancers here do that errors are detected early (if any.) + if 0 < db.nForked: + let rev = db.revFilter(db.balancer).valueOr: + return err(error[1]) + if not rev.isEmpty: # Can an empty `rev` happen at all? + var unsharedRevOk = true + for w in db.forked: + if not w.db.balancer.isValid: + unsharedRevOk = false + # The `rev` filter can be modified if one can make sure that it is + # not shared (i.e. only previously merged into the w.db.balancer.) + # Note that it is trivially true for a single fork. + let modLowerOk = w.isLast and unsharedRevOk + w.db.balancer = deltaMerge( + w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk) let lSst = SavedState( key: EMPTY_ROOT_HASH, # placeholder for more @@ -93,8 +104,9 @@ proc deltaPersistent*( if not db.stoLeaves.lruUpdate(mixKey, vtx): discard db.stoLeaves.lruAppend(mixKey, vtx, accLruSize) - # Update dudes and this descriptor - ? updateSiblings.update().commit() + # Done with balancer, all saved to backend + db.balancer = LayerRef(nil) + ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_delta/delta_merge.nim b/nimbus/db/aristo/aristo_delta/delta_merge.nim index b50732f3f..b491bf283 100644 --- a/nimbus/db/aristo/aristo_delta/delta_merge.nim +++ b/nimbus/db/aristo/aristo_delta/delta_merge.nim @@ -11,72 +11,70 @@ import std/tables, eth/common, - results, - ".."/[aristo_desc, aristo_get] + ".."/[aristo_desc, aristo_layers] # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ proc deltaMerge*( - db: AristoDbRef; - upper: LayerDeltaRef; # new filter, `nil` is ok - lower: LayerDeltaRef; # Trg filter, `nil` is ok - ): Result[LayerDeltaRef,(VertexID,AristoError)] = + upper: LayerRef; # Think of `top`, `nil` is ok + modUpperOk: bool; # May re-use/modify `upper` + lower: LayerRef; # Think of `balancer`, `nil` is ok + modLowerOk: bool; # May re-use/modify `lower` + ): LayerRef = ## Merge argument `upper` into the `lower` filter instance. ## ## Note that the namimg `upper` and `lower` indicate that the filters are - ## stacked and the database access is `upper -> lower -> backend` whereas - ## the `src/trg` matching logic goes the other way round. + ## stacked and the database access is `upper -> lower -> backend`. ## - # Degenerate case: `upper` is void if lower.isNil: - if upper.isNil: - # Even more degenerate case when both filters are void - return ok LayerDeltaRef(nil) - return ok(upper) + # Degenerate case: `upper` is void + result = upper - # Degenerate case: `upper` is non-trivial and `lower` is void - if upper.isNil: - return ok(lower) + elif upper.isNil: + # Degenerate case: `lower` is void + result = lower - # There is no need to deep copy table vertices as they will not be modified. - let newFilter = LayerDeltaRef( - sTab: lower.sTab, - kMap: lower.kMap, - vTop: upper.vTop) + elif modLowerOk: + # Can modify `lower` which is the prefered action mode but applies only + # in cases where the `lower` argument is not shared. + lower.vTop = upper.vTop + layersMergeOnto(upper, lower[]) + result = lower - for (rvid,vtx) in upper.sTab.pairs: - if vtx.isValid or not newFilter.sTab.hasKey rvid: - newFilter.sTab[rvid] = vtx - elif newFilter.sTab.getOrVoid(rvid).isValid: - let rc = db.getVtxUbe rvid - if rc.isOk: - newFilter.sTab[rvid] = vtx # VertexRef(nil) - elif rc.error == GetVtxNotFound: - newFilter.sTab.del rvid - else: - return err((rvid.vid,rc.error)) + elif not modUpperOk: + # Cannot modify any argument layers. + result = LayerRef( + sTab: lower.sTab, # shallow copy (entries will not be modified) + kMap: lower.kMap, + accLeaves: lower.accLeaves, + stoLeaves: lower.stoLeaves, + vTop: upper.vTop) + layersMergeOnto(upper, result[]) - for (rvid,key) in upper.kMap.pairs: - if key.isValid or not newFilter.kMap.hasKey rvid: - newFilter.kMap[rvid] = key - elif newFilter.kMap.getOrVoid(rvid).isValid: - let rc = db.getKeyUbe rvid - if rc.isOk: - newFilter.kMap[rvid] = key - elif rc.error == GetKeyNotFound: - newFilter.kMap.del rvid - else: - return err((rvid.vid,rc.error)) + else: + # Otherwise avoid copying some tables by modifyinh `upper`. This is not + # completely free as the merge direction changes to merging the `lower` + # layer up into the higher prioritised `upper` layer (note that the `lower` + # argument filter is read-only.) Here again, the `upper` argument must not + # be a shared layer/filter. + for (rvid,vtx) in lower.sTab.pairs: + if not upper.sTab.hasKey(rvid): + upper.sTab[rvid] = vtx - for (accPath,leafVtx) in upper.accLeaves.pairs: - newFilter.accLeaves[accPath] = leafVtx + for (rvid,key) in lower.kMap.pairs: + if not upper.kMap.hasKey(rvid): + upper.kMap[rvid] = key - for (mixPath,leafVtx) in upper.stoLeaves.pairs: - newFilter.stoLeaves[mixPath] = leafVtx + for (accPath,leafVtx) in lower.accLeaves.pairs: + if not upper.accLeaves.hasKey(accPath): + upper.accLeaves[accPath] = leafVtx - ok newFilter + for (mixPath,leafVtx) in lower.stoLeaves.pairs: + if not upper.stoLeaves.hasKey(mixPath): + upper.stoLeaves[mixPath] = leafVtx + result = upper # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_delta/delta_reverse.nim b/nimbus/db/aristo/aristo_delta/delta_reverse.nim index 5c3f84fe4..2b9609759 100644 --- a/nimbus/db/aristo/aristo_delta/delta_reverse.nim +++ b/nimbus/db/aristo/aristo_delta/delta_reverse.nim @@ -20,16 +20,15 @@ import proc revFilter*( db: AristoDbRef; # Database - filter: LayerDeltaRef; # Filter to revert - ): Result[LayerDeltaRef,(VertexID,AristoError)] = + filter: LayerRef; # Filter to revert + ): Result[LayerRef,(VertexID,AristoError)] = ## Assemble reverse filter for the `filter` argument, i.e. changes to the ## backend that reverse the effect of applying the this read-only filter. ## ## This read-only filter is calculated against the current unfiltered ## backend (excluding optionally installed read-only filter.) ## - # Register MPT state roots for reverting back - let rev = LayerDeltaRef() + let rev = LayerRef() # Get vid generator state on backend block: diff --git a/nimbus/db/aristo/aristo_delta/delta_siblings.nim b/nimbus/db/aristo/aristo_delta/delta_siblings.nim deleted file mode 100644 index 9cd7ac6fc..000000000 --- a/nimbus/db/aristo/aristo_delta/delta_siblings.nim +++ /dev/null @@ -1,120 +0,0 @@ -# nimbus-eth1 -# Copyright (c) 2023-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. - -import - results, - eth/common, - ../aristo_desc, - "."/[delta_merge, delta_reverse] - -type - UpdateState = enum - Initial = 0 - Updated, - Finished - - UpdateSiblingsRef* = ref object - ## Update transactional context - state: UpdateState - db: AristoDbRef ## Main database access - balancers: seq[(AristoDbRef,LayerDeltaRef)] ## Rollback data - rev: LayerDeltaRef ## Reverse filter set up - -# ------------------------------------------------------------------------------ -# Public contructor, commit, rollback -# ------------------------------------------------------------------------------ - -proc rollback*(ctx: UpdateSiblingsRef) = - ## Rollback any changes made by the `update()` function. Subsequent - ## `rollback()` or `commit()` calls will be without effect. - if ctx.state == Updated: - for (d,f) in ctx.balancers: - d.balancer = f - ctx.state = Finished - - -proc commit*(ctx: UpdateSiblingsRef): Result[void,AristoError] = - ## Accept updates. Subsequent `rollback()` calls will be without effect. - if ctx.state != Updated: - ctx.rollback() - return err(FilSiblingsCommitUnfinshed) - ctx.db.balancer = LayerDeltaRef(nil) - ctx.state = Finished - ok() - -proc commit*( - rc: Result[UpdateSiblingsRef,AristoError]; - ): Result[void,AristoError] = - ## Variant of `commit()` for joining with `update()` - (? rc).commit() - - -proc init*( - T: type UpdateSiblingsRef; # Context type - db: AristoDbRef; # Main database - ): Result[T,AristoError] = - ## Set up environment for installing the reverse of the `db` argument current - ## read-only filter onto every associated descriptor referring to the same - ## database. - if not db.isCentre: - return err(FilBackendRoMode) - if db.nForked == 0: - return ok T(db: db) # No need to do anything - - func fromVae(err: (VertexID,AristoError)): AristoError = - err[1] - - # Filter rollback context - ok T( - db: db, - rev: ? db.revFilter(db.balancer).mapErr fromVae) # Reverse filter - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc update*(ctx: UpdateSiblingsRef): Result[UpdateSiblingsRef,AristoError] = - ## This function installs the reverse of the `init()` argument `db` current - ## read-only filter onto every associated descriptor referring to the same - ## database. If the function fails, a `rollback()` is called automatically. - ## - ## This function might do some real work so it was detached from `init()` so - ## it can be called late but before the physical database is updated. - ## - if ctx.state == Initial: - ctx.state = Updated - if not ctx.rev.isNil: - let db = ctx.db - # Update distributed filters. Note that the physical backend database - # must not have been updated, yet. So the new root key for the backend - # will be `db.balancer.kMap[$1]`. - for w in db.forked: - if w.balancer.isNil: - # Sharing the `ctx.rev` ref is safe as it is read-inly - w.balancer = ctx.rev - else: - let rc = db.deltaMerge(w.balancer, ctx.rev) - if rc.isErr: - ctx.rollback() - return err(rc.error[1]) - ctx.balancers.add (w, w.balancer) - w.balancer = rc.value - ok(ctx) - -proc update*( - rc: Result[UpdateSiblingsRef,AristoError] - ): Result[UpdateSiblingsRef,AristoError] = - ## Variant of `update()` for joining with `init()` - (? rc).update() - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ - diff --git a/nimbus/db/aristo/aristo_desc.nim b/nimbus/db/aristo/aristo_desc.nim index f46ca9092..35a083c2b 100644 --- a/nimbus/db/aristo/aristo_desc.nim +++ b/nimbus/db/aristo/aristo_desc.nim @@ -74,7 +74,7 @@ type ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers - balancer*: LayerDeltaRef ## Baland out concurrent backend access + balancer*: LayerRef ## Balance out concurrent backend access backend*: BackendRef ## Backend database (may well be `nil`) txRef*: AristoTxRef ## Latest active transaction @@ -152,8 +152,8 @@ func isValid*(nd: NodeRef): bool = func isValid*(pid: PathID): bool = pid != VOID_PATH_ID -func isValid*(filter: LayerDeltaRef): bool = - filter != LayerDeltaRef(nil) +func isValid*(layer: LayerRef): bool = + layer != LayerRef(nil) func isValid*(root: Hash256): bool = root != EMPTY_ROOT_HASH @@ -250,11 +250,11 @@ proc fork*( if not noTopLayer: clone.top = LayerRef.init() if not db.balancer.isNil: - clone.top.delta.vTop = db.balancer.vTop + clone.top.vTop = db.balancer.vTop else: let rc = clone.backend.getTuvFn() if rc.isOk: - clone.top.delta.vTop = rc.value + clone.top.vTop = rc.value elif rc.error != GetTuvNotFound: return err(rc.error) @@ -263,13 +263,19 @@ proc fork*( ok clone -iterator forked*(db: AristoDbRef): AristoDbRef = +iterator forked*(db: AristoDbRef): tuple[db: AristoDbRef, isLast: bool] = ## Interate over all non centre descriptors (see comments on `reCentre()` ## for details.) + ## + ## The second `isLast` yielded loop entry is `true` if the yielded tuple + ## is the last entry in the list. + ## if not db.dudes.isNil: - for dude in db.getCentre.dudes.peers.items: + var nLeft = db.dudes.peers.len + for dude in db.dudes.peers.items: if dude != db.dudes.centre: - yield dude + nLeft.dec + yield (dude, nLeft == 1) func nForked*(db: AristoDbRef): int = ## Returns the number of non centre descriptors (see comments on `reCentre()` @@ -313,12 +319,12 @@ iterator rstack*(db: AristoDbRef): LayerRef = for i in 0.. 0: doAssert level <= db.stack.len - db.stack[^level].delta + db.stack[^level] elif level == -1: doAssert db.balancer != nil db.balancer diff --git a/nimbus/db/aristo/aristo_desc/desc_structural.nim b/nimbus/db/aristo/aristo_desc/desc_structural.nim index 27fae40a5..7e3fef54b 100644 --- a/nimbus/db/aristo/aristo_desc/desc_structural.nim +++ b/nimbus/db/aristo/aristo_desc/desc_structural.nim @@ -89,7 +89,8 @@ type key*: Hash256 ## Some state hash (if any) serial*: uint64 ## Generic identifier from application - LayerDeltaRef* = ref object + LayerRef* = ref LayerObj + LayerObj* = object ## Delta layers are stacked implying a tables hierarchy. Table entries on ## a higher level take precedence over lower layer table entries. So an ## existing key-value table entry of a layer on top supersedes same key @@ -119,12 +120,7 @@ type accLeaves*: Table[Hash256, VertexRef] ## Account path -> VertexRef stoLeaves*: Table[Hash256, VertexRef] ## Storage path -> VertexRef - LayerRef* = ref LayerObj - LayerObj* = object - ## Hexary trie database layer structures. Any layer holds the full - ## change relative to the backend. - delta*: LayerDeltaRef ## Most structural tables held as deltas - txUid*: uint ## Transaction identifier if positive + txUid*: uint ## Transaction identifier if positive # ------------------------------------------------------------------------------ # Public helpers (misc) @@ -132,7 +128,7 @@ type func init*(T: type LayerRef): T = ## Constructor, returns empty layer - T(delta: LayerDeltaRef()) + T() func hash*(node: NodeRef): Hash = ## Table/KeyedQueue/HashSet mixin diff --git a/nimbus/db/aristo/aristo_init/persistent.nim b/nimbus/db/aristo/aristo_init/persistent.nim index 25f6182d7..0e78e4e59 100644 --- a/nimbus/db/aristo/aristo_init/persistent.nim +++ b/nimbus/db/aristo/aristo_init/persistent.nim @@ -51,8 +51,7 @@ proc newAristoRdbDbRef( return err(rc.error) rc.value ok((AristoDbRef( - top: LayerRef( - delta: LayerDeltaRef(vTop: vTop)), + top: LayerRef(vTop: vTop), backend: be), oCfs)) # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_layers.nim b/nimbus/db/aristo/aristo_layers.nim index c09a5cb10..b4d8a33dd 100644 --- a/nimbus/db/aristo/aristo_layers.nim +++ b/nimbus/db/aristo/aristo_layers.nim @@ -30,7 +30,7 @@ func dup(sTab: Table[RootedVertexID,VertexRef]): Table[RootedVertexID,VertexRef] # ------------------------------------------------------------------------------ func vTop*(db: AristoDbRef): VertexID = - db.top.delta.vTop + db.top.vTop # ------------------------------------------------------------------------------ # Public getters/helpers @@ -42,7 +42,7 @@ func nLayersVtx*(db: AristoDbRef): int = ## layers as there might be duplicate entries for the same vertex ID on ## different layers. ## - db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len) + db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len) func nLayersKey*(db: AristoDbRef): int = ## Number of vertex ID/key entries on the cache layers. This is an upper @@ -50,7 +50,7 @@ func nLayersKey*(db: AristoDbRef): int = ## layers as there might be duplicate entries for the same vertex ID on ## different layers. ## - db.stack.mapIt(it.delta.kMap.len).foldl(a + b, db.top.delta.kMap.len) + db.stack.mapIt(it.kMap.len).foldl(a + b, db.top.kMap.len) # ------------------------------------------------------------------------------ # Public functions: getter variants @@ -60,11 +60,11 @@ func layersGetVtx*(db: AristoDbRef; rvid: RootedVertexID): Opt[(VertexRef, int)] ## Find a vertex on the cache layers. An `ok()` result might contain a ## `nil` vertex if it is stored on the cache that way. ## - db.top.delta.sTab.withValue(rvid, item): + db.top.sTab.withValue(rvid, item): return Opt.some((item[], 0)) for i, w in enumerate(db.rstack): - w.delta.sTab.withValue(rvid, item): + w.sTab.withValue(rvid, item): return Opt.some((item[], i + 1)) Opt.none((VertexRef, int)) @@ -78,11 +78,11 @@ func layersGetKey*(db: AristoDbRef; rvid: RootedVertexID): Opt[(HashKey, int)] = ## Find a hash key on the cache layers. An `ok()` result might contain a void ## hash key if it is stored on the cache that way. ## - db.top.delta.kMap.withValue(rvid, item): + db.top.kMap.withValue(rvid, item): return Opt.some((item[], 0)) for i, w in enumerate(db.rstack): - w.delta.kMap.withValue(rvid, item): + w.kMap.withValue(rvid, item): return ok((item[], i + 1)) Opt.none((HashKey, int)) @@ -92,21 +92,21 @@ func layersGetKeyOrVoid*(db: AristoDbRef; rvid: RootedVertexID): HashKey = (db.layersGetKey(rvid).valueOr (VOID_HASH_KEY, 0))[0] func layersGetAccLeaf*(db: AristoDbRef; accPath: Hash256): Opt[VertexRef] = - db.top.delta.accLeaves.withValue(accPath, item): + db.top.accLeaves.withValue(accPath, item): return Opt.some(item[]) for w in db.rstack: - w.delta.accLeaves.withValue(accPath, item): + w.accLeaves.withValue(accPath, item): return Opt.some(item[]) Opt.none(VertexRef) func layersGetStoLeaf*(db: AristoDbRef; mixPath: Hash256): Opt[VertexRef] = - db.top.delta.stoLeaves.withValue(mixPath, item): + db.top.stoLeaves.withValue(mixPath, item): return Opt.some(item[]) for w in db.rstack: - w.delta.stoLeaves.withValue(mixPath, item): + w.stoLeaves.withValue(mixPath, item): return Opt.some(item[]) Opt.none(VertexRef) @@ -121,7 +121,7 @@ func layersPutVtx*( vtx: VertexRef; ) = ## Store a (potentally empty) vertex on the top layer - db.top.delta.sTab[rvid] = vtx + db.top.sTab[rvid] = vtx func layersResVtx*( db: AristoDbRef; @@ -138,7 +138,7 @@ func layersPutKey*( key: HashKey; ) = ## Store a (potentally void) hash key on the top layer - db.top.delta.kMap[rvid] = key + db.top.kMap[rvid] = key func layersResKey*(db: AristoDbRef; rvid: RootedVertexID) = @@ -157,30 +157,39 @@ proc layersUpdateVtx*( func layersPutAccLeaf*(db: AristoDbRef; accPath: Hash256; leafVtx: VertexRef) = - db.top.delta.accLeaves[accPath] = leafVtx + db.top.accLeaves[accPath] = leafVtx func layersPutStoLeaf*(db: AristoDbRef; mixPath: Hash256; leafVtx: VertexRef) = - db.top.delta.stoLeaves[mixPath] = leafVtx + db.top.stoLeaves[mixPath] = leafVtx # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +func isEmpty*(ly: LayerRef): bool = + ## Returns `true` if the layer does not contain any changes, i.e. all the + ## tables are empty. The field `txUid` is ignored, here. + ly.sTab.len == 0 and + ly.kMap.len == 0 and + ly.accLeaves.len == 0 and + ly.stoLeaves.len == 0 + + func layersMergeOnto*(src: LayerRef; trg: var LayerObj) = ## Merges the argument `src` into the argument `trg` and returns `trg`. For ## the result layer, the `txUid` value set to `0`. ## trg.txUid = 0 - for (vid,vtx) in src.delta.sTab.pairs: - trg.delta.sTab[vid] = vtx - for (vid,key) in src.delta.kMap.pairs: - trg.delta.kMap[vid] = key - trg.delta.vTop = src.delta.vTop - for (accPath,leafVtx) in src.delta.accLeaves.pairs: - trg.delta.accLeaves[accPath] = leafVtx - for (mixPath,leafVtx) in src.delta.stoLeaves.pairs: - trg.delta.stoLeaves[mixPath] = leafVtx + for (vid,vtx) in src.sTab.pairs: + trg.sTab[vid] = vtx + for (vid,key) in src.kMap.pairs: + trg.kMap[vid] = key + trg.vTop = src.vTop + for (accPath,leafVtx) in src.accLeaves.pairs: + trg.accLeaves[accPath] = leafVtx + for (mixPath,leafVtx) in src.stoLeaves.pairs: + trg.stoLeaves[mixPath] = leafVtx func layersCc*(db: AristoDbRef; level = high(int)): LayerRef = ## Provide a collapsed copy of layers up to a particular transaction level. @@ -192,24 +201,22 @@ func layersCc*(db: AristoDbRef; level = high(int)): LayerRef = # Set up initial layer (bottom layer) result = LayerRef( - delta: LayerDeltaRef( - sTab: layers[0].delta.sTab.dup, # explicit dup for ref values - kMap: layers[0].delta.kMap, - vTop: layers[^1].delta.vTop, - accLeaves: layers[0].delta.accLeaves, - stoLeaves: layers[0].delta.stoLeaves, - )) + sTab: layers[0].sTab.dup, # explicit dup for ref values + kMap: layers[0].kMap, + vTop: layers[^1].vTop, + accLeaves: layers[0].accLeaves, + stoLeaves: layers[0].stoLeaves) # Consecutively merge other layers on top for n in 1 ..< layers.len: - for (vid,vtx) in layers[n].delta.sTab.pairs: - result.delta.sTab[vid] = vtx - for (vid,key) in layers[n].delta.kMap.pairs: - result.delta.kMap[vid] = key - for (accPath,vtx) in layers[n].delta.accLeaves.pairs: - result.delta.accLeaves[accPath] = vtx - for (mixPath,vtx) in layers[n].delta.stoLeaves.pairs: - result.delta.stoLeaves[mixPath] = vtx + for (vid,vtx) in layers[n].sTab.pairs: + result.sTab[vid] = vtx + for (vid,key) in layers[n].kMap.pairs: + result.kMap[vid] = key + for (accPath,vtx) in layers[n].accLeaves.pairs: + result.accLeaves[accPath] = vtx + for (mixPath,vtx) in layers[n].stoLeaves.pairs: + result.stoLeaves[mixPath] = vtx # ------------------------------------------------------------------------------ # Public iterators @@ -226,12 +233,12 @@ iterator layersWalkVtx*( ## the one with a zero vertex which are othewise skipped by the iterator. ## The `seen` argument must not be modified while the iterator is active. ## - for (rvid,vtx) in db.top.delta.sTab.pairs: + for (rvid,vtx) in db.top.sTab.pairs: yield (rvid,vtx) seen.incl rvid.vid for w in db.rstack: - for (rvid,vtx) in w.delta.sTab.pairs: + for (rvid,vtx) in w.sTab.pairs: if rvid.vid notin seen: yield (rvid,vtx) seen.incl rvid.vid @@ -251,12 +258,12 @@ iterator layersWalkKey*( ## Walk over all `(VertexID,HashKey)` pairs on the cache layers. Note that ## entries are unsorted. var seen: HashSet[VertexID] - for (rvid,key) in db.top.delta.kMap.pairs: + for (rvid,key) in db.top.kMap.pairs: yield (rvid,key) seen.incl rvid.vid for w in db.rstack: - for (rvid,key) in w.delta.kMap.pairs: + for (rvid,key) in w.kMap.pairs: if rvid.vid notin seen: yield (rvid,key) seen.incl rvid.vid diff --git a/nimbus/db/aristo/aristo_tx.nim b/nimbus/db/aristo/aristo_tx.nim index 42006e56c..908ecd121 100644 --- a/nimbus/db/aristo/aristo_tx.nim +++ b/nimbus/db/aristo/aristo_tx.nim @@ -134,7 +134,7 @@ proc findTx*( if db.txRef.isNil: # Try `(vid,key)` on top layer - let topKey = db.top.delta.kMap.getOrVoid rvid + let topKey = db.top.kMap.getOrVoid rvid if topKey == key: return ok(0) @@ -143,11 +143,11 @@ proc findTx*( for (n,tx,layer,error) in db.txRef.txFrameWalk: if error != AristoError(0): return err(error) - if layer.delta.kMap.getOrVoid(rvid) == key: + if layer.kMap.getOrVoid(rvid) == key: return ok(n) # Try bottom layer - let botKey = db.stack[0].delta.kMap.getOrVoid rvid + let botKey = db.stack[0].kMap.getOrVoid rvid if botKey == key: return ok(db.stack.len) diff --git a/nimbus/db/aristo/aristo_tx/tx_fork.nim b/nimbus/db/aristo/aristo_tx/tx_fork.nim index f5dfcb8a2..83ebde7f5 100644 --- a/nimbus/db/aristo/aristo_tx/tx_fork.nim +++ b/nimbus/db/aristo/aristo_tx/tx_fork.nim @@ -64,8 +64,7 @@ proc txFork*( let stackLayer = block: let rc = db.getTuvBE() if rc.isOk: - LayerRef( - delta: LayerDeltaRef(vTop: rc.value)) + LayerRef(vTop: rc.value) elif rc.error == GetTuvNotFound: LayerRef.init() else: diff --git a/nimbus/db/aristo/aristo_tx/tx_frame.nim b/nimbus/db/aristo/aristo_tx/tx_frame.nim index 5dfe0c628..dbc9c67e4 100644 --- a/nimbus/db/aristo/aristo_tx/tx_frame.nim +++ b/nimbus/db/aristo/aristo_tx/tx_frame.nim @@ -14,7 +14,6 @@ {.push raises: [].} import - std/tables, results, ".."/[aristo_desc, aristo_layers] @@ -81,10 +80,10 @@ proc txFrameBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] = if db.txFrameLevel != db.stack.len: return err(TxStackGarbled) - let vTop = db.top.delta.vTop + let vTop = db.top.vTop db.stack.add db.top db.top = LayerRef( - delta: LayerDeltaRef(vTop: vTop), + vTop: vTop, txUid: db.getTxUid) db.txRef = AristoTxRef( @@ -123,18 +122,11 @@ proc txFrameCommit*( let db = ? tx.getDbDescFromTopTx() # Pop layer from stack and merge database top layer onto it - let merged = block: - if db.top.delta.sTab.len == 0 and - db.top.delta.kMap.len == 0: - # Avoid `layersMergeOnto()` - db.top.delta = db.stack[^1].delta - db.stack.setLen(db.stack.len-1) - db.top - else: - let layer = db.stack[^1] - db.stack.setLen(db.stack.len-1) - db.top.layersMergeOnto layer[] - layer + let merged = db.stack[^1] + db.stack.setLen(db.stack.len-1) + if not db.top.isEmpty(): + # Only call `layersMergeOnto()` if layer is empty + db.top.layersMergeOnto merged[] # Install `merged` stack top layer and update stack db.top = merged diff --git a/nimbus/db/aristo/aristo_tx/tx_stow.nim b/nimbus/db/aristo/aristo_tx/tx_stow.nim index 0c04cf94f..2c14fe386 100644 --- a/nimbus/db/aristo/aristo_tx/tx_stow.nim +++ b/nimbus/db/aristo/aristo_tx/tx_stow.nim @@ -14,10 +14,25 @@ {.push raises: [].} import - std/tables, results, ../aristo_delta/delta_merge, - ".."/[aristo_desc, aristo_delta] + ".."/[aristo_desc, aristo_delta, aristo_layers] + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc txStowOk*( + db: AristoDbRef; # Database + persistent: bool; # Stage only unless `true` + ): Result[void,AristoError] = + if not db.txRef.isNil: + return err(TxPendingTx) + if 0 < db.stack.len: + return err(TxStackGarbled) + if persistent and not db.deltaPersistentOk(): + return err(TxBackendNotWritable) + ok() # ------------------------------------------------------------------------------ # Public functions @@ -30,24 +45,18 @@ proc txStow*( ): Result[void,AristoError] = ## Worker for `stow()` and `persist()` variants. ## - if not db.txRef.isNil: - return err(TxPendingTx) - if 0 < db.stack.len: - return err(TxStackGarbled) - if persistent and not db.deltaPersistentOk(): - return err(TxBackendNotWritable) + ? db.txStowOk persistent - if db.top.delta.sTab.len != 0 or - db.top.delta.kMap.len != 0 or - db.top.delta.accLeaves.len != 0 or - db.top.delta.stoLeaves.len != 0: - - # Note that `deltaMerge()` will return the 1st argument if the 2nd is `nil` - db.balancer = db.deltaMerge(db.top.delta, db.balancer).valueOr: - return err(error[1]) + if not db.top.isEmpty(): + # Note that `deltaMerge()` will return the `db.top` argument if the + # `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the + # case that there are no forked peers one can ignore that restriction as + # no balancer is shared. + db.balancer = deltaMerge( + db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0) # New empty top layer - db.top = LayerRef(delta: LayerDeltaRef(vTop: db.balancer.vTop)) + db.top = LayerRef(vTop: db.balancer.vTop) if persistent: # Merge/move `balancer` into persistent tables (unless missing) diff --git a/nimbus/db/aristo/aristo_vid.nim b/nimbus/db/aristo/aristo_vid.nim index ced296746..ec4604260 100644 --- a/nimbus/db/aristo/aristo_vid.nim +++ b/nimbus/db/aristo/aristo_vid.nim @@ -24,17 +24,16 @@ import proc vidFetch*(db: AristoDbRef): VertexID = ## Fetch next vertex ID. ## - if db.top.delta.vTop == 0: - db.top.delta.vTop = VertexID(LEAST_FREE_VID) + if db.top.vTop == 0: + db.top.vTop = VertexID(LEAST_FREE_VID) else: - db.top.delta.vTop.inc - db.top.delta.vTop + db.top.vTop.inc + db.top.vTop proc vidDispose*(db: AristoDbRef; vid: VertexID) = ## Only top vertexIDs are disposed - if vid == db.top.delta.vTop and - LEAST_FREE_VID < db.top.delta.vTop.distinctBase: - db.top.delta.vTop.dec + if vid == db.top.vTop and LEAST_FREE_VID < db.top.vTop.distinctBase: + db.top.vTop.dec # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_walk/walk_private.nim b/nimbus/db/aristo/aristo_walk/walk_private.nim index d07ebb090..c065d661d 100644 --- a/nimbus/db/aristo/aristo_walk/walk_private.nim +++ b/nimbus/db/aristo/aristo_walk/walk_private.nim @@ -23,12 +23,12 @@ iterator walkVtxBeImpl*[T]( ): tuple[rvid: RootedVertexID, vtx: VertexRef] = ## Generic iterator when T is VoidBackendRef: - let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer + let filter = if db.balancer.isNil: LayerRef() else: db.balancer else: mixin walkVtx - let filter = LayerDeltaRef() + let filter = LayerRef() if not db.balancer.isNil: filter.sTab = db.balancer.sTab # copy table @@ -52,12 +52,12 @@ iterator walkKeyBeImpl*[T]( ): tuple[rvid: RootedVertexID, key: HashKey] = ## Generic iterator when T is VoidBackendRef: - let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer + let filter = if db.balancer.isNil: LayerRef() else: db.balancer else: mixin walkKey - let filter = LayerDeltaRef() + let filter = LayerRef() if not db.balancer.isNil: filter.kMap = db.balancer.kMap # copy table diff --git a/nimbus/db/kvt/kvt_debug.nim b/nimbus/db/kvt/kvt_debug.nim index 525aa6139..592495a5a 100644 --- a/nimbus/db/kvt/kvt_debug.nim +++ b/nimbus/db/kvt/kvt_debug.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -134,11 +134,11 @@ proc ppBe[T](be: T; db: KvtDbRef; indent: int): string = proc ppLayer(layer: LayerRef; db: KvtDbRef; indent = 4): string = let - tLen = layer.delta.sTab.len + tLen = layer.sTab.len info = "tab(" & $tLen & ")" pfx1 = indent.toPfx(1) pfx2 = if 0 < tLen: indent.toPfx(2) else: " " - "" & pfx1 & info & pfx2 & layer.delta.sTab.ppTab(db,indent+2) + "" & pfx1 & info & pfx2 & layer.sTab.ppTab(db,indent+2) # ------------------------------------------------------------------------------ # Public functions @@ -152,7 +152,7 @@ proc pp*( case be.kind: of BackendMemory: result &= be.MemBackendRef.ppBe(db, indent) - of BackendRocksDB: + of BackendRocksDB,BackendRdbTriggered: result &= be.RdbBackendRef.ppBe(db, indent) of BackendVoid: result &= "" diff --git a/nimbus/db/kvt/kvt_delta.nim b/nimbus/db/kvt/kvt_delta.nim index 2ee0feb8f..e44a90e69 100644 --- a/nimbus/db/kvt/kvt_delta.nim +++ b/nimbus/db/kvt/kvt_delta.nim @@ -23,23 +23,12 @@ import # Public functions # ------------------------------------------------------------------------------ -proc deltaMerge*( - db: KvtDbRef; # Database - delta: LayerDeltaRef; # Filter to apply to database - ) = - ## Merge the argument `delta` into the balancer filter layer. Note that - ## this function has no control of the filter source. Having merged the - ## argument `delta`, all the `top` and `stack` layers should be cleared. - ## - db.merge(delta, db.balancer) - - -proc deltaUpdateOk*(db: KvtDbRef): bool = +proc deltaPersistentOk*(db: KvtDbRef): bool = ## Check whether the balancer filter can be merged into the backend not db.backend.isNil and db.isCentre -proc deltaUpdate*( +proc deltaPersistent*( db: KvtDbRef; # Database reCentreOk = false; ): Result[void,KvtError] = @@ -71,17 +60,30 @@ proc deltaUpdate*( # Always re-centre to `parent` (in case `reCentreOk` was set) defer: discard parent.reCentre() + # Update forked balancers here do that errors are detected early (if any.) + if 0 < db.nForked: + let rev = db.revFilter(db.balancer).valueOr: + return err(error[1]) + if 0 < rev.sTab.len: # Can an empty `rev` happen at all? + var unsharedRevOk = true + for w in db.forked: + if not w.db.balancer.isValid: + unsharedRevOk = false + # The `rev` filter can be modified if one can make sure that it is + # not shared (i.e. only previously merged into the w.db.balancer.) + # Note that it is trivially true for a single fork. + let modLowerOk = w.isLast and unsharedRevOk + w.db.balancer = deltaMerge( + w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk) + # Store structural single trie entries let writeBatch = ? be.putBegFn() be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq) ? be.putEndFn writeBatch - # Update peer filter balance. - let rev = db.deltaReverse db.balancer - for w in db.forked: - db.merge(rev, w.balancer) + # Done with balancer, all saved to backend + db.balancer = LayerRef(nil) - db.balancer = LayerDeltaRef(nil) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/kvt/kvt_delta/delta_merge.nim b/nimbus/db/kvt/kvt_delta/delta_merge.nim index 277316034..13ba42b31 100644 --- a/nimbus/db/kvt/kvt_delta/delta_merge.nim +++ b/nimbus/db/kvt/kvt_delta/delta_merge.nim @@ -10,37 +10,61 @@ import std/tables, - results, - ".."/[kvt_desc, kvt_utils] + eth/common, + ../kvt_desc + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc layersMergeOnto(src: LayerRef; trg: var LayerObj) = + for (key,val) in src.sTab.pairs: + trg.sTab[key] = val # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc merge*( - db: KvtDbRef; # Database - upper: LayerDeltaRef; # Filter to apply onto `lower` - lower: var LayerDeltaRef; # Target filter, will be modified - ) = - ## Merge the argument filter `upper` onto the argument filter `lower` - ## relative to the *unfiltered* backend database on `db.backened`. The `lower` - ## filter argument will have been modified. +proc deltaMerge*( + upper: LayerRef; # Think of `top`, `nil` is ok + modUpperOk: bool; # May re-use/modify `upper` + lower: LayerRef; # Think of `balancer`, `nil` is ok + modLowerOk: bool; # May re-use/modify `lower` + ): LayerRef = + ## Merge argument `upper` into the `lower` filter instance. ## - ## In case that the argument `lower` is `nil`, it will be replaced by `upper`. + ## Note that the namimg `upper` and `lower` indicate that the filters are + ## stacked and the database access is `upper -> lower -> backend`. ## if lower.isNil: - lower = upper - elif not upper.isNil: - for (key,val) in upper.sTab.pairs: - if val.isValid or not lower.sTab.hasKey key: - lower.sTab[key] = val - elif lower.sTab.getOrVoid(key).isValid: - let rc = db.getUbe key - if rc.isOk: - lower.sTab[key] = val # empty blob - else: - doAssert rc.error == GetNotFound - lower.sTab.del key # no need to keep that in merged filter + # Degenerate case: `upper` is void + result = upper + + elif upper.isNil: + # Degenerate case: `lower` is void + result = lower + + elif modLowerOk: + # Can modify `lower` which is the prefered action mode but applies only + # in cases where the `lower` argument is not shared. + layersMergeOnto(upper, lower[]) + result = lower + + elif not modUpperOk: + # Cannot modify any argument layers. + result = LayerRef(sTab: lower.sTab) + layersMergeOnto(upper, result[]) + + else: + # Otherwise avoid copying some tables by modifyinh `upper`. This is not + # completely free as the merge direction changes to merging the `lower` + # layer up into the higher prioritised `upper` layer (note that the `lower` + # argument filter is read-only.) Here again, the `upper` argument must not + # be a shared layer/filter. + for (key,val) in lower.sTab.pairs: + if not upper.sTab.hasKey(key): + upper.sTab[key] = val + result = upper # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_delta/delta_reverse.nim b/nimbus/db/kvt/kvt_delta/delta_reverse.nim index e3aca6ec6..3be589139 100644 --- a/nimbus/db/kvt/kvt_delta/delta_reverse.nim +++ b/nimbus/db/kvt/kvt_delta/delta_reverse.nim @@ -10,6 +10,7 @@ import std/tables, + eth/common, results, ".."/[kvt_desc, kvt_utils] @@ -17,27 +18,29 @@ import # Public functions # ------------------------------------------------------------------------------ -proc deltaReverse*( +proc revFilter*( db: KvtDbRef; # Database - delta: LayerDeltaRef; # Filter to revert - ): LayerDeltaRef = - ## Assemble a reverse filter for the `delta` argument, i.e. changes to the - ## backend that reverse the effect of applying this to the balancer filter. - ## The resulting filter is calculated against the current *unfiltered* - ## backend (excluding optionally installed balancer filters.) + filter: LayerRef; # Filter to revert + ): Result[LayerRef,(Blob,KvtError)] = + ## Assemble reverse filter for the `filter` argument, i.e. changes to the + ## backend that reverse the effect of applying the this read-only filter. ## - ## If `delta` is `nil`, the result will be `nil` as well. - if not delta.isNil: - result = LayerDeltaRef() + ## This read-only filter is calculated against the current unfiltered + ## backend (excluding optionally installed read-only filter.) + ## + let rev = LayerRef() - # Calculate reverse changes for the `sTab[]` structural table - for key in delta.sTab.keys: - let rc = db.getUbe key - if rc.isOk: - result.sTab[key] = rc.value - else: - doAssert rc.error == GetNotFound - result.sTab[key] = EmptyBlob + # Calculate reverse changes for the `sTab[]` structural table + for key in filter.sTab.keys: + let rc = db.getUbe key + if rc.isOk: + rev.sTab[key] = rc.value + elif rc.error == GetNotFound: + rev.sTab[key] = EmptyBlob + else: + return err((key,rc.error)) + + ok(rev) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_desc.nim b/nimbus/db/kvt/kvt_desc.nim index b65cb545d..6422029b6 100644 --- a/nimbus/db/kvt/kvt_desc.nim +++ b/nimbus/db/kvt/kvt_desc.nim @@ -46,7 +46,7 @@ type ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers - balancer*: LayerDeltaRef ## Apply read filter (locks writing) + balancer*: LayerRef ## Balance out concurrent backend access backend*: BackendRef ## Backend database (may well be `nil`) txRef*: KvtTxRef ## Latest active transaction @@ -82,6 +82,9 @@ func getOrVoid*(tab: Table[Blob,Blob]; w: Blob): Blob = func isValid*(key: Blob): bool = key != EmptyBlob +func isValid*(layer: LayerRef): bool = + layer != LayerRef(nil) + # ------------------------------------------------------------------------------ # Public functions, miscellaneous # ------------------------------------------------------------------------------ @@ -162,13 +165,18 @@ proc fork*( ok clone -iterator forked*(db: KvtDbRef): KvtDbRef = +iterator forked*(db: KvtDbRef): tuple[db: KvtDbRef, isLast: bool] = ## Interate over all non centre descriptors (see comments on `reCentre()` ## for details.) + ## + ## The second `isLast` yielded loop entry is `true` if the yielded tuple + ## is the last entry in the list. if not db.dudes.isNil: + var nLeft = db.dudes.peers.len for dude in db.dudes.peers.items: if dude != db.dudes.centre: - yield dude + nLeft.dec + yield (dude, nLeft == 1) func nForked*(db: KvtDbRef): int = ## Returns the number of non centre descriptors (see comments on `reCentre()` diff --git a/nimbus/db/kvt/kvt_desc/desc_structural.nim b/nimbus/db/kvt/kvt_desc/desc_structural.nim index 81de518af..f3d919e46 100644 --- a/nimbus/db/kvt/kvt_desc/desc_structural.nim +++ b/nimbus/db/kvt/kvt_desc/desc_structural.nim @@ -18,14 +18,11 @@ import eth/common type - LayerDeltaRef* = ref object - ## Delta tables relative to previous layer - sTab*: Table[Blob,Blob] ## Structural data table - - LayerRef* = ref object + LayerRef* = ref LayerObj + LayerObj* = object ## Kvt database layer structures. Any layer holds the full ## change relative to the backend. - delta*: LayerDeltaRef ## Structural tables held as deltas + sTab*: Table[Blob,Blob] ## Structural data table txUid*: uint ## Transaction identifier if positive # ------------------------------------------------------------------------------ @@ -34,11 +31,11 @@ type func init*(T: type LayerRef): T = ## Constructor, returns empty layer - T(delta: LayerDeltaRef()) + T() -func dup*(ly: LayerDeltaRef): LayerDeltaRef = +func dup*(ly: LayerRef): LayerRef = ## Duplicate/copy - LayerDeltaRef(sTab: ly.sTab) + LayerRef(sTab: ly.sTab) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_layers.nim b/nimbus/db/kvt/kvt_layers.nim index 220dbabf9..0b5b90347 100644 --- a/nimbus/db/kvt/kvt_layers.nim +++ b/nimbus/db/kvt/kvt_layers.nim @@ -25,7 +25,7 @@ func nLayersKeys*(db: KvtDbRef): int = ## bound for the number of effective key/value mappings held on the cache ## layers as there might be duplicate entries for the same key on different ## layers. - db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len) + db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len) # ------------------------------------------------------------------------------ # Public functions: get function @@ -37,11 +37,11 @@ func layersLen*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[int] = when key isnot seq[byte]: let key = @key - db.top.delta.sTab.withValue(key, item): + db.top.sTab.withValue(key, item): return Opt.some(item[].len()) for w in db.rstack: - w.delta.sTab.withValue(key, item): + w.sTab.withValue(key, item): return Opt.some(item[].len()) Opt.none(int) @@ -58,11 +58,11 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] = when key isnot seq[byte]: let key = @key - db.top.delta.sTab.withValue(key, item): + db.top.sTab.withValue(key, item): return Opt.some(item[]) for w in db.rstack: - w.delta.sTab.withValue(key, item): + w.sTab.withValue(key, item): return Opt.some(item[]) Opt.none(Blob) @@ -73,7 +73,7 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] = func layersPut*(db: KvtDbRef; key: openArray[byte]; data: openArray[byte]) = ## Store a (potentally empty) value on the top layer - db.top.delta.sTab[@key] = @data + db.top.sTab[@key] = @data # ------------------------------------------------------------------------------ # Public functions @@ -87,12 +87,12 @@ func layersCc*(db: KvtDbRef; level = high(int)): LayerRef = else: db.stack[0 .. level] # Set up initial layer (bottom layer) - result = LayerRef(delta: LayerDeltaRef(sTab: layers[0].delta.sTab)) + result = LayerRef(sTab: layers[0].sTab) # Consecutively merge other layers on top for n in 1 ..< layers.len: - for (key,val) in layers[n].delta.sTab.pairs: - result.delta.sTab[key] = val + for (key,val) in layers[n].sTab.pairs: + result.sTab[key] = val # ------------------------------------------------------------------------------ # Public iterators @@ -109,12 +109,12 @@ iterator layersWalk*( ## the one with a zero vertex which are othewise skipped by the iterator. ## The `seen` argument must not be modified while the iterator is active. ## - for (key,val) in db.top.delta.sTab.pairs: + for (key,val) in db.top.sTab.pairs: yield (key,val) seen.incl key for w in db.rstack: - for (key,val) in w.delta.sTab.pairs: + for (key,val) in w.sTab.pairs: if key notin seen: yield (key,val) seen.incl key diff --git a/nimbus/db/kvt/kvt_tx/tx_frame.nim b/nimbus/db/kvt/kvt_tx/tx_frame.nim index 18a8ccd39..0903e5206 100644 --- a/nimbus/db/kvt/kvt_tx/tx_frame.nim +++ b/nimbus/db/kvt/kvt_tx/tx_frame.nim @@ -83,7 +83,6 @@ proc txFrameBegin*(db: KvtDbRef): Result[KvtTxRef,KvtError] = db.stack.add db.top db.top = LayerRef( - delta: LayerDeltaRef(), txUid: db.getTxUid) db.txRef = KvtTxRef( db: db, @@ -122,8 +121,8 @@ proc txFrameCommit*( # Replace the top two layers by its merged version let merged = db.stack[^1] - for (key,val) in db.top.delta.sTab.pairs: - merged.delta.sTab[key] = val + for (key,val) in db.top.sTab.pairs: + merged.sTab[key] = val # Install `merged` layer db.top = merged diff --git a/nimbus/db/kvt/kvt_tx/tx_stow.nim b/nimbus/db/kvt/kvt_tx/tx_stow.nim index c156efbbf..1e2609690 100644 --- a/nimbus/db/kvt/kvt_tx/tx_stow.nim +++ b/nimbus/db/kvt/kvt_tx/tx_stow.nim @@ -16,6 +16,7 @@ import std/tables, results, + ../kvt_delta/delta_merge, ".."/[kvt_desc, kvt_delta] # ------------------------------------------------------------------------------ @@ -31,10 +32,8 @@ proc txStowOk*( return err(TxPendingTx) if 0 < db.stack.len: return err(TxStackGarbled) - - if persistent and not db.deltaUpdateOk(): + if persistent and not db.deltaPersistentOk(): return err(TxBackendNotWritable) - ok() proc txStow*( @@ -49,13 +48,20 @@ proc txStow*( ## ? db.txStowOk persistent - if 0 < db.top.delta.sTab.len: - db.deltaMerge db.top.delta - db.top.delta = LayerDeltaRef() + if 0 < db.top.sTab.len: + # Note that `deltaMerge()` will return the `db.top` argument if the + # `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the + # case that there are no forked peers one can ignore that restriction as + # no balancer is shared. + db.balancer = deltaMerge( + db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0) + + # New empty top layer + db.top = LayerRef() if persistent: # Move `balancer` data into persistent tables - ? db.deltaUpdate() + ? db.deltaPersistent() ok() diff --git a/tests/test_aristo/test_balancer.nim b/tests/test_aristo/test_balancer.nim index 4c0e363e4..4ebfe3dc5 100644 --- a/tests/test_aristo/test_balancer.nim +++ b/tests/test_aristo/test_balancer.nim @@ -149,7 +149,7 @@ proc cleanUp(dx: var DbTriplet) = # ---------------------- -proc isDbEq(a, b: LayerDeltaRef; db: AristoDbRef; noisy = true): bool = +proc isDbEq(a, b: LayerRef; db: AristoDbRef; noisy = true): bool = ## Verify that argument filter `a` has the same effect on the ## physical/unfiltered backend of `db` as argument filter `b`. if a.isNil: @@ -255,8 +255,8 @@ proc testBalancer*( # Resulting clause (11) filters from `aristo/README.md` example # which will be used in the second part of the tests var - c11Filter1 = LayerDeltaRef(nil) - c11Filter3 = LayerDeltaRef(nil) + c11Filter1 = LayerRef(nil) + c11Filter3 = LayerRef(nil) # Work through clauses (8)..(11) from `aristo/README.md` example block: @@ -278,14 +278,14 @@ proc testBalancer*( block: let rc = db1.persist() xCheckRc rc.error == 0 - xCheck db1.balancer == LayerDeltaRef(nil) + xCheck db1.balancer == LayerRef(nil) xCheck db2.balancer == db3.balancer block: let rc = db2.stow() # non-persistent xCheckRc rc.error == 0: noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2 - xCheck db1.balancer == LayerDeltaRef(nil) + xCheck db1.balancer == LayerRef(nil) xCheck db2.balancer != db3.balancer # Clause (11) from `aristo/README.md` example @@ -293,7 +293,7 @@ proc testBalancer*( block: let rc = db2.persist() xCheckRc rc.error == 0 - xCheck db2.balancer == LayerDeltaRef(nil) + xCheck db2.balancer == LayerRef(nil) # Check/verify backends block: @@ -326,7 +326,7 @@ proc testBalancer*( block: let rc = db2.persist() xCheckRc rc.error == 0 - xCheck db2.balancer == LayerDeltaRef(nil) + xCheck db2.balancer == LayerRef(nil) xCheck db1.balancer == db3.balancer # Clause (13) from `aristo/README.md` example