diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 799f7ed6b..6255d128e 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -291,8 +291,8 @@ proc ppXMap*( result.setLen(result.len - 1) result &= "}" -proc ppFilter( - fl: LayerDeltaRef; +proc ppBalancer( + fl: LayerRef; db: AristoDbRef; indent: int; ): string = @@ -398,18 +398,18 @@ proc ppLayer( if 2 < nOKs: result &= "".doPrefix(false) if vTopOk: - result &= "".doPrefix(true) & "vTop=" & layer.delta.vTop.ppVid + result &= "".doPrefix(true) & "vTop=" & layer.vTop.ppVid if sTabOk: let - tLen = layer.delta.sTab.len + tLen = layer.sTab.len info = "sTab(" & $tLen & ")" - result &= info.doPrefix(0 < tLen) & layer.delta.sTab.ppSTab(db,indent+2) + result &= info.doPrefix(0 < tLen) & layer.sTab.ppSTab(db,indent+2) if kMapOk: let - tLen = layer.delta.kMap.len + tLen = layer.kMap.len info = "kMap(" & $tLen & ")" result &= info.doPrefix(0 < tLen) - result &= db.ppXMap(layer.delta.kMap, indent+2) + result &= db.ppXMap(layer.kMap, indent+2) # ------------------------------------------------------------------------------ # Public functions @@ -528,53 +528,17 @@ proc pp*( layer: LayerRef; db: AristoDbRef; indent = 4; + balancerOk = false; + sTabOk = true, + kMapOk = true, + other = true, ): string = - layer.ppLayer( - db, vTopOk=true, sTabOk=true, kMapOk=true) - -proc pp*( - layer: LayerRef; - db: AristoDbRef; - xTabOk: bool; - indent = 4; - ): string = - layer.ppLayer( - db, vTopOk=true, sTabOk=xTabOk, kMapOk=true) - -proc pp*( - layer: LayerRef; - db: AristoDbRef; - xTabOk: bool; - kMapOk: bool; - other = false; - indent = 4; - ): string = - layer.ppLayer( - db, vTopOk=other, sTabOk=xTabOk, kMapOk=kMapOk) - - -proc pp*( - db: AristoDbRef; - xTabOk: bool; - indent = 4; - ): string = - db.layersCc.pp(db, xTabOk=xTabOk, indent=indent) - -proc pp*( - db: AristoDbRef; - xTabOk: bool; - kMapOk: bool; - other = false; - indent = 4; - ): string = - db.layersCc.pp(db, xTabOk=xTabOk, kMapOk=kMapOk, other=other, indent=indent) - -proc pp*( - filter: LayerDeltaRef; - db = AristoDbRef(nil); - indent = 4; - ): string = - filter.ppFilter(db.orDefault(), indent) + if balancerOk: + layer.ppLayer( + db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk) + else: + layer.ppLayer( + db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk) proc pp*( be: BackendRef; @@ -582,7 +546,7 @@ proc pp*( limit = 100; indent = 4; ): string = - result = db.balancer.ppFilter(db, indent+1) & indent.toPfx + result = db.balancer.ppBalancer(db, indent+1) & indent.toPfx case be.kind: of BackendMemory: result &= be.MemBackendRef.ppBe(db, limit, indent+1) @@ -599,11 +563,12 @@ proc pp*( topOk = true; stackOk = true; kMapOk = true; + sTabOk = true; limit = 100; ): string = if topOk: result = db.layersCc.pp( - db, xTabOk=true, kMapOk=kMapOk, other=true, indent=indent) + db, sTabOk=sTabOk, kMapOk=kMapOk, other=true, indent=indent) let stackOnlyOk = stackOk and not (topOk or balancerOk or backendOk) if not stackOnlyOk: result &= indent.toPfx & "level=" & $db.stack.len @@ -614,15 +579,15 @@ proc pp*( let m = layers.len - n - 1 l = db.layersCc m - a = w.delta.kMap.values.toSeq.filterIt(not it.isValid).len - c = l.delta.kMap.values.toSeq.filterIt(not it.isValid).len - result &= "(" & $(w.delta.kMap.len - a) & "," & $a & ")" - lStr &= " " & $m & "=(" & $(l.delta.kMap.len - c) & "," & $c & ")" + a = w.kMap.values.toSeq.filterIt(not it.isValid).len + c = l.kMap.values.toSeq.filterIt(not it.isValid).len + result &= "(" & $(w.kMap.len - a) & "," & $a & ")" + lStr &= " " & $m & "=(" & $(l.kMap.len - c) & "," & $c & ")" result &= " =>" & lStr if backendOk: result &= indent.toPfx & db.backend.pp(db, limit=limit, indent) elif balancerOk: - result &= indent.toPfx & db.balancer.ppFilter(db, indent+1) + result &= indent.toPfx & db.balancer.ppBalancer(db, indent+1) proc pp*(sdb: MerkleSignRef; indent = 4): string = result = "" & diff --git a/nimbus/db/aristo/aristo_delta.nim b/nimbus/db/aristo/aristo_delta.nim index b10cc6a5e..571425c36 100644 --- a/nimbus/db/aristo/aristo_delta.nim +++ b/nimbus/db/aristo/aristo_delta.nim @@ -16,9 +16,9 @@ import std/tables, eth/common, results, - ./aristo_desc, + "."/[aristo_desc, aristo_layers], ./aristo_desc/desc_backend, - ./aristo_delta/delta_siblings + ./aristo_delta/[delta_merge, delta_reverse] # ------------------------------------------------------------------------------ # Public functions, save to backend @@ -34,17 +34,16 @@ proc deltaPersistent*( nxtFid = 0u64; # Next filter ID (if any) reCentreOk = false; ): Result[void,AristoError] = - ## Resolve (i.e. move) the backend filter into the physical backend database. + ## Resolve (i.e. move) the balancer into the physical backend database. ## - ## This needs write permission on the backend DB for the argument `db` - ## descriptor (see the function `aristo_desc.isCentre()`.) With the argument - ## flag `reCentreOk` passed `true`, write permission will be temporarily + ## This needs write permission on the backend DB for the descriptor argument + ## `db` (see the function `aristo_desc.isCentre()`.) If the argument flag + ## `reCentreOk` is passed `true`, write permission will be temporarily ## acquired when needed. ## - ## When merging the current backend filter, its reverse will be is stored as - ## back log on the filter fifos (so the current state can be retrieved.) - ## Also, other non-centre descriptors are updated so there is no visible - ## database change for these descriptors. + ## When merging the current backend filter, its reverse will be is stored + ## on other non-centre descriptors so there is no visible database change + ## for these. ## let be = db.backend if be.isNil: @@ -64,9 +63,21 @@ proc deltaPersistent*( # Always re-centre to `parent` (in case `reCentreOk` was set) defer: discard parent.reCentre() - # Initialise peer filter balancer. - let updateSiblings = ? UpdateSiblingsRef.init db - defer: updateSiblings.rollback() + # Update forked balancers here do that errors are detected early (if any.) + if 0 < db.nForked: + let rev = db.revFilter(db.balancer).valueOr: + return err(error[1]) + if not rev.isEmpty: # Can an empty `rev` happen at all? + var unsharedRevOk = true + for w in db.forked: + if not w.db.balancer.isValid: + unsharedRevOk = false + # The `rev` filter can be modified if one can make sure that it is + # not shared (i.e. only previously merged into the w.db.balancer.) + # Note that it is trivially true for a single fork. + let modLowerOk = w.isLast and unsharedRevOk + w.db.balancer = deltaMerge( + w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk) let lSst = SavedState( key: EMPTY_ROOT_HASH, # placeholder for more @@ -93,8 +104,9 @@ proc deltaPersistent*( if not db.stoLeaves.lruUpdate(mixKey, vtx): discard db.stoLeaves.lruAppend(mixKey, vtx, accLruSize) - # Update dudes and this descriptor - ? updateSiblings.update().commit() + # Done with balancer, all saved to backend + db.balancer = LayerRef(nil) + ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_delta/delta_merge.nim b/nimbus/db/aristo/aristo_delta/delta_merge.nim index b50732f3f..b491bf283 100644 --- a/nimbus/db/aristo/aristo_delta/delta_merge.nim +++ b/nimbus/db/aristo/aristo_delta/delta_merge.nim @@ -11,72 +11,70 @@ import std/tables, eth/common, - results, - ".."/[aristo_desc, aristo_get] + ".."/[aristo_desc, aristo_layers] # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ proc deltaMerge*( - db: AristoDbRef; - upper: LayerDeltaRef; # new filter, `nil` is ok - lower: LayerDeltaRef; # Trg filter, `nil` is ok - ): Result[LayerDeltaRef,(VertexID,AristoError)] = + upper: LayerRef; # Think of `top`, `nil` is ok + modUpperOk: bool; # May re-use/modify `upper` + lower: LayerRef; # Think of `balancer`, `nil` is ok + modLowerOk: bool; # May re-use/modify `lower` + ): LayerRef = ## Merge argument `upper` into the `lower` filter instance. ## ## Note that the namimg `upper` and `lower` indicate that the filters are - ## stacked and the database access is `upper -> lower -> backend` whereas - ## the `src/trg` matching logic goes the other way round. + ## stacked and the database access is `upper -> lower -> backend`. ## - # Degenerate case: `upper` is void if lower.isNil: - if upper.isNil: - # Even more degenerate case when both filters are void - return ok LayerDeltaRef(nil) - return ok(upper) + # Degenerate case: `upper` is void + result = upper - # Degenerate case: `upper` is non-trivial and `lower` is void - if upper.isNil: - return ok(lower) + elif upper.isNil: + # Degenerate case: `lower` is void + result = lower - # There is no need to deep copy table vertices as they will not be modified. - let newFilter = LayerDeltaRef( - sTab: lower.sTab, - kMap: lower.kMap, - vTop: upper.vTop) + elif modLowerOk: + # Can modify `lower` which is the prefered action mode but applies only + # in cases where the `lower` argument is not shared. + lower.vTop = upper.vTop + layersMergeOnto(upper, lower[]) + result = lower - for (rvid,vtx) in upper.sTab.pairs: - if vtx.isValid or not newFilter.sTab.hasKey rvid: - newFilter.sTab[rvid] = vtx - elif newFilter.sTab.getOrVoid(rvid).isValid: - let rc = db.getVtxUbe rvid - if rc.isOk: - newFilter.sTab[rvid] = vtx # VertexRef(nil) - elif rc.error == GetVtxNotFound: - newFilter.sTab.del rvid - else: - return err((rvid.vid,rc.error)) + elif not modUpperOk: + # Cannot modify any argument layers. + result = LayerRef( + sTab: lower.sTab, # shallow copy (entries will not be modified) + kMap: lower.kMap, + accLeaves: lower.accLeaves, + stoLeaves: lower.stoLeaves, + vTop: upper.vTop) + layersMergeOnto(upper, result[]) - for (rvid,key) in upper.kMap.pairs: - if key.isValid or not newFilter.kMap.hasKey rvid: - newFilter.kMap[rvid] = key - elif newFilter.kMap.getOrVoid(rvid).isValid: - let rc = db.getKeyUbe rvid - if rc.isOk: - newFilter.kMap[rvid] = key - elif rc.error == GetKeyNotFound: - newFilter.kMap.del rvid - else: - return err((rvid.vid,rc.error)) + else: + # Otherwise avoid copying some tables by modifyinh `upper`. This is not + # completely free as the merge direction changes to merging the `lower` + # layer up into the higher prioritised `upper` layer (note that the `lower` + # argument filter is read-only.) Here again, the `upper` argument must not + # be a shared layer/filter. + for (rvid,vtx) in lower.sTab.pairs: + if not upper.sTab.hasKey(rvid): + upper.sTab[rvid] = vtx - for (accPath,leafVtx) in upper.accLeaves.pairs: - newFilter.accLeaves[accPath] = leafVtx + for (rvid,key) in lower.kMap.pairs: + if not upper.kMap.hasKey(rvid): + upper.kMap[rvid] = key - for (mixPath,leafVtx) in upper.stoLeaves.pairs: - newFilter.stoLeaves[mixPath] = leafVtx + for (accPath,leafVtx) in lower.accLeaves.pairs: + if not upper.accLeaves.hasKey(accPath): + upper.accLeaves[accPath] = leafVtx - ok newFilter + for (mixPath,leafVtx) in lower.stoLeaves.pairs: + if not upper.stoLeaves.hasKey(mixPath): + upper.stoLeaves[mixPath] = leafVtx + result = upper # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_delta/delta_reverse.nim b/nimbus/db/aristo/aristo_delta/delta_reverse.nim index 5c3f84fe4..2b9609759 100644 --- a/nimbus/db/aristo/aristo_delta/delta_reverse.nim +++ b/nimbus/db/aristo/aristo_delta/delta_reverse.nim @@ -20,16 +20,15 @@ import proc revFilter*( db: AristoDbRef; # Database - filter: LayerDeltaRef; # Filter to revert - ): Result[LayerDeltaRef,(VertexID,AristoError)] = + filter: LayerRef; # Filter to revert + ): Result[LayerRef,(VertexID,AristoError)] = ## Assemble reverse filter for the `filter` argument, i.e. changes to the ## backend that reverse the effect of applying the this read-only filter. ## ## This read-only filter is calculated against the current unfiltered ## backend (excluding optionally installed read-only filter.) ## - # Register MPT state roots for reverting back - let rev = LayerDeltaRef() + let rev = LayerRef() # Get vid generator state on backend block: diff --git a/nimbus/db/aristo/aristo_delta/delta_siblings.nim b/nimbus/db/aristo/aristo_delta/delta_siblings.nim deleted file mode 100644 index 9cd7ac6fc..000000000 --- a/nimbus/db/aristo/aristo_delta/delta_siblings.nim +++ /dev/null @@ -1,120 +0,0 @@ -# nimbus-eth1 -# Copyright (c) 2023-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. - -import - results, - eth/common, - ../aristo_desc, - "."/[delta_merge, delta_reverse] - -type - UpdateState = enum - Initial = 0 - Updated, - Finished - - UpdateSiblingsRef* = ref object - ## Update transactional context - state: UpdateState - db: AristoDbRef ## Main database access - balancers: seq[(AristoDbRef,LayerDeltaRef)] ## Rollback data - rev: LayerDeltaRef ## Reverse filter set up - -# ------------------------------------------------------------------------------ -# Public contructor, commit, rollback -# ------------------------------------------------------------------------------ - -proc rollback*(ctx: UpdateSiblingsRef) = - ## Rollback any changes made by the `update()` function. Subsequent - ## `rollback()` or `commit()` calls will be without effect. - if ctx.state == Updated: - for (d,f) in ctx.balancers: - d.balancer = f - ctx.state = Finished - - -proc commit*(ctx: UpdateSiblingsRef): Result[void,AristoError] = - ## Accept updates. Subsequent `rollback()` calls will be without effect. - if ctx.state != Updated: - ctx.rollback() - return err(FilSiblingsCommitUnfinshed) - ctx.db.balancer = LayerDeltaRef(nil) - ctx.state = Finished - ok() - -proc commit*( - rc: Result[UpdateSiblingsRef,AristoError]; - ): Result[void,AristoError] = - ## Variant of `commit()` for joining with `update()` - (? rc).commit() - - -proc init*( - T: type UpdateSiblingsRef; # Context type - db: AristoDbRef; # Main database - ): Result[T,AristoError] = - ## Set up environment for installing the reverse of the `db` argument current - ## read-only filter onto every associated descriptor referring to the same - ## database. - if not db.isCentre: - return err(FilBackendRoMode) - if db.nForked == 0: - return ok T(db: db) # No need to do anything - - func fromVae(err: (VertexID,AristoError)): AristoError = - err[1] - - # Filter rollback context - ok T( - db: db, - rev: ? db.revFilter(db.balancer).mapErr fromVae) # Reverse filter - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc update*(ctx: UpdateSiblingsRef): Result[UpdateSiblingsRef,AristoError] = - ## This function installs the reverse of the `init()` argument `db` current - ## read-only filter onto every associated descriptor referring to the same - ## database. If the function fails, a `rollback()` is called automatically. - ## - ## This function might do some real work so it was detached from `init()` so - ## it can be called late but before the physical database is updated. - ## - if ctx.state == Initial: - ctx.state = Updated - if not ctx.rev.isNil: - let db = ctx.db - # Update distributed filters. Note that the physical backend database - # must not have been updated, yet. So the new root key for the backend - # will be `db.balancer.kMap[$1]`. - for w in db.forked: - if w.balancer.isNil: - # Sharing the `ctx.rev` ref is safe as it is read-inly - w.balancer = ctx.rev - else: - let rc = db.deltaMerge(w.balancer, ctx.rev) - if rc.isErr: - ctx.rollback() - return err(rc.error[1]) - ctx.balancers.add (w, w.balancer) - w.balancer = rc.value - ok(ctx) - -proc update*( - rc: Result[UpdateSiblingsRef,AristoError] - ): Result[UpdateSiblingsRef,AristoError] = - ## Variant of `update()` for joining with `init()` - (? rc).update() - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ - diff --git a/nimbus/db/aristo/aristo_desc.nim b/nimbus/db/aristo/aristo_desc.nim index f46ca9092..35a083c2b 100644 --- a/nimbus/db/aristo/aristo_desc.nim +++ b/nimbus/db/aristo/aristo_desc.nim @@ -74,7 +74,7 @@ type ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers - balancer*: LayerDeltaRef ## Baland out concurrent backend access + balancer*: LayerRef ## Balance out concurrent backend access backend*: BackendRef ## Backend database (may well be `nil`) txRef*: AristoTxRef ## Latest active transaction @@ -152,8 +152,8 @@ func isValid*(nd: NodeRef): bool = func isValid*(pid: PathID): bool = pid != VOID_PATH_ID -func isValid*(filter: LayerDeltaRef): bool = - filter != LayerDeltaRef(nil) +func isValid*(layer: LayerRef): bool = + layer != LayerRef(nil) func isValid*(root: Hash256): bool = root != EMPTY_ROOT_HASH @@ -250,11 +250,11 @@ proc fork*( if not noTopLayer: clone.top = LayerRef.init() if not db.balancer.isNil: - clone.top.delta.vTop = db.balancer.vTop + clone.top.vTop = db.balancer.vTop else: let rc = clone.backend.getTuvFn() if rc.isOk: - clone.top.delta.vTop = rc.value + clone.top.vTop = rc.value elif rc.error != GetTuvNotFound: return err(rc.error) @@ -263,13 +263,19 @@ proc fork*( ok clone -iterator forked*(db: AristoDbRef): AristoDbRef = +iterator forked*(db: AristoDbRef): tuple[db: AristoDbRef, isLast: bool] = ## Interate over all non centre descriptors (see comments on `reCentre()` ## for details.) + ## + ## The second `isLast` yielded loop entry is `true` if the yielded tuple + ## is the last entry in the list. + ## if not db.dudes.isNil: - for dude in db.getCentre.dudes.peers.items: + var nLeft = db.dudes.peers.len + for dude in db.dudes.peers.items: if dude != db.dudes.centre: - yield dude + nLeft.dec + yield (dude, nLeft == 1) func nForked*(db: AristoDbRef): int = ## Returns the number of non centre descriptors (see comments on `reCentre()` @@ -313,12 +319,12 @@ iterator rstack*(db: AristoDbRef): LayerRef = for i in 0.. 0: doAssert level <= db.stack.len - db.stack[^level].delta + db.stack[^level] elif level == -1: doAssert db.balancer != nil db.balancer diff --git a/nimbus/db/aristo/aristo_desc/desc_structural.nim b/nimbus/db/aristo/aristo_desc/desc_structural.nim index 27fae40a5..7e3fef54b 100644 --- a/nimbus/db/aristo/aristo_desc/desc_structural.nim +++ b/nimbus/db/aristo/aristo_desc/desc_structural.nim @@ -89,7 +89,8 @@ type key*: Hash256 ## Some state hash (if any) serial*: uint64 ## Generic identifier from application - LayerDeltaRef* = ref object + LayerRef* = ref LayerObj + LayerObj* = object ## Delta layers are stacked implying a tables hierarchy. Table entries on ## a higher level take precedence over lower layer table entries. So an ## existing key-value table entry of a layer on top supersedes same key @@ -119,12 +120,7 @@ type accLeaves*: Table[Hash256, VertexRef] ## Account path -> VertexRef stoLeaves*: Table[Hash256, VertexRef] ## Storage path -> VertexRef - LayerRef* = ref LayerObj - LayerObj* = object - ## Hexary trie database layer structures. Any layer holds the full - ## change relative to the backend. - delta*: LayerDeltaRef ## Most structural tables held as deltas - txUid*: uint ## Transaction identifier if positive + txUid*: uint ## Transaction identifier if positive # ------------------------------------------------------------------------------ # Public helpers (misc) @@ -132,7 +128,7 @@ type func init*(T: type LayerRef): T = ## Constructor, returns empty layer - T(delta: LayerDeltaRef()) + T() func hash*(node: NodeRef): Hash = ## Table/KeyedQueue/HashSet mixin diff --git a/nimbus/db/aristo/aristo_init/persistent.nim b/nimbus/db/aristo/aristo_init/persistent.nim index 25f6182d7..0e78e4e59 100644 --- a/nimbus/db/aristo/aristo_init/persistent.nim +++ b/nimbus/db/aristo/aristo_init/persistent.nim @@ -51,8 +51,7 @@ proc newAristoRdbDbRef( return err(rc.error) rc.value ok((AristoDbRef( - top: LayerRef( - delta: LayerDeltaRef(vTop: vTop)), + top: LayerRef(vTop: vTop), backend: be), oCfs)) # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_layers.nim b/nimbus/db/aristo/aristo_layers.nim index c09a5cb10..b4d8a33dd 100644 --- a/nimbus/db/aristo/aristo_layers.nim +++ b/nimbus/db/aristo/aristo_layers.nim @@ -30,7 +30,7 @@ func dup(sTab: Table[RootedVertexID,VertexRef]): Table[RootedVertexID,VertexRef] # ------------------------------------------------------------------------------ func vTop*(db: AristoDbRef): VertexID = - db.top.delta.vTop + db.top.vTop # ------------------------------------------------------------------------------ # Public getters/helpers @@ -42,7 +42,7 @@ func nLayersVtx*(db: AristoDbRef): int = ## layers as there might be duplicate entries for the same vertex ID on ## different layers. ## - db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len) + db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len) func nLayersKey*(db: AristoDbRef): int = ## Number of vertex ID/key entries on the cache layers. This is an upper @@ -50,7 +50,7 @@ func nLayersKey*(db: AristoDbRef): int = ## layers as there might be duplicate entries for the same vertex ID on ## different layers. ## - db.stack.mapIt(it.delta.kMap.len).foldl(a + b, db.top.delta.kMap.len) + db.stack.mapIt(it.kMap.len).foldl(a + b, db.top.kMap.len) # ------------------------------------------------------------------------------ # Public functions: getter variants @@ -60,11 +60,11 @@ func layersGetVtx*(db: AristoDbRef; rvid: RootedVertexID): Opt[(VertexRef, int)] ## Find a vertex on the cache layers. An `ok()` result might contain a ## `nil` vertex if it is stored on the cache that way. ## - db.top.delta.sTab.withValue(rvid, item): + db.top.sTab.withValue(rvid, item): return Opt.some((item[], 0)) for i, w in enumerate(db.rstack): - w.delta.sTab.withValue(rvid, item): + w.sTab.withValue(rvid, item): return Opt.some((item[], i + 1)) Opt.none((VertexRef, int)) @@ -78,11 +78,11 @@ func layersGetKey*(db: AristoDbRef; rvid: RootedVertexID): Opt[(HashKey, int)] = ## Find a hash key on the cache layers. An `ok()` result might contain a void ## hash key if it is stored on the cache that way. ## - db.top.delta.kMap.withValue(rvid, item): + db.top.kMap.withValue(rvid, item): return Opt.some((item[], 0)) for i, w in enumerate(db.rstack): - w.delta.kMap.withValue(rvid, item): + w.kMap.withValue(rvid, item): return ok((item[], i + 1)) Opt.none((HashKey, int)) @@ -92,21 +92,21 @@ func layersGetKeyOrVoid*(db: AristoDbRef; rvid: RootedVertexID): HashKey = (db.layersGetKey(rvid).valueOr (VOID_HASH_KEY, 0))[0] func layersGetAccLeaf*(db: AristoDbRef; accPath: Hash256): Opt[VertexRef] = - db.top.delta.accLeaves.withValue(accPath, item): + db.top.accLeaves.withValue(accPath, item): return Opt.some(item[]) for w in db.rstack: - w.delta.accLeaves.withValue(accPath, item): + w.accLeaves.withValue(accPath, item): return Opt.some(item[]) Opt.none(VertexRef) func layersGetStoLeaf*(db: AristoDbRef; mixPath: Hash256): Opt[VertexRef] = - db.top.delta.stoLeaves.withValue(mixPath, item): + db.top.stoLeaves.withValue(mixPath, item): return Opt.some(item[]) for w in db.rstack: - w.delta.stoLeaves.withValue(mixPath, item): + w.stoLeaves.withValue(mixPath, item): return Opt.some(item[]) Opt.none(VertexRef) @@ -121,7 +121,7 @@ func layersPutVtx*( vtx: VertexRef; ) = ## Store a (potentally empty) vertex on the top layer - db.top.delta.sTab[rvid] = vtx + db.top.sTab[rvid] = vtx func layersResVtx*( db: AristoDbRef; @@ -138,7 +138,7 @@ func layersPutKey*( key: HashKey; ) = ## Store a (potentally void) hash key on the top layer - db.top.delta.kMap[rvid] = key + db.top.kMap[rvid] = key func layersResKey*(db: AristoDbRef; rvid: RootedVertexID) = @@ -157,30 +157,39 @@ proc layersUpdateVtx*( func layersPutAccLeaf*(db: AristoDbRef; accPath: Hash256; leafVtx: VertexRef) = - db.top.delta.accLeaves[accPath] = leafVtx + db.top.accLeaves[accPath] = leafVtx func layersPutStoLeaf*(db: AristoDbRef; mixPath: Hash256; leafVtx: VertexRef) = - db.top.delta.stoLeaves[mixPath] = leafVtx + db.top.stoLeaves[mixPath] = leafVtx # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +func isEmpty*(ly: LayerRef): bool = + ## Returns `true` if the layer does not contain any changes, i.e. all the + ## tables are empty. The field `txUid` is ignored, here. + ly.sTab.len == 0 and + ly.kMap.len == 0 and + ly.accLeaves.len == 0 and + ly.stoLeaves.len == 0 + + func layersMergeOnto*(src: LayerRef; trg: var LayerObj) = ## Merges the argument `src` into the argument `trg` and returns `trg`. For ## the result layer, the `txUid` value set to `0`. ## trg.txUid = 0 - for (vid,vtx) in src.delta.sTab.pairs: - trg.delta.sTab[vid] = vtx - for (vid,key) in src.delta.kMap.pairs: - trg.delta.kMap[vid] = key - trg.delta.vTop = src.delta.vTop - for (accPath,leafVtx) in src.delta.accLeaves.pairs: - trg.delta.accLeaves[accPath] = leafVtx - for (mixPath,leafVtx) in src.delta.stoLeaves.pairs: - trg.delta.stoLeaves[mixPath] = leafVtx + for (vid,vtx) in src.sTab.pairs: + trg.sTab[vid] = vtx + for (vid,key) in src.kMap.pairs: + trg.kMap[vid] = key + trg.vTop = src.vTop + for (accPath,leafVtx) in src.accLeaves.pairs: + trg.accLeaves[accPath] = leafVtx + for (mixPath,leafVtx) in src.stoLeaves.pairs: + trg.stoLeaves[mixPath] = leafVtx func layersCc*(db: AristoDbRef; level = high(int)): LayerRef = ## Provide a collapsed copy of layers up to a particular transaction level. @@ -192,24 +201,22 @@ func layersCc*(db: AristoDbRef; level = high(int)): LayerRef = # Set up initial layer (bottom layer) result = LayerRef( - delta: LayerDeltaRef( - sTab: layers[0].delta.sTab.dup, # explicit dup for ref values - kMap: layers[0].delta.kMap, - vTop: layers[^1].delta.vTop, - accLeaves: layers[0].delta.accLeaves, - stoLeaves: layers[0].delta.stoLeaves, - )) + sTab: layers[0].sTab.dup, # explicit dup for ref values + kMap: layers[0].kMap, + vTop: layers[^1].vTop, + accLeaves: layers[0].accLeaves, + stoLeaves: layers[0].stoLeaves) # Consecutively merge other layers on top for n in 1 ..< layers.len: - for (vid,vtx) in layers[n].delta.sTab.pairs: - result.delta.sTab[vid] = vtx - for (vid,key) in layers[n].delta.kMap.pairs: - result.delta.kMap[vid] = key - for (accPath,vtx) in layers[n].delta.accLeaves.pairs: - result.delta.accLeaves[accPath] = vtx - for (mixPath,vtx) in layers[n].delta.stoLeaves.pairs: - result.delta.stoLeaves[mixPath] = vtx + for (vid,vtx) in layers[n].sTab.pairs: + result.sTab[vid] = vtx + for (vid,key) in layers[n].kMap.pairs: + result.kMap[vid] = key + for (accPath,vtx) in layers[n].accLeaves.pairs: + result.accLeaves[accPath] = vtx + for (mixPath,vtx) in layers[n].stoLeaves.pairs: + result.stoLeaves[mixPath] = vtx # ------------------------------------------------------------------------------ # Public iterators @@ -226,12 +233,12 @@ iterator layersWalkVtx*( ## the one with a zero vertex which are othewise skipped by the iterator. ## The `seen` argument must not be modified while the iterator is active. ## - for (rvid,vtx) in db.top.delta.sTab.pairs: + for (rvid,vtx) in db.top.sTab.pairs: yield (rvid,vtx) seen.incl rvid.vid for w in db.rstack: - for (rvid,vtx) in w.delta.sTab.pairs: + for (rvid,vtx) in w.sTab.pairs: if rvid.vid notin seen: yield (rvid,vtx) seen.incl rvid.vid @@ -251,12 +258,12 @@ iterator layersWalkKey*( ## Walk over all `(VertexID,HashKey)` pairs on the cache layers. Note that ## entries are unsorted. var seen: HashSet[VertexID] - for (rvid,key) in db.top.delta.kMap.pairs: + for (rvid,key) in db.top.kMap.pairs: yield (rvid,key) seen.incl rvid.vid for w in db.rstack: - for (rvid,key) in w.delta.kMap.pairs: + for (rvid,key) in w.kMap.pairs: if rvid.vid notin seen: yield (rvid,key) seen.incl rvid.vid diff --git a/nimbus/db/aristo/aristo_tx.nim b/nimbus/db/aristo/aristo_tx.nim index 42006e56c..908ecd121 100644 --- a/nimbus/db/aristo/aristo_tx.nim +++ b/nimbus/db/aristo/aristo_tx.nim @@ -134,7 +134,7 @@ proc findTx*( if db.txRef.isNil: # Try `(vid,key)` on top layer - let topKey = db.top.delta.kMap.getOrVoid rvid + let topKey = db.top.kMap.getOrVoid rvid if topKey == key: return ok(0) @@ -143,11 +143,11 @@ proc findTx*( for (n,tx,layer,error) in db.txRef.txFrameWalk: if error != AristoError(0): return err(error) - if layer.delta.kMap.getOrVoid(rvid) == key: + if layer.kMap.getOrVoid(rvid) == key: return ok(n) # Try bottom layer - let botKey = db.stack[0].delta.kMap.getOrVoid rvid + let botKey = db.stack[0].kMap.getOrVoid rvid if botKey == key: return ok(db.stack.len) diff --git a/nimbus/db/aristo/aristo_tx/tx_fork.nim b/nimbus/db/aristo/aristo_tx/tx_fork.nim index f5dfcb8a2..83ebde7f5 100644 --- a/nimbus/db/aristo/aristo_tx/tx_fork.nim +++ b/nimbus/db/aristo/aristo_tx/tx_fork.nim @@ -64,8 +64,7 @@ proc txFork*( let stackLayer = block: let rc = db.getTuvBE() if rc.isOk: - LayerRef( - delta: LayerDeltaRef(vTop: rc.value)) + LayerRef(vTop: rc.value) elif rc.error == GetTuvNotFound: LayerRef.init() else: diff --git a/nimbus/db/aristo/aristo_tx/tx_frame.nim b/nimbus/db/aristo/aristo_tx/tx_frame.nim index 5dfe0c628..dbc9c67e4 100644 --- a/nimbus/db/aristo/aristo_tx/tx_frame.nim +++ b/nimbus/db/aristo/aristo_tx/tx_frame.nim @@ -14,7 +14,6 @@ {.push raises: [].} import - std/tables, results, ".."/[aristo_desc, aristo_layers] @@ -81,10 +80,10 @@ proc txFrameBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] = if db.txFrameLevel != db.stack.len: return err(TxStackGarbled) - let vTop = db.top.delta.vTop + let vTop = db.top.vTop db.stack.add db.top db.top = LayerRef( - delta: LayerDeltaRef(vTop: vTop), + vTop: vTop, txUid: db.getTxUid) db.txRef = AristoTxRef( @@ -123,18 +122,11 @@ proc txFrameCommit*( let db = ? tx.getDbDescFromTopTx() # Pop layer from stack and merge database top layer onto it - let merged = block: - if db.top.delta.sTab.len == 0 and - db.top.delta.kMap.len == 0: - # Avoid `layersMergeOnto()` - db.top.delta = db.stack[^1].delta - db.stack.setLen(db.stack.len-1) - db.top - else: - let layer = db.stack[^1] - db.stack.setLen(db.stack.len-1) - db.top.layersMergeOnto layer[] - layer + let merged = db.stack[^1] + db.stack.setLen(db.stack.len-1) + if not db.top.isEmpty(): + # Only call `layersMergeOnto()` if layer is empty + db.top.layersMergeOnto merged[] # Install `merged` stack top layer and update stack db.top = merged diff --git a/nimbus/db/aristo/aristo_tx/tx_stow.nim b/nimbus/db/aristo/aristo_tx/tx_stow.nim index 0c04cf94f..2c14fe386 100644 --- a/nimbus/db/aristo/aristo_tx/tx_stow.nim +++ b/nimbus/db/aristo/aristo_tx/tx_stow.nim @@ -14,10 +14,25 @@ {.push raises: [].} import - std/tables, results, ../aristo_delta/delta_merge, - ".."/[aristo_desc, aristo_delta] + ".."/[aristo_desc, aristo_delta, aristo_layers] + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc txStowOk*( + db: AristoDbRef; # Database + persistent: bool; # Stage only unless `true` + ): Result[void,AristoError] = + if not db.txRef.isNil: + return err(TxPendingTx) + if 0 < db.stack.len: + return err(TxStackGarbled) + if persistent and not db.deltaPersistentOk(): + return err(TxBackendNotWritable) + ok() # ------------------------------------------------------------------------------ # Public functions @@ -30,24 +45,18 @@ proc txStow*( ): Result[void,AristoError] = ## Worker for `stow()` and `persist()` variants. ## - if not db.txRef.isNil: - return err(TxPendingTx) - if 0 < db.stack.len: - return err(TxStackGarbled) - if persistent and not db.deltaPersistentOk(): - return err(TxBackendNotWritable) + ? db.txStowOk persistent - if db.top.delta.sTab.len != 0 or - db.top.delta.kMap.len != 0 or - db.top.delta.accLeaves.len != 0 or - db.top.delta.stoLeaves.len != 0: - - # Note that `deltaMerge()` will return the 1st argument if the 2nd is `nil` - db.balancer = db.deltaMerge(db.top.delta, db.balancer).valueOr: - return err(error[1]) + if not db.top.isEmpty(): + # Note that `deltaMerge()` will return the `db.top` argument if the + # `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the + # case that there are no forked peers one can ignore that restriction as + # no balancer is shared. + db.balancer = deltaMerge( + db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0) # New empty top layer - db.top = LayerRef(delta: LayerDeltaRef(vTop: db.balancer.vTop)) + db.top = LayerRef(vTop: db.balancer.vTop) if persistent: # Merge/move `balancer` into persistent tables (unless missing) diff --git a/nimbus/db/aristo/aristo_vid.nim b/nimbus/db/aristo/aristo_vid.nim index ced296746..ec4604260 100644 --- a/nimbus/db/aristo/aristo_vid.nim +++ b/nimbus/db/aristo/aristo_vid.nim @@ -24,17 +24,16 @@ import proc vidFetch*(db: AristoDbRef): VertexID = ## Fetch next vertex ID. ## - if db.top.delta.vTop == 0: - db.top.delta.vTop = VertexID(LEAST_FREE_VID) + if db.top.vTop == 0: + db.top.vTop = VertexID(LEAST_FREE_VID) else: - db.top.delta.vTop.inc - db.top.delta.vTop + db.top.vTop.inc + db.top.vTop proc vidDispose*(db: AristoDbRef; vid: VertexID) = ## Only top vertexIDs are disposed - if vid == db.top.delta.vTop and - LEAST_FREE_VID < db.top.delta.vTop.distinctBase: - db.top.delta.vTop.dec + if vid == db.top.vTop and LEAST_FREE_VID < db.top.vTop.distinctBase: + db.top.vTop.dec # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_walk/walk_private.nim b/nimbus/db/aristo/aristo_walk/walk_private.nim index d07ebb090..c065d661d 100644 --- a/nimbus/db/aristo/aristo_walk/walk_private.nim +++ b/nimbus/db/aristo/aristo_walk/walk_private.nim @@ -23,12 +23,12 @@ iterator walkVtxBeImpl*[T]( ): tuple[rvid: RootedVertexID, vtx: VertexRef] = ## Generic iterator when T is VoidBackendRef: - let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer + let filter = if db.balancer.isNil: LayerRef() else: db.balancer else: mixin walkVtx - let filter = LayerDeltaRef() + let filter = LayerRef() if not db.balancer.isNil: filter.sTab = db.balancer.sTab # copy table @@ -52,12 +52,12 @@ iterator walkKeyBeImpl*[T]( ): tuple[rvid: RootedVertexID, key: HashKey] = ## Generic iterator when T is VoidBackendRef: - let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer + let filter = if db.balancer.isNil: LayerRef() else: db.balancer else: mixin walkKey - let filter = LayerDeltaRef() + let filter = LayerRef() if not db.balancer.isNil: filter.kMap = db.balancer.kMap # copy table diff --git a/nimbus/db/kvt/kvt_debug.nim b/nimbus/db/kvt/kvt_debug.nim index 525aa6139..592495a5a 100644 --- a/nimbus/db/kvt/kvt_debug.nim +++ b/nimbus/db/kvt/kvt_debug.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -134,11 +134,11 @@ proc ppBe[T](be: T; db: KvtDbRef; indent: int): string = proc ppLayer(layer: LayerRef; db: KvtDbRef; indent = 4): string = let - tLen = layer.delta.sTab.len + tLen = layer.sTab.len info = "tab(" & $tLen & ")" pfx1 = indent.toPfx(1) pfx2 = if 0 < tLen: indent.toPfx(2) else: " " - "" & pfx1 & info & pfx2 & layer.delta.sTab.ppTab(db,indent+2) + "" & pfx1 & info & pfx2 & layer.sTab.ppTab(db,indent+2) # ------------------------------------------------------------------------------ # Public functions @@ -152,7 +152,7 @@ proc pp*( case be.kind: of BackendMemory: result &= be.MemBackendRef.ppBe(db, indent) - of BackendRocksDB: + of BackendRocksDB,BackendRdbTriggered: result &= be.RdbBackendRef.ppBe(db, indent) of BackendVoid: result &= "" diff --git a/nimbus/db/kvt/kvt_delta.nim b/nimbus/db/kvt/kvt_delta.nim index 2ee0feb8f..e44a90e69 100644 --- a/nimbus/db/kvt/kvt_delta.nim +++ b/nimbus/db/kvt/kvt_delta.nim @@ -23,23 +23,12 @@ import # Public functions # ------------------------------------------------------------------------------ -proc deltaMerge*( - db: KvtDbRef; # Database - delta: LayerDeltaRef; # Filter to apply to database - ) = - ## Merge the argument `delta` into the balancer filter layer. Note that - ## this function has no control of the filter source. Having merged the - ## argument `delta`, all the `top` and `stack` layers should be cleared. - ## - db.merge(delta, db.balancer) - - -proc deltaUpdateOk*(db: KvtDbRef): bool = +proc deltaPersistentOk*(db: KvtDbRef): bool = ## Check whether the balancer filter can be merged into the backend not db.backend.isNil and db.isCentre -proc deltaUpdate*( +proc deltaPersistent*( db: KvtDbRef; # Database reCentreOk = false; ): Result[void,KvtError] = @@ -71,17 +60,30 @@ proc deltaUpdate*( # Always re-centre to `parent` (in case `reCentreOk` was set) defer: discard parent.reCentre() + # Update forked balancers here do that errors are detected early (if any.) + if 0 < db.nForked: + let rev = db.revFilter(db.balancer).valueOr: + return err(error[1]) + if 0 < rev.sTab.len: # Can an empty `rev` happen at all? + var unsharedRevOk = true + for w in db.forked: + if not w.db.balancer.isValid: + unsharedRevOk = false + # The `rev` filter can be modified if one can make sure that it is + # not shared (i.e. only previously merged into the w.db.balancer.) + # Note that it is trivially true for a single fork. + let modLowerOk = w.isLast and unsharedRevOk + w.db.balancer = deltaMerge( + w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk) + # Store structural single trie entries let writeBatch = ? be.putBegFn() be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq) ? be.putEndFn writeBatch - # Update peer filter balance. - let rev = db.deltaReverse db.balancer - for w in db.forked: - db.merge(rev, w.balancer) + # Done with balancer, all saved to backend + db.balancer = LayerRef(nil) - db.balancer = LayerDeltaRef(nil) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/kvt/kvt_delta/delta_merge.nim b/nimbus/db/kvt/kvt_delta/delta_merge.nim index 277316034..13ba42b31 100644 --- a/nimbus/db/kvt/kvt_delta/delta_merge.nim +++ b/nimbus/db/kvt/kvt_delta/delta_merge.nim @@ -10,37 +10,61 @@ import std/tables, - results, - ".."/[kvt_desc, kvt_utils] + eth/common, + ../kvt_desc + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc layersMergeOnto(src: LayerRef; trg: var LayerObj) = + for (key,val) in src.sTab.pairs: + trg.sTab[key] = val # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc merge*( - db: KvtDbRef; # Database - upper: LayerDeltaRef; # Filter to apply onto `lower` - lower: var LayerDeltaRef; # Target filter, will be modified - ) = - ## Merge the argument filter `upper` onto the argument filter `lower` - ## relative to the *unfiltered* backend database on `db.backened`. The `lower` - ## filter argument will have been modified. +proc deltaMerge*( + upper: LayerRef; # Think of `top`, `nil` is ok + modUpperOk: bool; # May re-use/modify `upper` + lower: LayerRef; # Think of `balancer`, `nil` is ok + modLowerOk: bool; # May re-use/modify `lower` + ): LayerRef = + ## Merge argument `upper` into the `lower` filter instance. ## - ## In case that the argument `lower` is `nil`, it will be replaced by `upper`. + ## Note that the namimg `upper` and `lower` indicate that the filters are + ## stacked and the database access is `upper -> lower -> backend`. ## if lower.isNil: - lower = upper - elif not upper.isNil: - for (key,val) in upper.sTab.pairs: - if val.isValid or not lower.sTab.hasKey key: - lower.sTab[key] = val - elif lower.sTab.getOrVoid(key).isValid: - let rc = db.getUbe key - if rc.isOk: - lower.sTab[key] = val # empty blob - else: - doAssert rc.error == GetNotFound - lower.sTab.del key # no need to keep that in merged filter + # Degenerate case: `upper` is void + result = upper + + elif upper.isNil: + # Degenerate case: `lower` is void + result = lower + + elif modLowerOk: + # Can modify `lower` which is the prefered action mode but applies only + # in cases where the `lower` argument is not shared. + layersMergeOnto(upper, lower[]) + result = lower + + elif not modUpperOk: + # Cannot modify any argument layers. + result = LayerRef(sTab: lower.sTab) + layersMergeOnto(upper, result[]) + + else: + # Otherwise avoid copying some tables by modifyinh `upper`. This is not + # completely free as the merge direction changes to merging the `lower` + # layer up into the higher prioritised `upper` layer (note that the `lower` + # argument filter is read-only.) Here again, the `upper` argument must not + # be a shared layer/filter. + for (key,val) in lower.sTab.pairs: + if not upper.sTab.hasKey(key): + upper.sTab[key] = val + result = upper # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_delta/delta_reverse.nim b/nimbus/db/kvt/kvt_delta/delta_reverse.nim index e3aca6ec6..3be589139 100644 --- a/nimbus/db/kvt/kvt_delta/delta_reverse.nim +++ b/nimbus/db/kvt/kvt_delta/delta_reverse.nim @@ -10,6 +10,7 @@ import std/tables, + eth/common, results, ".."/[kvt_desc, kvt_utils] @@ -17,27 +18,29 @@ import # Public functions # ------------------------------------------------------------------------------ -proc deltaReverse*( +proc revFilter*( db: KvtDbRef; # Database - delta: LayerDeltaRef; # Filter to revert - ): LayerDeltaRef = - ## Assemble a reverse filter for the `delta` argument, i.e. changes to the - ## backend that reverse the effect of applying this to the balancer filter. - ## The resulting filter is calculated against the current *unfiltered* - ## backend (excluding optionally installed balancer filters.) + filter: LayerRef; # Filter to revert + ): Result[LayerRef,(Blob,KvtError)] = + ## Assemble reverse filter for the `filter` argument, i.e. changes to the + ## backend that reverse the effect of applying the this read-only filter. ## - ## If `delta` is `nil`, the result will be `nil` as well. - if not delta.isNil: - result = LayerDeltaRef() + ## This read-only filter is calculated against the current unfiltered + ## backend (excluding optionally installed read-only filter.) + ## + let rev = LayerRef() - # Calculate reverse changes for the `sTab[]` structural table - for key in delta.sTab.keys: - let rc = db.getUbe key - if rc.isOk: - result.sTab[key] = rc.value - else: - doAssert rc.error == GetNotFound - result.sTab[key] = EmptyBlob + # Calculate reverse changes for the `sTab[]` structural table + for key in filter.sTab.keys: + let rc = db.getUbe key + if rc.isOk: + rev.sTab[key] = rc.value + elif rc.error == GetNotFound: + rev.sTab[key] = EmptyBlob + else: + return err((key,rc.error)) + + ok(rev) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_desc.nim b/nimbus/db/kvt/kvt_desc.nim index b65cb545d..6422029b6 100644 --- a/nimbus/db/kvt/kvt_desc.nim +++ b/nimbus/db/kvt/kvt_desc.nim @@ -46,7 +46,7 @@ type ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers - balancer*: LayerDeltaRef ## Apply read filter (locks writing) + balancer*: LayerRef ## Balance out concurrent backend access backend*: BackendRef ## Backend database (may well be `nil`) txRef*: KvtTxRef ## Latest active transaction @@ -82,6 +82,9 @@ func getOrVoid*(tab: Table[Blob,Blob]; w: Blob): Blob = func isValid*(key: Blob): bool = key != EmptyBlob +func isValid*(layer: LayerRef): bool = + layer != LayerRef(nil) + # ------------------------------------------------------------------------------ # Public functions, miscellaneous # ------------------------------------------------------------------------------ @@ -162,13 +165,18 @@ proc fork*( ok clone -iterator forked*(db: KvtDbRef): KvtDbRef = +iterator forked*(db: KvtDbRef): tuple[db: KvtDbRef, isLast: bool] = ## Interate over all non centre descriptors (see comments on `reCentre()` ## for details.) + ## + ## The second `isLast` yielded loop entry is `true` if the yielded tuple + ## is the last entry in the list. if not db.dudes.isNil: + var nLeft = db.dudes.peers.len for dude in db.dudes.peers.items: if dude != db.dudes.centre: - yield dude + nLeft.dec + yield (dude, nLeft == 1) func nForked*(db: KvtDbRef): int = ## Returns the number of non centre descriptors (see comments on `reCentre()` diff --git a/nimbus/db/kvt/kvt_desc/desc_structural.nim b/nimbus/db/kvt/kvt_desc/desc_structural.nim index 81de518af..f3d919e46 100644 --- a/nimbus/db/kvt/kvt_desc/desc_structural.nim +++ b/nimbus/db/kvt/kvt_desc/desc_structural.nim @@ -18,14 +18,11 @@ import eth/common type - LayerDeltaRef* = ref object - ## Delta tables relative to previous layer - sTab*: Table[Blob,Blob] ## Structural data table - - LayerRef* = ref object + LayerRef* = ref LayerObj + LayerObj* = object ## Kvt database layer structures. Any layer holds the full ## change relative to the backend. - delta*: LayerDeltaRef ## Structural tables held as deltas + sTab*: Table[Blob,Blob] ## Structural data table txUid*: uint ## Transaction identifier if positive # ------------------------------------------------------------------------------ @@ -34,11 +31,11 @@ type func init*(T: type LayerRef): T = ## Constructor, returns empty layer - T(delta: LayerDeltaRef()) + T() -func dup*(ly: LayerDeltaRef): LayerDeltaRef = +func dup*(ly: LayerRef): LayerRef = ## Duplicate/copy - LayerDeltaRef(sTab: ly.sTab) + LayerRef(sTab: ly.sTab) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_layers.nim b/nimbus/db/kvt/kvt_layers.nim index 220dbabf9..0b5b90347 100644 --- a/nimbus/db/kvt/kvt_layers.nim +++ b/nimbus/db/kvt/kvt_layers.nim @@ -25,7 +25,7 @@ func nLayersKeys*(db: KvtDbRef): int = ## bound for the number of effective key/value mappings held on the cache ## layers as there might be duplicate entries for the same key on different ## layers. - db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len) + db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len) # ------------------------------------------------------------------------------ # Public functions: get function @@ -37,11 +37,11 @@ func layersLen*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[int] = when key isnot seq[byte]: let key = @key - db.top.delta.sTab.withValue(key, item): + db.top.sTab.withValue(key, item): return Opt.some(item[].len()) for w in db.rstack: - w.delta.sTab.withValue(key, item): + w.sTab.withValue(key, item): return Opt.some(item[].len()) Opt.none(int) @@ -58,11 +58,11 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] = when key isnot seq[byte]: let key = @key - db.top.delta.sTab.withValue(key, item): + db.top.sTab.withValue(key, item): return Opt.some(item[]) for w in db.rstack: - w.delta.sTab.withValue(key, item): + w.sTab.withValue(key, item): return Opt.some(item[]) Opt.none(Blob) @@ -73,7 +73,7 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] = func layersPut*(db: KvtDbRef; key: openArray[byte]; data: openArray[byte]) = ## Store a (potentally empty) value on the top layer - db.top.delta.sTab[@key] = @data + db.top.sTab[@key] = @data # ------------------------------------------------------------------------------ # Public functions @@ -87,12 +87,12 @@ func layersCc*(db: KvtDbRef; level = high(int)): LayerRef = else: db.stack[0 .. level] # Set up initial layer (bottom layer) - result = LayerRef(delta: LayerDeltaRef(sTab: layers[0].delta.sTab)) + result = LayerRef(sTab: layers[0].sTab) # Consecutively merge other layers on top for n in 1 ..< layers.len: - for (key,val) in layers[n].delta.sTab.pairs: - result.delta.sTab[key] = val + for (key,val) in layers[n].sTab.pairs: + result.sTab[key] = val # ------------------------------------------------------------------------------ # Public iterators @@ -109,12 +109,12 @@ iterator layersWalk*( ## the one with a zero vertex which are othewise skipped by the iterator. ## The `seen` argument must not be modified while the iterator is active. ## - for (key,val) in db.top.delta.sTab.pairs: + for (key,val) in db.top.sTab.pairs: yield (key,val) seen.incl key for w in db.rstack: - for (key,val) in w.delta.sTab.pairs: + for (key,val) in w.sTab.pairs: if key notin seen: yield (key,val) seen.incl key diff --git a/nimbus/db/kvt/kvt_tx/tx_frame.nim b/nimbus/db/kvt/kvt_tx/tx_frame.nim index 18a8ccd39..0903e5206 100644 --- a/nimbus/db/kvt/kvt_tx/tx_frame.nim +++ b/nimbus/db/kvt/kvt_tx/tx_frame.nim @@ -83,7 +83,6 @@ proc txFrameBegin*(db: KvtDbRef): Result[KvtTxRef,KvtError] = db.stack.add db.top db.top = LayerRef( - delta: LayerDeltaRef(), txUid: db.getTxUid) db.txRef = KvtTxRef( db: db, @@ -122,8 +121,8 @@ proc txFrameCommit*( # Replace the top two layers by its merged version let merged = db.stack[^1] - for (key,val) in db.top.delta.sTab.pairs: - merged.delta.sTab[key] = val + for (key,val) in db.top.sTab.pairs: + merged.sTab[key] = val # Install `merged` layer db.top = merged diff --git a/nimbus/db/kvt/kvt_tx/tx_stow.nim b/nimbus/db/kvt/kvt_tx/tx_stow.nim index c156efbbf..1e2609690 100644 --- a/nimbus/db/kvt/kvt_tx/tx_stow.nim +++ b/nimbus/db/kvt/kvt_tx/tx_stow.nim @@ -16,6 +16,7 @@ import std/tables, results, + ../kvt_delta/delta_merge, ".."/[kvt_desc, kvt_delta] # ------------------------------------------------------------------------------ @@ -31,10 +32,8 @@ proc txStowOk*( return err(TxPendingTx) if 0 < db.stack.len: return err(TxStackGarbled) - - if persistent and not db.deltaUpdateOk(): + if persistent and not db.deltaPersistentOk(): return err(TxBackendNotWritable) - ok() proc txStow*( @@ -49,13 +48,20 @@ proc txStow*( ## ? db.txStowOk persistent - if 0 < db.top.delta.sTab.len: - db.deltaMerge db.top.delta - db.top.delta = LayerDeltaRef() + if 0 < db.top.sTab.len: + # Note that `deltaMerge()` will return the `db.top` argument if the + # `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the + # case that there are no forked peers one can ignore that restriction as + # no balancer is shared. + db.balancer = deltaMerge( + db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0) + + # New empty top layer + db.top = LayerRef() if persistent: # Move `balancer` data into persistent tables - ? db.deltaUpdate() + ? db.deltaPersistent() ok() diff --git a/tests/test_aristo/test_balancer.nim b/tests/test_aristo/test_balancer.nim index 4c0e363e4..4ebfe3dc5 100644 --- a/tests/test_aristo/test_balancer.nim +++ b/tests/test_aristo/test_balancer.nim @@ -149,7 +149,7 @@ proc cleanUp(dx: var DbTriplet) = # ---------------------- -proc isDbEq(a, b: LayerDeltaRef; db: AristoDbRef; noisy = true): bool = +proc isDbEq(a, b: LayerRef; db: AristoDbRef; noisy = true): bool = ## Verify that argument filter `a` has the same effect on the ## physical/unfiltered backend of `db` as argument filter `b`. if a.isNil: @@ -255,8 +255,8 @@ proc testBalancer*( # Resulting clause (11) filters from `aristo/README.md` example # which will be used in the second part of the tests var - c11Filter1 = LayerDeltaRef(nil) - c11Filter3 = LayerDeltaRef(nil) + c11Filter1 = LayerRef(nil) + c11Filter3 = LayerRef(nil) # Work through clauses (8)..(11) from `aristo/README.md` example block: @@ -278,14 +278,14 @@ proc testBalancer*( block: let rc = db1.persist() xCheckRc rc.error == 0 - xCheck db1.balancer == LayerDeltaRef(nil) + xCheck db1.balancer == LayerRef(nil) xCheck db2.balancer == db3.balancer block: let rc = db2.stow() # non-persistent xCheckRc rc.error == 0: noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2 - xCheck db1.balancer == LayerDeltaRef(nil) + xCheck db1.balancer == LayerRef(nil) xCheck db2.balancer != db3.balancer # Clause (11) from `aristo/README.md` example @@ -293,7 +293,7 @@ proc testBalancer*( block: let rc = db2.persist() xCheckRc rc.error == 0 - xCheck db2.balancer == LayerDeltaRef(nil) + xCheck db2.balancer == LayerRef(nil) # Check/verify backends block: @@ -326,7 +326,7 @@ proc testBalancer*( block: let rc = db2.persist() xCheckRc rc.error == 0 - xCheck db2.balancer == LayerDeltaRef(nil) + xCheck db2.balancer == LayerRef(nil) xCheck db1.balancer == db3.balancer # Clause (13) from `aristo/README.md` example