Aristo balancer clean up (#2501)

* Remove `chunkedMpt` from `persistent()`/`stow()` function

why:
  Proof-mode code was removed with PR #2445 and needs to be re-designed.

* Remove unused `beStateRoot` argument from `deltaMerge()`

* Update/drastically simplify `txStow()`

why:
  Got rid of many boundary conditions

details:
  Many pre-conditions have changed. In particular, previous versions
  used the account state (hash) which was conveniently available and
  checked it against the backend in order to find out whether there
  was something to do, at all. Currently, only an empty set of all
  tables in the delta layer has the balancer update ignored.

  Notable changes are:
  * no check against account state (see above)
  * balancer filters have no hash signature (some legacy stuff left over
    from journals)
  * no (shap sync) proof data which made the generation of the a top layer
    more complex

* Cosmetics, cruft removal

* Update unit test file & function name

why:
  Was legacy module
This commit is contained in:
Jordan Hrycaj 2024-07-17 19:27:33 +00:00 committed by GitHub
parent 51cf991439
commit 6677f57ea9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 52 additions and 213 deletions

View File

@ -342,7 +342,6 @@ type
AristoApiPersistFn* =
proc(db: AristoDbRef;
nxtSid = 0u64;
chunkedMpt = false;
): Result[void,AristoError]
{.noRaise.}
## Persistently store data onto backend database. If the system is
@ -356,11 +355,6 @@ type
## database and the staged data area is cleared.
##
## The argument `nxtSid` will be the ID for the next saved state record.
##
## Staging the top layer cache might fail with a partial MPT when it is
## set up from partial MPT chunks as it happens with `snap` sync
## processing. In this case, the `chunkedMpt` argument must be set
## `true` (see alse `fwdFilter()`.)
AristoApiReCentreFn* =
proc(db: AristoDbRef;

View File

@ -301,7 +301,7 @@ proc ppFilter(
pfx = indent.toPfx
pfx1 = indent.toPfx(1)
pfx2 = indent.toPfx(2)
result = "<filter>"
result = "<balancer>"
if fl.isNil:
result &= " n/a"
return

View File

@ -20,9 +20,8 @@ import
proc deltaMerge*(
db: AristoDbRef;
upper: LayerDeltaRef; # Src filter, `nil` is ok
upper: LayerDeltaRef; # new filter, `nil` is ok
lower: LayerDeltaRef; # Trg filter, `nil` is ok
beStateRoot: HashKey; # Merkle hash key
): Result[LayerDeltaRef,(VertexID,AristoError)] =
## Merge argument `upper` into the `lower` filter instance.
##
@ -71,6 +70,12 @@ proc deltaMerge*(
else:
return err((rvid.vid,rc.error))
for (accPath,leafVtx) in upper.accLeaves.pairs:
newFilter.accLeaves[accPath] = leafVtx
for (mixPath,leafVtx) in upper.stoLeaves.pairs:
newFilter.stoLeaves[mixPath] = leafVtx
ok newFilter
# ------------------------------------------------------------------------------

View File

@ -95,14 +95,17 @@ proc update*(ctx: UpdateSiblingsRef): Result[UpdateSiblingsRef,AristoError] =
# Update distributed filters. Note that the physical backend database
# must not have been updated, yet. So the new root key for the backend
# will be `db.balancer.kMap[$1]`.
let trg = db.balancer.kMap.getOrVoid((VertexID(1), VertexID(1)))
for w in db.forked:
let rc = db.deltaMerge(w.balancer, ctx.rev, trg)
if rc.isErr:
ctx.rollback()
return err(rc.error[1])
ctx.balancers.add (w, w.balancer)
w.balancer = rc.value
if w.balancer.isNil:
# Sharing the `ctx.rev` ref is safe as it is read-inly
w.balancer = ctx.rev
else:
let rc = db.deltaMerge(w.balancer, ctx.rev)
if rc.isErr:
ctx.rollback()
return err(rc.error[1])
ctx.balancers.add (w, w.balancer)
w.balancer = rc.value
ok(ctx)
proc update*(

View File

@ -88,11 +88,7 @@ type
# Functions from `aristo_delta.nim`
FilBackendMissing
FilBackendRoMode
#FilNilFilterRejected
FilSiblingsCommitUnfinshed
#FilSrcTrgInconsistent
#FilStateRootMismatch
#FilTrgSrcMismatch
# Fetch functions from `aristo_fetch.nim`
@ -188,8 +184,6 @@ type
TxNotFound
TxNotTopTx
TxPendingTx
TxPrettyPointlessLayer
TxStackGarbled
TxStateRootMismatch
# End

View File

@ -221,7 +221,6 @@ proc collapse*(
proc persist*(
db: AristoDbRef; # Database
nxtSid = 0u64; # Next state ID (aka block number)
chunkedMpt = false; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## Persistently store data onto backend database. If the system is running
## without a database backend, the function returns immediately with an
@ -234,20 +233,14 @@ proc persist*(
## and the staged data area is cleared. Wile performing this last step,
## the recovery journal is updated (if available.)
##
## If the argument `nxtFid` is passed non-zero, it will be the ID for the
## If the argument `nxtSid` is passed non-zero, it will be the ID for the
## next recovery journal record. If non-zero, this ID must be greater than
## all previous IDs (e.g. block number when stowing after block execution.)
##
## Staging the top layer cache might fail with a partial MPT when it is
## set up from partial MPT chunks as it happens with `snap` sync processing.
## In this case, the `chunkedMpt` argument must be set `true` (see alse
## `fwdFilter()`.)
##
db.txStow(nxtSid, persistent=true, chunkedMpt=chunkedMpt)
db.txStow(nxtSid, persistent=true)
proc stow*(
db: AristoDbRef; # Database
chunkedMpt = false; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## This function is similar to `persist()` stopping short of performing the
## final step storing on the persistent database. It fails if there is a
@ -257,12 +250,7 @@ proc stow*(
## backend stage area and leaves it there. This function can be seen as
## a sort of a bottom level transaction `commit()`.
##
## Staging the top layer cache might fail with a partial MPT when it is
## set up from partial MPT chunks as it happens with `snap` sync processing.
## In this case, the `chunkedMpt` argument must be set `true` (see alse
## `fwdFilter()`.)
##
db.txStow(nxtSid=0u64, persistent=false, chunkedMpt=chunkedMpt)
db.txStow(nxtSid=0u64, persistent=false)
# ------------------------------------------------------------------------------
# End

View File

@ -17,66 +17,7 @@ import
std/tables,
results,
../aristo_delta/delta_merge,
".."/[aristo_desc, aristo_get, aristo_delta, aristo_layers]
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getBeStateRoot(
db: AristoDbRef;
chunkedMpt: bool;
): Result[HashKey,AristoError] =
## Get the Merkle hash key for the current backend state root and check
## validity of top layer.
const rvid = (VertexID(1), VertexID(1))
let srcRoot = block:
let rc = db.getKeyBE rvid
if rc.isOk:
rc.value
elif rc.error == GetKeyNotFound:
VOID_HASH_KEY
else:
return err(rc.error)
if db.top.delta.kMap.getOrVoid(rvid).isValid:
return ok(srcRoot)
elif not db.top.delta.kMap.hasKey(rvid) and
not db.top.delta.sTab.hasKey(rvid):
# This layer is unusable, need both: vertex and key
return err(TxPrettyPointlessLayer)
ok(srcRoot)
#elif not db.top.delta.sTab.getOrVoid(VertexID 1).isValid:
# # Root key and vertex have been deleted
# return ok(srcRoot)
#
#elif chunkedMpt and srcRoot == db.top.delta.kMap.getOrVoid VertexID(1):
# # FIXME: this one needs to be double checked with `snap` sunc preload
# return ok(srcRoot)
#
#err(TxStateRootMismatch)
proc topMerge(db: AristoDbRef; src: HashKey): Result[void,AristoError] =
## Merge the `top` layer into the read-only balacer layer.
let ubeRoot = block:
const rvid = (VertexID(1), VertexID(1))
let rc = db.getKeyUbe rvid
if rc.isOk:
rc.value
elif rc.error == GetKeyNotFound:
VOID_HASH_KEY
else:
return err(rc.error)
# This one will return the `db.top.delta` if `db.balancer.isNil`
db.balancer = db.deltaMerge(db.top.delta, db.balancer, ubeRoot).valueOr:
return err(error[1])
ok()
".."/[aristo_desc, aristo_delta]
# ------------------------------------------------------------------------------
# Public functions
@ -86,7 +27,6 @@ proc txStow*(
db: AristoDbRef; # Database
nxtSid: uint64; # Next state ID (aka block number)
persistent: bool; # Stage only unless `true`
chunkedMpt: bool; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## Worker for `stow()` and `persist()` variants.
##
@ -97,43 +37,22 @@ proc txStow*(
if persistent and not db.deltaPersistentOk():
return err(TxBackendNotWritable)
# Verify database consistency and get `src` field for update
let rc = db.getBeStateRoot chunkedMpt
if rc.isErr and rc.error != TxPrettyPointlessLayer:
return err(rc.error)
if db.top.delta.sTab.len != 0 or
db.top.delta.kMap.len != 0 or
db.top.delta.accLeaves.len != 0 or
db.top.delta.stoLeaves.len != 0:
# Move/merge/install `top` layer onto `balancer`
if rc.isOk:
db.topMerge(rc.value).isOkOr:
return err(error)
# Note that `deltaMerge()` will return the 1st argument if the 2nd is `nil`
db.balancer = db.deltaMerge(db.top.delta, db.balancer).valueOr:
return err(error[1])
# New empty top layer (probably with `snap` proofs and `vTop` carry over)
db.top = LayerRef(
delta: LayerDeltaRef())
if db.balancer.isValid:
db.top.delta.vTop = db.balancer.vTop
else:
let rc = db.getTuvUbe()
if rc.isOk:
db.top.delta.vTop = rc.value
else:
# It is OK if there was no `vTop`. Otherwise something serious happened
# and there is no way to recover easily.
doAssert rc.error == GetTuvNotFound
elif db.top.delta.sTab.len != 0 and
not db.top.delta.sTab.getOrVoid((VertexID(1), VertexID(1))).isValid:
# Currently, a `VertexID(1)` root node is required
return err(TxAccRootMissing)
# New empty top layer
db.top = LayerRef(delta: LayerDeltaRef(vTop: db.balancer.vTop))
if persistent:
# Merge/move `balancer` into persistent tables
# Merge/move `balancer` into persistent tables (unless missing)
? db.deltaPersistent nxtSid
# New empty top layer (probably with `snap` proofs carry over)
db.top = LayerRef(
delta: LayerDeltaRef(vTop: db.vTop),
txUid: db.top.txUid)
ok()
# ------------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@ import
./replay/[pp, undump_accounts, undump_storages],
./test_aristo/test_short_keys,
./test_aristo/test_blobify,
./test_aristo/[test_samples_xx, test_filter, test_helpers, test_tx]
./test_aristo/[test_balancer, test_helpers, test_samples_xx, test_tx]
const
baseDir = [".", "..", ".."/"..", $DirSep]
@ -102,8 +102,8 @@ proc accountsRunner(
test &"Delete accounts database sub-trees, {accLst.len} lists":
check noisy.testTxMergeAndDeleteSubTree(accLst, dbDir)
test &"Distributed backend access {accLst.len} entries":
check noisy.testDistributedAccess(accLst, dbDir)
test &"Distributed backend balancers {accLst.len} entries":
check noisy.testBalancer(accLst, dbDir)
proc storagesRunner(
@ -137,8 +137,8 @@ proc storagesRunner(
test &"Delete storage database sub-trees, {stoLst.len} lists":
check noisy.testTxMergeAndDeleteSubTree(stoLst, dbDir)
test &"Distributed backend access {stoLst.len} entries":
check noisy.testDistributedAccess(stoLst, dbDir)
test &"Distributed backend balancers {stoLst.len} entries":
check noisy.testBalancer(stoLst, dbDir)
# ------------------------------------------------------------------------------
# Main function(s)

View File

@ -61,12 +61,11 @@ proc dump(pfx: string; dx: varargs[AristoDbRef]): string =
if n1 < dx.len:
result &= " ==========\n "
when false:
proc dump(dx: varargs[AristoDbRef]): string =
"".dump dx
proc dump(dx: varargs[AristoDbRef]): string {.used.} =
"".dump dx
proc dump(w: DbTriplet): string =
"db".dump(w[0], w[1], w[2])
proc dump(w: DbTriplet): string {.used.} =
"db".dump(w[0], w[1], w[2])
# ------------------------------------------------------------------------------
# Private helpers
@ -111,12 +110,6 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
else:
AristoDbRef.init MemBackendRef
block:
# Add a dummy entry so the balancer logic can be triggered in `persist()`
let rc = db.mergeDummyAccLeaf(0, 0)
xCheckRc rc.error == 0:
result = err(rc.error)
# Set failed `xCheck()` error result
result = err(AristoError 1)
@ -145,12 +138,6 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
db.finish(eradicate=true)
xCheck (n, report.error) == (n,0)
block:
# Add a dummy entry so the balancer logic can be triggered in `persist()`
let rc = db.mergeDummyAccLeaf(0, 1)
xCheckRc rc.error == 0:
result = err(rc.error)
return ok(dx)
# ----------------------
@ -160,6 +147,8 @@ proc cleanUp(dx: var DbTriplet) =
dx[0].finish(eradicate=true)
dx.reset
# ----------------------
proc isDbEq(a, b: LayerDeltaRef; db: AristoDbRef; noisy = true): bool =
## Verify that argument filter `a` has the same effect on the
## physical/unfiltered backend of `db` as argument filter `b`.
@ -254,7 +243,7 @@ proc checkBeOk(
# Public test function
# ------------------------------------------------------------------------------
proc testDistributedAccess*(
proc testBalancer*(
noisy: bool;
list: openArray[ProofTrieData];
rdbPath: string; # Rocks DB storage directory
@ -292,11 +281,6 @@ proc testDistributedAccess*(
xCheck db1.balancer == LayerDeltaRef(nil)
xCheck db2.balancer == db3.balancer
block:
# Add dummy entry so the balancer logic can be triggered in `persist()`
let rc = db2.mergeDummyAccLeaf(0, 100+n)
xCheckRc rc.error == 0
block:
let rc = db2.stow() # non-persistent
xCheckRc rc.error == 0:
@ -337,11 +321,6 @@ proc testDistributedAccess*(
defer:
dy.cleanUp()
block:
# Add dummy entry so the balancer logic can be triggered in `persist()`
let rc = db2.mergeDummyAccLeaf(0, 100+n)
xCheckRc rc.error == 0
# Build clause (12) from `aristo/README.md` example
discard db2.reCentre()
block:

View File

@ -233,22 +233,6 @@ proc mergeList*(
(merged, dups, AristoError(0))
proc mergeDummyAccLeaf*(
db: AristoDbRef;
pathID: int;
nonce: int;
): Result[void,AristoError] =
# Add a dummy entry so the balancer logic can be triggered
let
acc = AristoAccount(nonce: nonce.AccountNonce)
rc = db.mergeAccountRecord(pathID.uint64.toBytesBE.keccakHash, acc)
if rc.isOk:
ok()
else:
err(rc.error)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -120,7 +120,6 @@ proc innerCleanUp(db: var AristoDbRef): bool {.discardable.} =
proc schedStow(
db: AristoDbRef; # Database
chunkedMpt = false; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## Scheduled storage
let
@ -129,13 +128,12 @@ proc schedStow(
else: db.balancer.sTab.len + db.balancer.kMap.len
persistent = MaxFilterBulk < max(layersMeter, filterMeter)
if persistent:
db.persist(chunkedMpt=chunkedMpt)
db.persist()
else:
db.stow(chunkedMpt=chunkedMpt)
db.stow()
proc saveToBackend(
tx: var AristoTxRef;
chunkedMpt: bool;
relax: bool;
noisy: bool;
debugID: int;
@ -176,7 +174,7 @@ proc saveToBackend(
xCheckErr rc.value.level < 0 # force error
block:
let rc = db.schedStow(chunkedMpt=chunkedMpt)
let rc = db.schedStow()
xCheckRc rc.error == 0
block:
@ -191,7 +189,6 @@ proc saveToBackend(
proc saveToBackendWithOops(
tx: var AristoTxRef;
chunkedMpt: bool;
noisy: bool;
debugID: int;
oops: (int,AristoError);
@ -224,7 +221,7 @@ proc saveToBackendWithOops(
xCheckErr rc.value.level < 0 # force error
block:
let rc = db.schedStow(chunkedMpt=chunkedMpt)
let rc = db.schedStow()
xCheckRc rc.error == 0:
noisy.say "***", "saveToBackendWithOops(8)",
" debugID=", debugID,
@ -371,13 +368,8 @@ proc testTxMergeAndDeleteOneByOne*(
doSaveBeOk = ((u mod saveMod) == saveRest)
(leaf, lid) = lvp
# Add a dummy entry so the balancer logic can be triggered
let rc = db.mergeDummyAccLeaf(n, runID)
xCheckRc rc.error == 0
if doSaveBeOk:
let saveBeOk = tx.saveToBackend(
chunkedMpt=false, relax=relax, noisy=noisy, runID)
let saveBeOk = tx.saveToBackend(relax=relax, noisy=noisy, runID)
xCheck saveBeOk:
noisy.say "***", "del1by1(2)",
" u=", u,
@ -441,11 +433,6 @@ proc testTxMergeAndDeleteSubTree*(
else:
AristoDbRef.init(MemBackendRef)
# Add a dummy entry so the balancer logic can be triggered
block:
let rc = db.mergeDummyAccLeaf(n, 42)
xCheckRc rc.error == 0
# Start transaction (double frame for testing)
xCheck db.txTop.isErr
var tx = db.txBegin().value.to(AristoDbRef).txBegin().value
@ -477,8 +464,7 @@ proc testTxMergeAndDeleteSubTree*(
# === delete sub-tree ===
block:
let saveBeOk = tx.saveToBackend(
chunkedMpt=false, relax=false, noisy=noisy, 1 + list.len * n)
let saveBeOk = tx.saveToBackend(relax=false, noisy=noisy, 1+list.len*n)
xCheck saveBeOk:
noisy.say "***", "del(1)",
" n=", n, "/", list.len,
@ -492,15 +478,8 @@ proc testTxMergeAndDeleteSubTree*(
" n=", n, "/", list.len,
"\n db\n ", db.pp(backendOk=true),
""
# Update dummy entry so the journal logic can be triggered
block:
let rc = db.mergeDummyAccLeaf(n, 43)
xCheckRc rc.error == 0
block:
let saveBeOk = tx.saveToBackend(
chunkedMpt=false, relax=false, noisy=noisy, 2 + list.len * n)
let saveBeOk = tx.saveToBackend(relax=false, noisy=noisy, 2+list.len*n)
xCheck saveBeOk:
noisy.say "***", "del(3)",
" n=", n, "/", list.len,
@ -555,11 +534,6 @@ proc testTxMergeProofAndKvpList*(
count = 0
count.inc
# Add a dummy entry so the balancer logic can be triggered
block:
let rc = db.mergeDummyAccLeaf(n, 42)
xCheckRc rc.error == 0
let
testId = idPfx & "#" & $w.id & "." & $n
runID = n
@ -576,8 +550,7 @@ proc testTxMergeProofAndKvpList*(
block:
let
oops = oopsTab.getOrDefault(testId,(0,AristoError(0)))
saveBeOk = tx.saveToBackendWithOops(
chunkedMpt=true, noisy=noisy, debugID=runID, oops)
saveBeOk = tx.saveToBackendWithOops(noisy=noisy, debugID=runID, oops)
xCheck saveBeOk
when true and false: