Aristo and kvt balancer management update (#2504)
* Aristo: Merge `delta_siblings` module into `deltaPersistent()` * Aristo: Add `isEmpty()` for canonical checking whether a layer is empty * Aristo: Merge `LayerDeltaRef` into `LayerObj` why: No need to maintain nested object refs anymore. Previously the `LayerDeltaRef` object had a companion `LayerFinalRef` which held non-delta layer information. * Kvt: Merge `LayerDeltaRef` into `LayerRef` why: No need to maintain nested object refs (as with `Aristo`) * Kvt: Re-write balancer logic similar to `Aristo` why: Although `Kvt` was a cheap copy of `Aristo` it sort of got out of sync and the balancer code was wrong. * Update iterator over forked peers why: Yield additional field `isLast` indicating that the last iteration cycle was approached. * Optimise balancer calculation. why: One can often avoid providing a new object containing the merge of two layers for the balancer. This avoids copying tables. In some cases this is replaced by `hasKey()` look ups though. One uses one of the two to combine and merges the other into the first. Of course, this needs some checks for making sure that none of the components to merge is eventually shared with something else. * Fix copyright year
This commit is contained in:
parent
ee323d5ff8
commit
5ac362fe6f
|
@ -291,8 +291,8 @@ proc ppXMap*(
|
|||
result.setLen(result.len - 1)
|
||||
result &= "}"
|
||||
|
||||
proc ppFilter(
|
||||
fl: LayerDeltaRef;
|
||||
proc ppBalancer(
|
||||
fl: LayerRef;
|
||||
db: AristoDbRef;
|
||||
indent: int;
|
||||
): string =
|
||||
|
@ -398,18 +398,18 @@ proc ppLayer(
|
|||
if 2 < nOKs:
|
||||
result &= "<layer>".doPrefix(false)
|
||||
if vTopOk:
|
||||
result &= "".doPrefix(true) & "vTop=" & layer.delta.vTop.ppVid
|
||||
result &= "".doPrefix(true) & "vTop=" & layer.vTop.ppVid
|
||||
if sTabOk:
|
||||
let
|
||||
tLen = layer.delta.sTab.len
|
||||
tLen = layer.sTab.len
|
||||
info = "sTab(" & $tLen & ")"
|
||||
result &= info.doPrefix(0 < tLen) & layer.delta.sTab.ppSTab(db,indent+2)
|
||||
result &= info.doPrefix(0 < tLen) & layer.sTab.ppSTab(db,indent+2)
|
||||
if kMapOk:
|
||||
let
|
||||
tLen = layer.delta.kMap.len
|
||||
tLen = layer.kMap.len
|
||||
info = "kMap(" & $tLen & ")"
|
||||
result &= info.doPrefix(0 < tLen)
|
||||
result &= db.ppXMap(layer.delta.kMap, indent+2)
|
||||
result &= db.ppXMap(layer.kMap, indent+2)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
@ -528,53 +528,17 @@ proc pp*(
|
|||
layer: LayerRef;
|
||||
db: AristoDbRef;
|
||||
indent = 4;
|
||||
balancerOk = false;
|
||||
sTabOk = true,
|
||||
kMapOk = true,
|
||||
other = true,
|
||||
): string =
|
||||
if balancerOk:
|
||||
layer.ppLayer(
|
||||
db, vTopOk=true, sTabOk=true, kMapOk=true)
|
||||
|
||||
proc pp*(
|
||||
layer: LayerRef;
|
||||
db: AristoDbRef;
|
||||
xTabOk: bool;
|
||||
indent = 4;
|
||||
): string =
|
||||
db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk)
|
||||
else:
|
||||
layer.ppLayer(
|
||||
db, vTopOk=true, sTabOk=xTabOk, kMapOk=true)
|
||||
|
||||
proc pp*(
|
||||
layer: LayerRef;
|
||||
db: AristoDbRef;
|
||||
xTabOk: bool;
|
||||
kMapOk: bool;
|
||||
other = false;
|
||||
indent = 4;
|
||||
): string =
|
||||
layer.ppLayer(
|
||||
db, vTopOk=other, sTabOk=xTabOk, kMapOk=kMapOk)
|
||||
|
||||
|
||||
proc pp*(
|
||||
db: AristoDbRef;
|
||||
xTabOk: bool;
|
||||
indent = 4;
|
||||
): string =
|
||||
db.layersCc.pp(db, xTabOk=xTabOk, indent=indent)
|
||||
|
||||
proc pp*(
|
||||
db: AristoDbRef;
|
||||
xTabOk: bool;
|
||||
kMapOk: bool;
|
||||
other = false;
|
||||
indent = 4;
|
||||
): string =
|
||||
db.layersCc.pp(db, xTabOk=xTabOk, kMapOk=kMapOk, other=other, indent=indent)
|
||||
|
||||
proc pp*(
|
||||
filter: LayerDeltaRef;
|
||||
db = AristoDbRef(nil);
|
||||
indent = 4;
|
||||
): string =
|
||||
filter.ppFilter(db.orDefault(), indent)
|
||||
db.orDefault(), vTopOk=other, sTabOk=sTabOk, kMapOk=kMapOk)
|
||||
|
||||
proc pp*(
|
||||
be: BackendRef;
|
||||
|
@ -582,7 +546,7 @@ proc pp*(
|
|||
limit = 100;
|
||||
indent = 4;
|
||||
): string =
|
||||
result = db.balancer.ppFilter(db, indent+1) & indent.toPfx
|
||||
result = db.balancer.ppBalancer(db, indent+1) & indent.toPfx
|
||||
case be.kind:
|
||||
of BackendMemory:
|
||||
result &= be.MemBackendRef.ppBe(db, limit, indent+1)
|
||||
|
@ -599,11 +563,12 @@ proc pp*(
|
|||
topOk = true;
|
||||
stackOk = true;
|
||||
kMapOk = true;
|
||||
sTabOk = true;
|
||||
limit = 100;
|
||||
): string =
|
||||
if topOk:
|
||||
result = db.layersCc.pp(
|
||||
db, xTabOk=true, kMapOk=kMapOk, other=true, indent=indent)
|
||||
db, sTabOk=sTabOk, kMapOk=kMapOk, other=true, indent=indent)
|
||||
let stackOnlyOk = stackOk and not (topOk or balancerOk or backendOk)
|
||||
if not stackOnlyOk:
|
||||
result &= indent.toPfx & "level=" & $db.stack.len
|
||||
|
@ -614,15 +579,15 @@ proc pp*(
|
|||
let
|
||||
m = layers.len - n - 1
|
||||
l = db.layersCc m
|
||||
a = w.delta.kMap.values.toSeq.filterIt(not it.isValid).len
|
||||
c = l.delta.kMap.values.toSeq.filterIt(not it.isValid).len
|
||||
result &= "(" & $(w.delta.kMap.len - a) & "," & $a & ")"
|
||||
lStr &= " " & $m & "=(" & $(l.delta.kMap.len - c) & "," & $c & ")"
|
||||
a = w.kMap.values.toSeq.filterIt(not it.isValid).len
|
||||
c = l.kMap.values.toSeq.filterIt(not it.isValid).len
|
||||
result &= "(" & $(w.kMap.len - a) & "," & $a & ")"
|
||||
lStr &= " " & $m & "=(" & $(l.kMap.len - c) & "," & $c & ")"
|
||||
result &= " =>" & lStr
|
||||
if backendOk:
|
||||
result &= indent.toPfx & db.backend.pp(db, limit=limit, indent)
|
||||
elif balancerOk:
|
||||
result &= indent.toPfx & db.balancer.ppFilter(db, indent+1)
|
||||
result &= indent.toPfx & db.balancer.ppBalancer(db, indent+1)
|
||||
|
||||
proc pp*(sdb: MerkleSignRef; indent = 4): string =
|
||||
result = "" &
|
||||
|
|
|
@ -16,9 +16,9 @@ import
|
|||
std/tables,
|
||||
eth/common,
|
||||
results,
|
||||
./aristo_desc,
|
||||
"."/[aristo_desc, aristo_layers],
|
||||
./aristo_desc/desc_backend,
|
||||
./aristo_delta/delta_siblings
|
||||
./aristo_delta/[delta_merge, delta_reverse]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions, save to backend
|
||||
|
@ -34,17 +34,16 @@ proc deltaPersistent*(
|
|||
nxtFid = 0u64; # Next filter ID (if any)
|
||||
reCentreOk = false;
|
||||
): Result[void,AristoError] =
|
||||
## Resolve (i.e. move) the backend filter into the physical backend database.
|
||||
## Resolve (i.e. move) the balancer into the physical backend database.
|
||||
##
|
||||
## This needs write permission on the backend DB for the argument `db`
|
||||
## descriptor (see the function `aristo_desc.isCentre()`.) With the argument
|
||||
## flag `reCentreOk` passed `true`, write permission will be temporarily
|
||||
## This needs write permission on the backend DB for the descriptor argument
|
||||
## `db` (see the function `aristo_desc.isCentre()`.) If the argument flag
|
||||
## `reCentreOk` is passed `true`, write permission will be temporarily
|
||||
## acquired when needed.
|
||||
##
|
||||
## When merging the current backend filter, its reverse will be is stored as
|
||||
## back log on the filter fifos (so the current state can be retrieved.)
|
||||
## Also, other non-centre descriptors are updated so there is no visible
|
||||
## database change for these descriptors.
|
||||
## When merging the current backend filter, its reverse will be is stored
|
||||
## on other non-centre descriptors so there is no visible database change
|
||||
## for these.
|
||||
##
|
||||
let be = db.backend
|
||||
if be.isNil:
|
||||
|
@ -64,9 +63,21 @@ proc deltaPersistent*(
|
|||
# Always re-centre to `parent` (in case `reCentreOk` was set)
|
||||
defer: discard parent.reCentre()
|
||||
|
||||
# Initialise peer filter balancer.
|
||||
let updateSiblings = ? UpdateSiblingsRef.init db
|
||||
defer: updateSiblings.rollback()
|
||||
# Update forked balancers here do that errors are detected early (if any.)
|
||||
if 0 < db.nForked:
|
||||
let rev = db.revFilter(db.balancer).valueOr:
|
||||
return err(error[1])
|
||||
if not rev.isEmpty: # Can an empty `rev` happen at all?
|
||||
var unsharedRevOk = true
|
||||
for w in db.forked:
|
||||
if not w.db.balancer.isValid:
|
||||
unsharedRevOk = false
|
||||
# The `rev` filter can be modified if one can make sure that it is
|
||||
# not shared (i.e. only previously merged into the w.db.balancer.)
|
||||
# Note that it is trivially true for a single fork.
|
||||
let modLowerOk = w.isLast and unsharedRevOk
|
||||
w.db.balancer = deltaMerge(
|
||||
w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk)
|
||||
|
||||
let lSst = SavedState(
|
||||
key: EMPTY_ROOT_HASH, # placeholder for more
|
||||
|
@ -93,8 +104,9 @@ proc deltaPersistent*(
|
|||
if not db.stoLeaves.lruUpdate(mixKey, vtx):
|
||||
discard db.stoLeaves.lruAppend(mixKey, vtx, accLruSize)
|
||||
|
||||
# Update dudes and this descriptor
|
||||
? updateSiblings.update().commit()
|
||||
# Done with balancer, all saved to backend
|
||||
db.balancer = LayerRef(nil)
|
||||
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -11,72 +11,70 @@
|
|||
import
|
||||
std/tables,
|
||||
eth/common,
|
||||
results,
|
||||
".."/[aristo_desc, aristo_get]
|
||||
".."/[aristo_desc, aristo_layers]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc deltaMerge*(
|
||||
db: AristoDbRef;
|
||||
upper: LayerDeltaRef; # new filter, `nil` is ok
|
||||
lower: LayerDeltaRef; # Trg filter, `nil` is ok
|
||||
): Result[LayerDeltaRef,(VertexID,AristoError)] =
|
||||
upper: LayerRef; # Think of `top`, `nil` is ok
|
||||
modUpperOk: bool; # May re-use/modify `upper`
|
||||
lower: LayerRef; # Think of `balancer`, `nil` is ok
|
||||
modLowerOk: bool; # May re-use/modify `lower`
|
||||
): LayerRef =
|
||||
## Merge argument `upper` into the `lower` filter instance.
|
||||
##
|
||||
## Note that the namimg `upper` and `lower` indicate that the filters are
|
||||
## stacked and the database access is `upper -> lower -> backend` whereas
|
||||
## the `src/trg` matching logic goes the other way round.
|
||||
## stacked and the database access is `upper -> lower -> backend`.
|
||||
##
|
||||
# Degenerate case: `upper` is void
|
||||
if lower.isNil:
|
||||
if upper.isNil:
|
||||
# Even more degenerate case when both filters are void
|
||||
return ok LayerDeltaRef(nil)
|
||||
return ok(upper)
|
||||
# Degenerate case: `upper` is void
|
||||
result = upper
|
||||
|
||||
# Degenerate case: `upper` is non-trivial and `lower` is void
|
||||
if upper.isNil:
|
||||
return ok(lower)
|
||||
elif upper.isNil:
|
||||
# Degenerate case: `lower` is void
|
||||
result = lower
|
||||
|
||||
# There is no need to deep copy table vertices as they will not be modified.
|
||||
let newFilter = LayerDeltaRef(
|
||||
sTab: lower.sTab,
|
||||
elif modLowerOk:
|
||||
# Can modify `lower` which is the prefered action mode but applies only
|
||||
# in cases where the `lower` argument is not shared.
|
||||
lower.vTop = upper.vTop
|
||||
layersMergeOnto(upper, lower[])
|
||||
result = lower
|
||||
|
||||
elif not modUpperOk:
|
||||
# Cannot modify any argument layers.
|
||||
result = LayerRef(
|
||||
sTab: lower.sTab, # shallow copy (entries will not be modified)
|
||||
kMap: lower.kMap,
|
||||
accLeaves: lower.accLeaves,
|
||||
stoLeaves: lower.stoLeaves,
|
||||
vTop: upper.vTop)
|
||||
layersMergeOnto(upper, result[])
|
||||
|
||||
for (rvid,vtx) in upper.sTab.pairs:
|
||||
if vtx.isValid or not newFilter.sTab.hasKey rvid:
|
||||
newFilter.sTab[rvid] = vtx
|
||||
elif newFilter.sTab.getOrVoid(rvid).isValid:
|
||||
let rc = db.getVtxUbe rvid
|
||||
if rc.isOk:
|
||||
newFilter.sTab[rvid] = vtx # VertexRef(nil)
|
||||
elif rc.error == GetVtxNotFound:
|
||||
newFilter.sTab.del rvid
|
||||
else:
|
||||
return err((rvid.vid,rc.error))
|
||||
# Otherwise avoid copying some tables by modifyinh `upper`. This is not
|
||||
# completely free as the merge direction changes to merging the `lower`
|
||||
# layer up into the higher prioritised `upper` layer (note that the `lower`
|
||||
# argument filter is read-only.) Here again, the `upper` argument must not
|
||||
# be a shared layer/filter.
|
||||
for (rvid,vtx) in lower.sTab.pairs:
|
||||
if not upper.sTab.hasKey(rvid):
|
||||
upper.sTab[rvid] = vtx
|
||||
|
||||
for (rvid,key) in upper.kMap.pairs:
|
||||
if key.isValid or not newFilter.kMap.hasKey rvid:
|
||||
newFilter.kMap[rvid] = key
|
||||
elif newFilter.kMap.getOrVoid(rvid).isValid:
|
||||
let rc = db.getKeyUbe rvid
|
||||
if rc.isOk:
|
||||
newFilter.kMap[rvid] = key
|
||||
elif rc.error == GetKeyNotFound:
|
||||
newFilter.kMap.del rvid
|
||||
else:
|
||||
return err((rvid.vid,rc.error))
|
||||
for (rvid,key) in lower.kMap.pairs:
|
||||
if not upper.kMap.hasKey(rvid):
|
||||
upper.kMap[rvid] = key
|
||||
|
||||
for (accPath,leafVtx) in upper.accLeaves.pairs:
|
||||
newFilter.accLeaves[accPath] = leafVtx
|
||||
for (accPath,leafVtx) in lower.accLeaves.pairs:
|
||||
if not upper.accLeaves.hasKey(accPath):
|
||||
upper.accLeaves[accPath] = leafVtx
|
||||
|
||||
for (mixPath,leafVtx) in upper.stoLeaves.pairs:
|
||||
newFilter.stoLeaves[mixPath] = leafVtx
|
||||
|
||||
ok newFilter
|
||||
for (mixPath,leafVtx) in lower.stoLeaves.pairs:
|
||||
if not upper.stoLeaves.hasKey(mixPath):
|
||||
upper.stoLeaves[mixPath] = leafVtx
|
||||
result = upper
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -20,16 +20,15 @@ import
|
|||
|
||||
proc revFilter*(
|
||||
db: AristoDbRef; # Database
|
||||
filter: LayerDeltaRef; # Filter to revert
|
||||
): Result[LayerDeltaRef,(VertexID,AristoError)] =
|
||||
filter: LayerRef; # Filter to revert
|
||||
): Result[LayerRef,(VertexID,AristoError)] =
|
||||
## Assemble reverse filter for the `filter` argument, i.e. changes to the
|
||||
## backend that reverse the effect of applying the this read-only filter.
|
||||
##
|
||||
## This read-only filter is calculated against the current unfiltered
|
||||
## backend (excluding optionally installed read-only filter.)
|
||||
##
|
||||
# Register MPT state roots for reverting back
|
||||
let rev = LayerDeltaRef()
|
||||
let rev = LayerRef()
|
||||
|
||||
# Get vid generator state on backend
|
||||
block:
|
||||
|
|
|
@ -1,120 +0,0 @@
|
|||
# nimbus-eth1
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
import
|
||||
results,
|
||||
eth/common,
|
||||
../aristo_desc,
|
||||
"."/[delta_merge, delta_reverse]
|
||||
|
||||
type
|
||||
UpdateState = enum
|
||||
Initial = 0
|
||||
Updated,
|
||||
Finished
|
||||
|
||||
UpdateSiblingsRef* = ref object
|
||||
## Update transactional context
|
||||
state: UpdateState
|
||||
db: AristoDbRef ## Main database access
|
||||
balancers: seq[(AristoDbRef,LayerDeltaRef)] ## Rollback data
|
||||
rev: LayerDeltaRef ## Reverse filter set up
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public contructor, commit, rollback
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc rollback*(ctx: UpdateSiblingsRef) =
|
||||
## Rollback any changes made by the `update()` function. Subsequent
|
||||
## `rollback()` or `commit()` calls will be without effect.
|
||||
if ctx.state == Updated:
|
||||
for (d,f) in ctx.balancers:
|
||||
d.balancer = f
|
||||
ctx.state = Finished
|
||||
|
||||
|
||||
proc commit*(ctx: UpdateSiblingsRef): Result[void,AristoError] =
|
||||
## Accept updates. Subsequent `rollback()` calls will be without effect.
|
||||
if ctx.state != Updated:
|
||||
ctx.rollback()
|
||||
return err(FilSiblingsCommitUnfinshed)
|
||||
ctx.db.balancer = LayerDeltaRef(nil)
|
||||
ctx.state = Finished
|
||||
ok()
|
||||
|
||||
proc commit*(
|
||||
rc: Result[UpdateSiblingsRef,AristoError];
|
||||
): Result[void,AristoError] =
|
||||
## Variant of `commit()` for joining with `update()`
|
||||
(? rc).commit()
|
||||
|
||||
|
||||
proc init*(
|
||||
T: type UpdateSiblingsRef; # Context type
|
||||
db: AristoDbRef; # Main database
|
||||
): Result[T,AristoError] =
|
||||
## Set up environment for installing the reverse of the `db` argument current
|
||||
## read-only filter onto every associated descriptor referring to the same
|
||||
## database.
|
||||
if not db.isCentre:
|
||||
return err(FilBackendRoMode)
|
||||
if db.nForked == 0:
|
||||
return ok T(db: db) # No need to do anything
|
||||
|
||||
func fromVae(err: (VertexID,AristoError)): AristoError =
|
||||
err[1]
|
||||
|
||||
# Filter rollback context
|
||||
ok T(
|
||||
db: db,
|
||||
rev: ? db.revFilter(db.balancer).mapErr fromVae) # Reverse filter
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc update*(ctx: UpdateSiblingsRef): Result[UpdateSiblingsRef,AristoError] =
|
||||
## This function installs the reverse of the `init()` argument `db` current
|
||||
## read-only filter onto every associated descriptor referring to the same
|
||||
## database. If the function fails, a `rollback()` is called automatically.
|
||||
##
|
||||
## This function might do some real work so it was detached from `init()` so
|
||||
## it can be called late but before the physical database is updated.
|
||||
##
|
||||
if ctx.state == Initial:
|
||||
ctx.state = Updated
|
||||
if not ctx.rev.isNil:
|
||||
let db = ctx.db
|
||||
# Update distributed filters. Note that the physical backend database
|
||||
# must not have been updated, yet. So the new root key for the backend
|
||||
# will be `db.balancer.kMap[$1]`.
|
||||
for w in db.forked:
|
||||
if w.balancer.isNil:
|
||||
# Sharing the `ctx.rev` ref is safe as it is read-inly
|
||||
w.balancer = ctx.rev
|
||||
else:
|
||||
let rc = db.deltaMerge(w.balancer, ctx.rev)
|
||||
if rc.isErr:
|
||||
ctx.rollback()
|
||||
return err(rc.error[1])
|
||||
ctx.balancers.add (w, w.balancer)
|
||||
w.balancer = rc.value
|
||||
ok(ctx)
|
||||
|
||||
proc update*(
|
||||
rc: Result[UpdateSiblingsRef,AristoError]
|
||||
): Result[UpdateSiblingsRef,AristoError] =
|
||||
## Variant of `update()` for joining with `init()`
|
||||
(? rc).update()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
@ -74,7 +74,7 @@ type
|
|||
## Three tier database object supporting distributed instances.
|
||||
top*: LayerRef ## Database working layer, mutable
|
||||
stack*: seq[LayerRef] ## Stashed immutable parent layers
|
||||
balancer*: LayerDeltaRef ## Baland out concurrent backend access
|
||||
balancer*: LayerRef ## Balance out concurrent backend access
|
||||
backend*: BackendRef ## Backend database (may well be `nil`)
|
||||
|
||||
txRef*: AristoTxRef ## Latest active transaction
|
||||
|
@ -152,8 +152,8 @@ func isValid*(nd: NodeRef): bool =
|
|||
func isValid*(pid: PathID): bool =
|
||||
pid != VOID_PATH_ID
|
||||
|
||||
func isValid*(filter: LayerDeltaRef): bool =
|
||||
filter != LayerDeltaRef(nil)
|
||||
func isValid*(layer: LayerRef): bool =
|
||||
layer != LayerRef(nil)
|
||||
|
||||
func isValid*(root: Hash256): bool =
|
||||
root != EMPTY_ROOT_HASH
|
||||
|
@ -250,11 +250,11 @@ proc fork*(
|
|||
if not noTopLayer:
|
||||
clone.top = LayerRef.init()
|
||||
if not db.balancer.isNil:
|
||||
clone.top.delta.vTop = db.balancer.vTop
|
||||
clone.top.vTop = db.balancer.vTop
|
||||
else:
|
||||
let rc = clone.backend.getTuvFn()
|
||||
if rc.isOk:
|
||||
clone.top.delta.vTop = rc.value
|
||||
clone.top.vTop = rc.value
|
||||
elif rc.error != GetTuvNotFound:
|
||||
return err(rc.error)
|
||||
|
||||
|
@ -263,13 +263,19 @@ proc fork*(
|
|||
|
||||
ok clone
|
||||
|
||||
iterator forked*(db: AristoDbRef): AristoDbRef =
|
||||
iterator forked*(db: AristoDbRef): tuple[db: AristoDbRef, isLast: bool] =
|
||||
## Interate over all non centre descriptors (see comments on `reCentre()`
|
||||
## for details.)
|
||||
##
|
||||
## The second `isLast` yielded loop entry is `true` if the yielded tuple
|
||||
## is the last entry in the list.
|
||||
##
|
||||
if not db.dudes.isNil:
|
||||
for dude in db.getCentre.dudes.peers.items:
|
||||
var nLeft = db.dudes.peers.len
|
||||
for dude in db.dudes.peers.items:
|
||||
if dude != db.dudes.centre:
|
||||
yield dude
|
||||
nLeft.dec
|
||||
yield (dude, nLeft == 1)
|
||||
|
||||
func nForked*(db: AristoDbRef): int =
|
||||
## Returns the number of non centre descriptors (see comments on `reCentre()`
|
||||
|
@ -313,12 +319,12 @@ iterator rstack*(db: AristoDbRef): LayerRef =
|
|||
for i in 0..<db.stack.len:
|
||||
yield db.stack[db.stack.len - i - 1]
|
||||
|
||||
proc deltaAtLevel*(db: AristoDbRef, level: int): LayerDeltaRef =
|
||||
proc deltaAtLevel*(db: AristoDbRef, level: int): LayerRef =
|
||||
if level == 0:
|
||||
db.top.delta
|
||||
db.top
|
||||
elif level > 0:
|
||||
doAssert level <= db.stack.len
|
||||
db.stack[^level].delta
|
||||
db.stack[^level]
|
||||
elif level == -1:
|
||||
doAssert db.balancer != nil
|
||||
db.balancer
|
||||
|
|
|
@ -89,7 +89,8 @@ type
|
|||
key*: Hash256 ## Some state hash (if any)
|
||||
serial*: uint64 ## Generic identifier from application
|
||||
|
||||
LayerDeltaRef* = ref object
|
||||
LayerRef* = ref LayerObj
|
||||
LayerObj* = object
|
||||
## Delta layers are stacked implying a tables hierarchy. Table entries on
|
||||
## a higher level take precedence over lower layer table entries. So an
|
||||
## existing key-value table entry of a layer on top supersedes same key
|
||||
|
@ -119,11 +120,6 @@ type
|
|||
accLeaves*: Table[Hash256, VertexRef] ## Account path -> VertexRef
|
||||
stoLeaves*: Table[Hash256, VertexRef] ## Storage path -> VertexRef
|
||||
|
||||
LayerRef* = ref LayerObj
|
||||
LayerObj* = object
|
||||
## Hexary trie database layer structures. Any layer holds the full
|
||||
## change relative to the backend.
|
||||
delta*: LayerDeltaRef ## Most structural tables held as deltas
|
||||
txUid*: uint ## Transaction identifier if positive
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -132,7 +128,7 @@ type
|
|||
|
||||
func init*(T: type LayerRef): T =
|
||||
## Constructor, returns empty layer
|
||||
T(delta: LayerDeltaRef())
|
||||
T()
|
||||
|
||||
func hash*(node: NodeRef): Hash =
|
||||
## Table/KeyedQueue/HashSet mixin
|
||||
|
|
|
@ -51,8 +51,7 @@ proc newAristoRdbDbRef(
|
|||
return err(rc.error)
|
||||
rc.value
|
||||
ok((AristoDbRef(
|
||||
top: LayerRef(
|
||||
delta: LayerDeltaRef(vTop: vTop)),
|
||||
top: LayerRef(vTop: vTop),
|
||||
backend: be), oCfs))
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -30,7 +30,7 @@ func dup(sTab: Table[RootedVertexID,VertexRef]): Table[RootedVertexID,VertexRef]
|
|||
# ------------------------------------------------------------------------------
|
||||
|
||||
func vTop*(db: AristoDbRef): VertexID =
|
||||
db.top.delta.vTop
|
||||
db.top.vTop
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public getters/helpers
|
||||
|
@ -42,7 +42,7 @@ func nLayersVtx*(db: AristoDbRef): int =
|
|||
## layers as there might be duplicate entries for the same vertex ID on
|
||||
## different layers.
|
||||
##
|
||||
db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len)
|
||||
db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len)
|
||||
|
||||
func nLayersKey*(db: AristoDbRef): int =
|
||||
## Number of vertex ID/key entries on the cache layers. This is an upper
|
||||
|
@ -50,7 +50,7 @@ func nLayersKey*(db: AristoDbRef): int =
|
|||
## layers as there might be duplicate entries for the same vertex ID on
|
||||
## different layers.
|
||||
##
|
||||
db.stack.mapIt(it.delta.kMap.len).foldl(a + b, db.top.delta.kMap.len)
|
||||
db.stack.mapIt(it.kMap.len).foldl(a + b, db.top.kMap.len)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions: getter variants
|
||||
|
@ -60,11 +60,11 @@ func layersGetVtx*(db: AristoDbRef; rvid: RootedVertexID): Opt[(VertexRef, int)]
|
|||
## Find a vertex on the cache layers. An `ok()` result might contain a
|
||||
## `nil` vertex if it is stored on the cache that way.
|
||||
##
|
||||
db.top.delta.sTab.withValue(rvid, item):
|
||||
db.top.sTab.withValue(rvid, item):
|
||||
return Opt.some((item[], 0))
|
||||
|
||||
for i, w in enumerate(db.rstack):
|
||||
w.delta.sTab.withValue(rvid, item):
|
||||
w.sTab.withValue(rvid, item):
|
||||
return Opt.some((item[], i + 1))
|
||||
|
||||
Opt.none((VertexRef, int))
|
||||
|
@ -78,11 +78,11 @@ func layersGetKey*(db: AristoDbRef; rvid: RootedVertexID): Opt[(HashKey, int)] =
|
|||
## Find a hash key on the cache layers. An `ok()` result might contain a void
|
||||
## hash key if it is stored on the cache that way.
|
||||
##
|
||||
db.top.delta.kMap.withValue(rvid, item):
|
||||
db.top.kMap.withValue(rvid, item):
|
||||
return Opt.some((item[], 0))
|
||||
|
||||
for i, w in enumerate(db.rstack):
|
||||
w.delta.kMap.withValue(rvid, item):
|
||||
w.kMap.withValue(rvid, item):
|
||||
return ok((item[], i + 1))
|
||||
|
||||
Opt.none((HashKey, int))
|
||||
|
@ -92,21 +92,21 @@ func layersGetKeyOrVoid*(db: AristoDbRef; rvid: RootedVertexID): HashKey =
|
|||
(db.layersGetKey(rvid).valueOr (VOID_HASH_KEY, 0))[0]
|
||||
|
||||
func layersGetAccLeaf*(db: AristoDbRef; accPath: Hash256): Opt[VertexRef] =
|
||||
db.top.delta.accLeaves.withValue(accPath, item):
|
||||
db.top.accLeaves.withValue(accPath, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
for w in db.rstack:
|
||||
w.delta.accLeaves.withValue(accPath, item):
|
||||
w.accLeaves.withValue(accPath, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
Opt.none(VertexRef)
|
||||
|
||||
func layersGetStoLeaf*(db: AristoDbRef; mixPath: Hash256): Opt[VertexRef] =
|
||||
db.top.delta.stoLeaves.withValue(mixPath, item):
|
||||
db.top.stoLeaves.withValue(mixPath, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
for w in db.rstack:
|
||||
w.delta.stoLeaves.withValue(mixPath, item):
|
||||
w.stoLeaves.withValue(mixPath, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
Opt.none(VertexRef)
|
||||
|
@ -121,7 +121,7 @@ func layersPutVtx*(
|
|||
vtx: VertexRef;
|
||||
) =
|
||||
## Store a (potentally empty) vertex on the top layer
|
||||
db.top.delta.sTab[rvid] = vtx
|
||||
db.top.sTab[rvid] = vtx
|
||||
|
||||
func layersResVtx*(
|
||||
db: AristoDbRef;
|
||||
|
@ -138,7 +138,7 @@ func layersPutKey*(
|
|||
key: HashKey;
|
||||
) =
|
||||
## Store a (potentally void) hash key on the top layer
|
||||
db.top.delta.kMap[rvid] = key
|
||||
db.top.kMap[rvid] = key
|
||||
|
||||
|
||||
func layersResKey*(db: AristoDbRef; rvid: RootedVertexID) =
|
||||
|
@ -157,30 +157,39 @@ proc layersUpdateVtx*(
|
|||
|
||||
|
||||
func layersPutAccLeaf*(db: AristoDbRef; accPath: Hash256; leafVtx: VertexRef) =
|
||||
db.top.delta.accLeaves[accPath] = leafVtx
|
||||
db.top.accLeaves[accPath] = leafVtx
|
||||
|
||||
func layersPutStoLeaf*(db: AristoDbRef; mixPath: Hash256; leafVtx: VertexRef) =
|
||||
db.top.delta.stoLeaves[mixPath] = leafVtx
|
||||
db.top.stoLeaves[mixPath] = leafVtx
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func isEmpty*(ly: LayerRef): bool =
|
||||
## Returns `true` if the layer does not contain any changes, i.e. all the
|
||||
## tables are empty. The field `txUid` is ignored, here.
|
||||
ly.sTab.len == 0 and
|
||||
ly.kMap.len == 0 and
|
||||
ly.accLeaves.len == 0 and
|
||||
ly.stoLeaves.len == 0
|
||||
|
||||
|
||||
func layersMergeOnto*(src: LayerRef; trg: var LayerObj) =
|
||||
## Merges the argument `src` into the argument `trg` and returns `trg`. For
|
||||
## the result layer, the `txUid` value set to `0`.
|
||||
##
|
||||
trg.txUid = 0
|
||||
|
||||
for (vid,vtx) in src.delta.sTab.pairs:
|
||||
trg.delta.sTab[vid] = vtx
|
||||
for (vid,key) in src.delta.kMap.pairs:
|
||||
trg.delta.kMap[vid] = key
|
||||
trg.delta.vTop = src.delta.vTop
|
||||
for (accPath,leafVtx) in src.delta.accLeaves.pairs:
|
||||
trg.delta.accLeaves[accPath] = leafVtx
|
||||
for (mixPath,leafVtx) in src.delta.stoLeaves.pairs:
|
||||
trg.delta.stoLeaves[mixPath] = leafVtx
|
||||
for (vid,vtx) in src.sTab.pairs:
|
||||
trg.sTab[vid] = vtx
|
||||
for (vid,key) in src.kMap.pairs:
|
||||
trg.kMap[vid] = key
|
||||
trg.vTop = src.vTop
|
||||
for (accPath,leafVtx) in src.accLeaves.pairs:
|
||||
trg.accLeaves[accPath] = leafVtx
|
||||
for (mixPath,leafVtx) in src.stoLeaves.pairs:
|
||||
trg.stoLeaves[mixPath] = leafVtx
|
||||
|
||||
func layersCc*(db: AristoDbRef; level = high(int)): LayerRef =
|
||||
## Provide a collapsed copy of layers up to a particular transaction level.
|
||||
|
@ -192,24 +201,22 @@ func layersCc*(db: AristoDbRef; level = high(int)): LayerRef =
|
|||
|
||||
# Set up initial layer (bottom layer)
|
||||
result = LayerRef(
|
||||
delta: LayerDeltaRef(
|
||||
sTab: layers[0].delta.sTab.dup, # explicit dup for ref values
|
||||
kMap: layers[0].delta.kMap,
|
||||
vTop: layers[^1].delta.vTop,
|
||||
accLeaves: layers[0].delta.accLeaves,
|
||||
stoLeaves: layers[0].delta.stoLeaves,
|
||||
))
|
||||
sTab: layers[0].sTab.dup, # explicit dup for ref values
|
||||
kMap: layers[0].kMap,
|
||||
vTop: layers[^1].vTop,
|
||||
accLeaves: layers[0].accLeaves,
|
||||
stoLeaves: layers[0].stoLeaves)
|
||||
|
||||
# Consecutively merge other layers on top
|
||||
for n in 1 ..< layers.len:
|
||||
for (vid,vtx) in layers[n].delta.sTab.pairs:
|
||||
result.delta.sTab[vid] = vtx
|
||||
for (vid,key) in layers[n].delta.kMap.pairs:
|
||||
result.delta.kMap[vid] = key
|
||||
for (accPath,vtx) in layers[n].delta.accLeaves.pairs:
|
||||
result.delta.accLeaves[accPath] = vtx
|
||||
for (mixPath,vtx) in layers[n].delta.stoLeaves.pairs:
|
||||
result.delta.stoLeaves[mixPath] = vtx
|
||||
for (vid,vtx) in layers[n].sTab.pairs:
|
||||
result.sTab[vid] = vtx
|
||||
for (vid,key) in layers[n].kMap.pairs:
|
||||
result.kMap[vid] = key
|
||||
for (accPath,vtx) in layers[n].accLeaves.pairs:
|
||||
result.accLeaves[accPath] = vtx
|
||||
for (mixPath,vtx) in layers[n].stoLeaves.pairs:
|
||||
result.stoLeaves[mixPath] = vtx
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public iterators
|
||||
|
@ -226,12 +233,12 @@ iterator layersWalkVtx*(
|
|||
## the one with a zero vertex which are othewise skipped by the iterator.
|
||||
## The `seen` argument must not be modified while the iterator is active.
|
||||
##
|
||||
for (rvid,vtx) in db.top.delta.sTab.pairs:
|
||||
for (rvid,vtx) in db.top.sTab.pairs:
|
||||
yield (rvid,vtx)
|
||||
seen.incl rvid.vid
|
||||
|
||||
for w in db.rstack:
|
||||
for (rvid,vtx) in w.delta.sTab.pairs:
|
||||
for (rvid,vtx) in w.sTab.pairs:
|
||||
if rvid.vid notin seen:
|
||||
yield (rvid,vtx)
|
||||
seen.incl rvid.vid
|
||||
|
@ -251,12 +258,12 @@ iterator layersWalkKey*(
|
|||
## Walk over all `(VertexID,HashKey)` pairs on the cache layers. Note that
|
||||
## entries are unsorted.
|
||||
var seen: HashSet[VertexID]
|
||||
for (rvid,key) in db.top.delta.kMap.pairs:
|
||||
for (rvid,key) in db.top.kMap.pairs:
|
||||
yield (rvid,key)
|
||||
seen.incl rvid.vid
|
||||
|
||||
for w in db.rstack:
|
||||
for (rvid,key) in w.delta.kMap.pairs:
|
||||
for (rvid,key) in w.kMap.pairs:
|
||||
if rvid.vid notin seen:
|
||||
yield (rvid,key)
|
||||
seen.incl rvid.vid
|
||||
|
|
|
@ -134,7 +134,7 @@ proc findTx*(
|
|||
|
||||
if db.txRef.isNil:
|
||||
# Try `(vid,key)` on top layer
|
||||
let topKey = db.top.delta.kMap.getOrVoid rvid
|
||||
let topKey = db.top.kMap.getOrVoid rvid
|
||||
if topKey == key:
|
||||
return ok(0)
|
||||
|
||||
|
@ -143,11 +143,11 @@ proc findTx*(
|
|||
for (n,tx,layer,error) in db.txRef.txFrameWalk:
|
||||
if error != AristoError(0):
|
||||
return err(error)
|
||||
if layer.delta.kMap.getOrVoid(rvid) == key:
|
||||
if layer.kMap.getOrVoid(rvid) == key:
|
||||
return ok(n)
|
||||
|
||||
# Try bottom layer
|
||||
let botKey = db.stack[0].delta.kMap.getOrVoid rvid
|
||||
let botKey = db.stack[0].kMap.getOrVoid rvid
|
||||
if botKey == key:
|
||||
return ok(db.stack.len)
|
||||
|
||||
|
|
|
@ -64,8 +64,7 @@ proc txFork*(
|
|||
let stackLayer = block:
|
||||
let rc = db.getTuvBE()
|
||||
if rc.isOk:
|
||||
LayerRef(
|
||||
delta: LayerDeltaRef(vTop: rc.value))
|
||||
LayerRef(vTop: rc.value)
|
||||
elif rc.error == GetTuvNotFound:
|
||||
LayerRef.init()
|
||||
else:
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/tables,
|
||||
results,
|
||||
".."/[aristo_desc, aristo_layers]
|
||||
|
||||
|
@ -81,10 +80,10 @@ proc txFrameBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
|
|||
if db.txFrameLevel != db.stack.len:
|
||||
return err(TxStackGarbled)
|
||||
|
||||
let vTop = db.top.delta.vTop
|
||||
let vTop = db.top.vTop
|
||||
db.stack.add db.top
|
||||
db.top = LayerRef(
|
||||
delta: LayerDeltaRef(vTop: vTop),
|
||||
vTop: vTop,
|
||||
txUid: db.getTxUid)
|
||||
|
||||
db.txRef = AristoTxRef(
|
||||
|
@ -123,18 +122,11 @@ proc txFrameCommit*(
|
|||
let db = ? tx.getDbDescFromTopTx()
|
||||
|
||||
# Pop layer from stack and merge database top layer onto it
|
||||
let merged = block:
|
||||
if db.top.delta.sTab.len == 0 and
|
||||
db.top.delta.kMap.len == 0:
|
||||
# Avoid `layersMergeOnto()`
|
||||
db.top.delta = db.stack[^1].delta
|
||||
let merged = db.stack[^1]
|
||||
db.stack.setLen(db.stack.len-1)
|
||||
db.top
|
||||
else:
|
||||
let layer = db.stack[^1]
|
||||
db.stack.setLen(db.stack.len-1)
|
||||
db.top.layersMergeOnto layer[]
|
||||
layer
|
||||
if not db.top.isEmpty():
|
||||
# Only call `layersMergeOnto()` if layer is empty
|
||||
db.top.layersMergeOnto merged[]
|
||||
|
||||
# Install `merged` stack top layer and update stack
|
||||
db.top = merged
|
||||
|
|
|
@ -14,10 +14,25 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/tables,
|
||||
results,
|
||||
../aristo_delta/delta_merge,
|
||||
".."/[aristo_desc, aristo_delta]
|
||||
".."/[aristo_desc, aristo_delta, aristo_layers]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc txStowOk*(
|
||||
db: AristoDbRef; # Database
|
||||
persistent: bool; # Stage only unless `true`
|
||||
): Result[void,AristoError] =
|
||||
if not db.txRef.isNil:
|
||||
return err(TxPendingTx)
|
||||
if 0 < db.stack.len:
|
||||
return err(TxStackGarbled)
|
||||
if persistent and not db.deltaPersistentOk():
|
||||
return err(TxBackendNotWritable)
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
@ -30,24 +45,18 @@ proc txStow*(
|
|||
): Result[void,AristoError] =
|
||||
## Worker for `stow()` and `persist()` variants.
|
||||
##
|
||||
if not db.txRef.isNil:
|
||||
return err(TxPendingTx)
|
||||
if 0 < db.stack.len:
|
||||
return err(TxStackGarbled)
|
||||
if persistent and not db.deltaPersistentOk():
|
||||
return err(TxBackendNotWritable)
|
||||
? db.txStowOk persistent
|
||||
|
||||
if db.top.delta.sTab.len != 0 or
|
||||
db.top.delta.kMap.len != 0 or
|
||||
db.top.delta.accLeaves.len != 0 or
|
||||
db.top.delta.stoLeaves.len != 0:
|
||||
|
||||
# Note that `deltaMerge()` will return the 1st argument if the 2nd is `nil`
|
||||
db.balancer = db.deltaMerge(db.top.delta, db.balancer).valueOr:
|
||||
return err(error[1])
|
||||
if not db.top.isEmpty():
|
||||
# Note that `deltaMerge()` will return the `db.top` argument if the
|
||||
# `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the
|
||||
# case that there are no forked peers one can ignore that restriction as
|
||||
# no balancer is shared.
|
||||
db.balancer = deltaMerge(
|
||||
db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0)
|
||||
|
||||
# New empty top layer
|
||||
db.top = LayerRef(delta: LayerDeltaRef(vTop: db.balancer.vTop))
|
||||
db.top = LayerRef(vTop: db.balancer.vTop)
|
||||
|
||||
if persistent:
|
||||
# Merge/move `balancer` into persistent tables (unless missing)
|
||||
|
|
|
@ -24,17 +24,16 @@ import
|
|||
proc vidFetch*(db: AristoDbRef): VertexID =
|
||||
## Fetch next vertex ID.
|
||||
##
|
||||
if db.top.delta.vTop == 0:
|
||||
db.top.delta.vTop = VertexID(LEAST_FREE_VID)
|
||||
if db.top.vTop == 0:
|
||||
db.top.vTop = VertexID(LEAST_FREE_VID)
|
||||
else:
|
||||
db.top.delta.vTop.inc
|
||||
db.top.delta.vTop
|
||||
db.top.vTop.inc
|
||||
db.top.vTop
|
||||
|
||||
proc vidDispose*(db: AristoDbRef; vid: VertexID) =
|
||||
## Only top vertexIDs are disposed
|
||||
if vid == db.top.delta.vTop and
|
||||
LEAST_FREE_VID < db.top.delta.vTop.distinctBase:
|
||||
db.top.delta.vTop.dec
|
||||
if vid == db.top.vTop and LEAST_FREE_VID < db.top.vTop.distinctBase:
|
||||
db.top.vTop.dec
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -23,12 +23,12 @@ iterator walkVtxBeImpl*[T](
|
|||
): tuple[rvid: RootedVertexID, vtx: VertexRef] =
|
||||
## Generic iterator
|
||||
when T is VoidBackendRef:
|
||||
let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer
|
||||
let filter = if db.balancer.isNil: LayerRef() else: db.balancer
|
||||
|
||||
else:
|
||||
mixin walkVtx
|
||||
|
||||
let filter = LayerDeltaRef()
|
||||
let filter = LayerRef()
|
||||
if not db.balancer.isNil:
|
||||
filter.sTab = db.balancer.sTab # copy table
|
||||
|
||||
|
@ -52,12 +52,12 @@ iterator walkKeyBeImpl*[T](
|
|||
): tuple[rvid: RootedVertexID, key: HashKey] =
|
||||
## Generic iterator
|
||||
when T is VoidBackendRef:
|
||||
let filter = if db.balancer.isNil: LayerDeltaRef() else: db.balancer
|
||||
let filter = if db.balancer.isNil: LayerRef() else: db.balancer
|
||||
|
||||
else:
|
||||
mixin walkKey
|
||||
|
||||
let filter = LayerDeltaRef()
|
||||
let filter = LayerRef()
|
||||
if not db.balancer.isNil:
|
||||
filter.kMap = db.balancer.kMap # copy table
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# nimbus-eth1
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
@ -134,11 +134,11 @@ proc ppBe[T](be: T; db: KvtDbRef; indent: int): string =
|
|||
|
||||
proc ppLayer(layer: LayerRef; db: KvtDbRef; indent = 4): string =
|
||||
let
|
||||
tLen = layer.delta.sTab.len
|
||||
tLen = layer.sTab.len
|
||||
info = "tab(" & $tLen & ")"
|
||||
pfx1 = indent.toPfx(1)
|
||||
pfx2 = if 0 < tLen: indent.toPfx(2) else: " "
|
||||
"<layer>" & pfx1 & info & pfx2 & layer.delta.sTab.ppTab(db,indent+2)
|
||||
"<layer>" & pfx1 & info & pfx2 & layer.sTab.ppTab(db,indent+2)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
@ -152,7 +152,7 @@ proc pp*(
|
|||
case be.kind:
|
||||
of BackendMemory:
|
||||
result &= be.MemBackendRef.ppBe(db, indent)
|
||||
of BackendRocksDB:
|
||||
of BackendRocksDB,BackendRdbTriggered:
|
||||
result &= be.RdbBackendRef.ppBe(db, indent)
|
||||
of BackendVoid:
|
||||
result &= "<NoBackend>"
|
||||
|
|
|
@ -23,23 +23,12 @@ import
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc deltaMerge*(
|
||||
db: KvtDbRef; # Database
|
||||
delta: LayerDeltaRef; # Filter to apply to database
|
||||
) =
|
||||
## Merge the argument `delta` into the balancer filter layer. Note that
|
||||
## this function has no control of the filter source. Having merged the
|
||||
## argument `delta`, all the `top` and `stack` layers should be cleared.
|
||||
##
|
||||
db.merge(delta, db.balancer)
|
||||
|
||||
|
||||
proc deltaUpdateOk*(db: KvtDbRef): bool =
|
||||
proc deltaPersistentOk*(db: KvtDbRef): bool =
|
||||
## Check whether the balancer filter can be merged into the backend
|
||||
not db.backend.isNil and db.isCentre
|
||||
|
||||
|
||||
proc deltaUpdate*(
|
||||
proc deltaPersistent*(
|
||||
db: KvtDbRef; # Database
|
||||
reCentreOk = false;
|
||||
): Result[void,KvtError] =
|
||||
|
@ -71,17 +60,30 @@ proc deltaUpdate*(
|
|||
# Always re-centre to `parent` (in case `reCentreOk` was set)
|
||||
defer: discard parent.reCentre()
|
||||
|
||||
# Update forked balancers here do that errors are detected early (if any.)
|
||||
if 0 < db.nForked:
|
||||
let rev = db.revFilter(db.balancer).valueOr:
|
||||
return err(error[1])
|
||||
if 0 < rev.sTab.len: # Can an empty `rev` happen at all?
|
||||
var unsharedRevOk = true
|
||||
for w in db.forked:
|
||||
if not w.db.balancer.isValid:
|
||||
unsharedRevOk = false
|
||||
# The `rev` filter can be modified if one can make sure that it is
|
||||
# not shared (i.e. only previously merged into the w.db.balancer.)
|
||||
# Note that it is trivially true for a single fork.
|
||||
let modLowerOk = w.isLast and unsharedRevOk
|
||||
w.db.balancer = deltaMerge(
|
||||
w.db.balancer, modUpperOk=false, rev, modLowerOk=modLowerOk)
|
||||
|
||||
# Store structural single trie entries
|
||||
let writeBatch = ? be.putBegFn()
|
||||
be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq)
|
||||
? be.putEndFn writeBatch
|
||||
|
||||
# Update peer filter balance.
|
||||
let rev = db.deltaReverse db.balancer
|
||||
for w in db.forked:
|
||||
db.merge(rev, w.balancer)
|
||||
# Done with balancer, all saved to backend
|
||||
db.balancer = LayerRef(nil)
|
||||
|
||||
db.balancer = LayerDeltaRef(nil)
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -10,37 +10,61 @@
|
|||
|
||||
import
|
||||
std/tables,
|
||||
results,
|
||||
".."/[kvt_desc, kvt_utils]
|
||||
eth/common,
|
||||
../kvt_desc
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc layersMergeOnto(src: LayerRef; trg: var LayerObj) =
|
||||
for (key,val) in src.sTab.pairs:
|
||||
trg.sTab[key] = val
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc merge*(
|
||||
db: KvtDbRef; # Database
|
||||
upper: LayerDeltaRef; # Filter to apply onto `lower`
|
||||
lower: var LayerDeltaRef; # Target filter, will be modified
|
||||
) =
|
||||
## Merge the argument filter `upper` onto the argument filter `lower`
|
||||
## relative to the *unfiltered* backend database on `db.backened`. The `lower`
|
||||
## filter argument will have been modified.
|
||||
proc deltaMerge*(
|
||||
upper: LayerRef; # Think of `top`, `nil` is ok
|
||||
modUpperOk: bool; # May re-use/modify `upper`
|
||||
lower: LayerRef; # Think of `balancer`, `nil` is ok
|
||||
modLowerOk: bool; # May re-use/modify `lower`
|
||||
): LayerRef =
|
||||
## Merge argument `upper` into the `lower` filter instance.
|
||||
##
|
||||
## In case that the argument `lower` is `nil`, it will be replaced by `upper`.
|
||||
## Note that the namimg `upper` and `lower` indicate that the filters are
|
||||
## stacked and the database access is `upper -> lower -> backend`.
|
||||
##
|
||||
if lower.isNil:
|
||||
lower = upper
|
||||
elif not upper.isNil:
|
||||
for (key,val) in upper.sTab.pairs:
|
||||
if val.isValid or not lower.sTab.hasKey key:
|
||||
lower.sTab[key] = val
|
||||
elif lower.sTab.getOrVoid(key).isValid:
|
||||
let rc = db.getUbe key
|
||||
if rc.isOk:
|
||||
lower.sTab[key] = val # empty blob
|
||||
# Degenerate case: `upper` is void
|
||||
result = upper
|
||||
|
||||
elif upper.isNil:
|
||||
# Degenerate case: `lower` is void
|
||||
result = lower
|
||||
|
||||
elif modLowerOk:
|
||||
# Can modify `lower` which is the prefered action mode but applies only
|
||||
# in cases where the `lower` argument is not shared.
|
||||
layersMergeOnto(upper, lower[])
|
||||
result = lower
|
||||
|
||||
elif not modUpperOk:
|
||||
# Cannot modify any argument layers.
|
||||
result = LayerRef(sTab: lower.sTab)
|
||||
layersMergeOnto(upper, result[])
|
||||
|
||||
else:
|
||||
doAssert rc.error == GetNotFound
|
||||
lower.sTab.del key # no need to keep that in merged filter
|
||||
# Otherwise avoid copying some tables by modifyinh `upper`. This is not
|
||||
# completely free as the merge direction changes to merging the `lower`
|
||||
# layer up into the higher prioritised `upper` layer (note that the `lower`
|
||||
# argument filter is read-only.) Here again, the `upper` argument must not
|
||||
# be a shared layer/filter.
|
||||
for (key,val) in lower.sTab.pairs:
|
||||
if not upper.sTab.hasKey(key):
|
||||
upper.sTab[key] = val
|
||||
result = upper
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
import
|
||||
std/tables,
|
||||
eth/common,
|
||||
results,
|
||||
".."/[kvt_desc, kvt_utils]
|
||||
|
||||
|
@ -17,27 +18,29 @@ import
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc deltaReverse*(
|
||||
proc revFilter*(
|
||||
db: KvtDbRef; # Database
|
||||
delta: LayerDeltaRef; # Filter to revert
|
||||
): LayerDeltaRef =
|
||||
## Assemble a reverse filter for the `delta` argument, i.e. changes to the
|
||||
## backend that reverse the effect of applying this to the balancer filter.
|
||||
## The resulting filter is calculated against the current *unfiltered*
|
||||
## backend (excluding optionally installed balancer filters.)
|
||||
filter: LayerRef; # Filter to revert
|
||||
): Result[LayerRef,(Blob,KvtError)] =
|
||||
## Assemble reverse filter for the `filter` argument, i.e. changes to the
|
||||
## backend that reverse the effect of applying the this read-only filter.
|
||||
##
|
||||
## If `delta` is `nil`, the result will be `nil` as well.
|
||||
if not delta.isNil:
|
||||
result = LayerDeltaRef()
|
||||
## This read-only filter is calculated against the current unfiltered
|
||||
## backend (excluding optionally installed read-only filter.)
|
||||
##
|
||||
let rev = LayerRef()
|
||||
|
||||
# Calculate reverse changes for the `sTab[]` structural table
|
||||
for key in delta.sTab.keys:
|
||||
for key in filter.sTab.keys:
|
||||
let rc = db.getUbe key
|
||||
if rc.isOk:
|
||||
result.sTab[key] = rc.value
|
||||
rev.sTab[key] = rc.value
|
||||
elif rc.error == GetNotFound:
|
||||
rev.sTab[key] = EmptyBlob
|
||||
else:
|
||||
doAssert rc.error == GetNotFound
|
||||
result.sTab[key] = EmptyBlob
|
||||
return err((key,rc.error))
|
||||
|
||||
ok(rev)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -46,7 +46,7 @@ type
|
|||
## Three tier database object supporting distributed instances.
|
||||
top*: LayerRef ## Database working layer, mutable
|
||||
stack*: seq[LayerRef] ## Stashed immutable parent layers
|
||||
balancer*: LayerDeltaRef ## Apply read filter (locks writing)
|
||||
balancer*: LayerRef ## Balance out concurrent backend access
|
||||
backend*: BackendRef ## Backend database (may well be `nil`)
|
||||
|
||||
txRef*: KvtTxRef ## Latest active transaction
|
||||
|
@ -82,6 +82,9 @@ func getOrVoid*(tab: Table[Blob,Blob]; w: Blob): Blob =
|
|||
func isValid*(key: Blob): bool =
|
||||
key != EmptyBlob
|
||||
|
||||
func isValid*(layer: LayerRef): bool =
|
||||
layer != LayerRef(nil)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions, miscellaneous
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -162,13 +165,18 @@ proc fork*(
|
|||
|
||||
ok clone
|
||||
|
||||
iterator forked*(db: KvtDbRef): KvtDbRef =
|
||||
iterator forked*(db: KvtDbRef): tuple[db: KvtDbRef, isLast: bool] =
|
||||
## Interate over all non centre descriptors (see comments on `reCentre()`
|
||||
## for details.)
|
||||
##
|
||||
## The second `isLast` yielded loop entry is `true` if the yielded tuple
|
||||
## is the last entry in the list.
|
||||
if not db.dudes.isNil:
|
||||
var nLeft = db.dudes.peers.len
|
||||
for dude in db.dudes.peers.items:
|
||||
if dude != db.dudes.centre:
|
||||
yield dude
|
||||
nLeft.dec
|
||||
yield (dude, nLeft == 1)
|
||||
|
||||
func nForked*(db: KvtDbRef): int =
|
||||
## Returns the number of non centre descriptors (see comments on `reCentre()`
|
||||
|
|
|
@ -18,14 +18,11 @@ import
|
|||
eth/common
|
||||
|
||||
type
|
||||
LayerDeltaRef* = ref object
|
||||
## Delta tables relative to previous layer
|
||||
sTab*: Table[Blob,Blob] ## Structural data table
|
||||
|
||||
LayerRef* = ref object
|
||||
LayerRef* = ref LayerObj
|
||||
LayerObj* = object
|
||||
## Kvt database layer structures. Any layer holds the full
|
||||
## change relative to the backend.
|
||||
delta*: LayerDeltaRef ## Structural tables held as deltas
|
||||
sTab*: Table[Blob,Blob] ## Structural data table
|
||||
txUid*: uint ## Transaction identifier if positive
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -34,11 +31,11 @@ type
|
|||
|
||||
func init*(T: type LayerRef): T =
|
||||
## Constructor, returns empty layer
|
||||
T(delta: LayerDeltaRef())
|
||||
T()
|
||||
|
||||
func dup*(ly: LayerDeltaRef): LayerDeltaRef =
|
||||
func dup*(ly: LayerRef): LayerRef =
|
||||
## Duplicate/copy
|
||||
LayerDeltaRef(sTab: ly.sTab)
|
||||
LayerRef(sTab: ly.sTab)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -25,7 +25,7 @@ func nLayersKeys*(db: KvtDbRef): int =
|
|||
## bound for the number of effective key/value mappings held on the cache
|
||||
## layers as there might be duplicate entries for the same key on different
|
||||
## layers.
|
||||
db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len)
|
||||
db.stack.mapIt(it.sTab.len).foldl(a + b, db.top.sTab.len)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions: get function
|
||||
|
@ -37,11 +37,11 @@ func layersLen*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[int] =
|
|||
when key isnot seq[byte]:
|
||||
let key = @key
|
||||
|
||||
db.top.delta.sTab.withValue(key, item):
|
||||
db.top.sTab.withValue(key, item):
|
||||
return Opt.some(item[].len())
|
||||
|
||||
for w in db.rstack:
|
||||
w.delta.sTab.withValue(key, item):
|
||||
w.sTab.withValue(key, item):
|
||||
return Opt.some(item[].len())
|
||||
|
||||
Opt.none(int)
|
||||
|
@ -58,11 +58,11 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] =
|
|||
when key isnot seq[byte]:
|
||||
let key = @key
|
||||
|
||||
db.top.delta.sTab.withValue(key, item):
|
||||
db.top.sTab.withValue(key, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
for w in db.rstack:
|
||||
w.delta.sTab.withValue(key, item):
|
||||
w.sTab.withValue(key, item):
|
||||
return Opt.some(item[])
|
||||
|
||||
Opt.none(Blob)
|
||||
|
@ -73,7 +73,7 @@ func layersGet*(db: KvtDbRef; key: openArray[byte]|seq[byte]): Opt[Blob] =
|
|||
|
||||
func layersPut*(db: KvtDbRef; key: openArray[byte]; data: openArray[byte]) =
|
||||
## Store a (potentally empty) value on the top layer
|
||||
db.top.delta.sTab[@key] = @data
|
||||
db.top.sTab[@key] = @data
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
@ -87,12 +87,12 @@ func layersCc*(db: KvtDbRef; level = high(int)): LayerRef =
|
|||
else: db.stack[0 .. level]
|
||||
|
||||
# Set up initial layer (bottom layer)
|
||||
result = LayerRef(delta: LayerDeltaRef(sTab: layers[0].delta.sTab))
|
||||
result = LayerRef(sTab: layers[0].sTab)
|
||||
|
||||
# Consecutively merge other layers on top
|
||||
for n in 1 ..< layers.len:
|
||||
for (key,val) in layers[n].delta.sTab.pairs:
|
||||
result.delta.sTab[key] = val
|
||||
for (key,val) in layers[n].sTab.pairs:
|
||||
result.sTab[key] = val
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public iterators
|
||||
|
@ -109,12 +109,12 @@ iterator layersWalk*(
|
|||
## the one with a zero vertex which are othewise skipped by the iterator.
|
||||
## The `seen` argument must not be modified while the iterator is active.
|
||||
##
|
||||
for (key,val) in db.top.delta.sTab.pairs:
|
||||
for (key,val) in db.top.sTab.pairs:
|
||||
yield (key,val)
|
||||
seen.incl key
|
||||
|
||||
for w in db.rstack:
|
||||
for (key,val) in w.delta.sTab.pairs:
|
||||
for (key,val) in w.sTab.pairs:
|
||||
if key notin seen:
|
||||
yield (key,val)
|
||||
seen.incl key
|
||||
|
|
|
@ -83,7 +83,6 @@ proc txFrameBegin*(db: KvtDbRef): Result[KvtTxRef,KvtError] =
|
|||
|
||||
db.stack.add db.top
|
||||
db.top = LayerRef(
|
||||
delta: LayerDeltaRef(),
|
||||
txUid: db.getTxUid)
|
||||
db.txRef = KvtTxRef(
|
||||
db: db,
|
||||
|
@ -122,8 +121,8 @@ proc txFrameCommit*(
|
|||
|
||||
# Replace the top two layers by its merged version
|
||||
let merged = db.stack[^1]
|
||||
for (key,val) in db.top.delta.sTab.pairs:
|
||||
merged.delta.sTab[key] = val
|
||||
for (key,val) in db.top.sTab.pairs:
|
||||
merged.sTab[key] = val
|
||||
|
||||
# Install `merged` layer
|
||||
db.top = merged
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import
|
||||
std/tables,
|
||||
results,
|
||||
../kvt_delta/delta_merge,
|
||||
".."/[kvt_desc, kvt_delta]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -31,10 +32,8 @@ proc txStowOk*(
|
|||
return err(TxPendingTx)
|
||||
if 0 < db.stack.len:
|
||||
return err(TxStackGarbled)
|
||||
|
||||
if persistent and not db.deltaUpdateOk():
|
||||
if persistent and not db.deltaPersistentOk():
|
||||
return err(TxBackendNotWritable)
|
||||
|
||||
ok()
|
||||
|
||||
proc txStow*(
|
||||
|
@ -49,13 +48,20 @@ proc txStow*(
|
|||
##
|
||||
? db.txStowOk persistent
|
||||
|
||||
if 0 < db.top.delta.sTab.len:
|
||||
db.deltaMerge db.top.delta
|
||||
db.top.delta = LayerDeltaRef()
|
||||
if 0 < db.top.sTab.len:
|
||||
# Note that `deltaMerge()` will return the `db.top` argument if the
|
||||
# `db.balancer` is `nil`. Also, the `db.balancer` is read-only. In the
|
||||
# case that there are no forked peers one can ignore that restriction as
|
||||
# no balancer is shared.
|
||||
db.balancer = deltaMerge(
|
||||
db.top, modUpperOk = true, db.balancer, modLowerOk = db.nForked()==0)
|
||||
|
||||
# New empty top layer
|
||||
db.top = LayerRef()
|
||||
|
||||
if persistent:
|
||||
# Move `balancer` data into persistent tables
|
||||
? db.deltaUpdate()
|
||||
? db.deltaPersistent()
|
||||
|
||||
ok()
|
||||
|
||||
|
|
|
@ -149,7 +149,7 @@ proc cleanUp(dx: var DbTriplet) =
|
|||
|
||||
# ----------------------
|
||||
|
||||
proc isDbEq(a, b: LayerDeltaRef; db: AristoDbRef; noisy = true): bool =
|
||||
proc isDbEq(a, b: LayerRef; db: AristoDbRef; noisy = true): bool =
|
||||
## Verify that argument filter `a` has the same effect on the
|
||||
## physical/unfiltered backend of `db` as argument filter `b`.
|
||||
if a.isNil:
|
||||
|
@ -255,8 +255,8 @@ proc testBalancer*(
|
|||
# Resulting clause (11) filters from `aristo/README.md` example
|
||||
# which will be used in the second part of the tests
|
||||
var
|
||||
c11Filter1 = LayerDeltaRef(nil)
|
||||
c11Filter3 = LayerDeltaRef(nil)
|
||||
c11Filter1 = LayerRef(nil)
|
||||
c11Filter3 = LayerRef(nil)
|
||||
|
||||
# Work through clauses (8)..(11) from `aristo/README.md` example
|
||||
block:
|
||||
|
@ -278,14 +278,14 @@ proc testBalancer*(
|
|||
block:
|
||||
let rc = db1.persist()
|
||||
xCheckRc rc.error == 0
|
||||
xCheck db1.balancer == LayerDeltaRef(nil)
|
||||
xCheck db1.balancer == LayerRef(nil)
|
||||
xCheck db2.balancer == db3.balancer
|
||||
|
||||
block:
|
||||
let rc = db2.stow() # non-persistent
|
||||
xCheckRc rc.error == 0:
|
||||
noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2
|
||||
xCheck db1.balancer == LayerDeltaRef(nil)
|
||||
xCheck db1.balancer == LayerRef(nil)
|
||||
xCheck db2.balancer != db3.balancer
|
||||
|
||||
# Clause (11) from `aristo/README.md` example
|
||||
|
@ -293,7 +293,7 @@ proc testBalancer*(
|
|||
block:
|
||||
let rc = db2.persist()
|
||||
xCheckRc rc.error == 0
|
||||
xCheck db2.balancer == LayerDeltaRef(nil)
|
||||
xCheck db2.balancer == LayerRef(nil)
|
||||
|
||||
# Check/verify backends
|
||||
block:
|
||||
|
@ -326,7 +326,7 @@ proc testBalancer*(
|
|||
block:
|
||||
let rc = db2.persist()
|
||||
xCheckRc rc.error == 0
|
||||
xCheck db2.balancer == LayerDeltaRef(nil)
|
||||
xCheck db2.balancer == LayerRef(nil)
|
||||
xCheck db1.balancer == db3.balancer
|
||||
|
||||
# Clause (13) from `aristo/README.md` example
|
||||
|
|
Loading…
Reference in New Issue