Core db+aristo fixes and tx handling updates (#2164)

* Aristo: Rename journal related sources and functions

why:
  Previously, the naming hinged on phrases like `fifo` and `filter`,
  which reflect the inner workings of cascaded filters. This made the
  source code unnecessarily hard to read for actions where the focus is
  the journal as a whole.

* Aristo: Fix buffer overflow (path length truncating error)

* Aristo: Tighten `hikeUp()` stop check, update error code

why:
  Detect dangling vertex links. These are legit with `snap` sync
  processing but not with regular processing.
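
  A caller distinguishing the two modes might look like this (sketch
  only; `snapMode` is a hypothetical flag, the error triple layout is
  taken from `hikeUp()`):

    let rc = path.hikeUp(root, db)
    if rc.isErr and rc.error[1] == HikeDanglingEdge and not snapMode:
      return err(rc.error[1])   # dangling link is fatal in regular mode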

* Aristo: Raise assert in regular mode `merge()` at a dangling link/edge

why:
  With `snap` sync processing, partial trees are ok and can be amended.
  Not so in regular mode.

  Previously there was only a debug message when a non-legit dangling edge
  was encountered.

* Aristo: Make sure that vertices are copied before modification

why:
  Otherwise vertices from lower layers might also be modified.
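
  A minimal sketch of the resulting pattern (mirroring the `aristo_merge`
  hunks further down; `dup` is the vertex copy helper):

    let brDup = brVtx.dup                     # copy before modification
    brDup.bVid[nibble] = vid                  # mutate the copy only
    db.setVtxAndKey(hike.root, brVid, brDup)  # register the copy on top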

* Aristo: Fix relaxed mode for validity checker `check()`

* Remove cruft

* Aristo: Update API for transaction handling

details:
+ Split `aristo_tx.nim` into sub-modules
+ Split `forkWith()` into `findTx()` + `forkTx()`
+ Removed `forkTop()`, `forkBase()` (now superseded by new `forkTx()`)
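
  Migration sketch (hedged; assumes a handle `db` and a known `(vid,key)`
  pair, with `Result` error propagation as elsewhere in Aristo):

    let level = ? db.findTx(vid, key)   # locate layer holding `(vid,key)`
    let clone = ? db.forkTx(level)      # fork a view of that layer
    # ... use `clone` ...
    discard clone.forget()              # always release forked descriptors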

* CoreDb+Aristo: Fix initialiser (missing methods)
Jordan Hrycaj 2024-05-03 17:38:17 +00:00 committed by GitHub
parent 7da36bf459
commit 143f2e99f5
29 changed files with 919 additions and 618 deletions

View File

@ -17,8 +17,8 @@ import
eth/[common, trie/nibbles],
results,
./aristo_desc/desc_backend,
./aristo_filter/filter_helpers,
./aristo_init/memory_db,
./aristo_journal/journal_get,
"."/[aristo_delete, aristo_desc, aristo_fetch, aristo_get, aristo_hashify,
aristo_hike, aristo_init, aristo_merge, aristo_path, aristo_profile,
aristo_serialise, aristo_tx, aristo_vid]
@ -95,6 +95,28 @@ type
## Cascaded attempt to traverse the `Aristo Trie` and fetch the value
## of a leaf vertex. This function is complementary to `mergePayload()`.
AristoApiFindTxFn* =
proc(db: AristoDbRef;
vid: VertexID;
key: HashKey;
): Result[int,AristoError]
{.noRaise.}
## Find the transaction where the vertex with ID `vid` exists and has
## the Merkle hash key `key`. If there is no transaction available,
## search in the filter and then in the backend.
##
## If the above procedure succeeds, an integer indicating the transaction
## level is returned:
##
## * `0` -- top level, current layer
## * `1`,`2`,`..` -- some transaction level further down the stack
## * `-1` -- the filter between transaction stack and database backend
## * `-2` -- the database backend
##
## A successful return code might be used for the `forkTx()` call for
## creating a forked descriptor that provides the pair `(vid,key)`.
##
AristoApiFinishFn* =
proc(db: AristoDbRef;
flush = false;
@ -119,52 +141,35 @@ type
## A non-centre descriptor should always be destructed after use (see
## also comments on `fork()`.)
AristoApiForkTopFn* =
AristoApiForkTxFn* =
proc(db: AristoDbRef;
backLevel: int;
dontHashify = false;
): Result[AristoDbRef,AristoError]
{.noRaise.}
## Clone a descriptor in a way so that there is exactly one active
## transaction.
##
## If the argument flag `dontHashify` is passed `true`, the clone
## descriptor will *NOT* be hashified right after construction.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
AristoApiForkWithFn* =
proc(db: AristoDbRef;
vid: VertexID;
key: HashKey;
dontHashify = false;
): Result[AristoDbRef,AristoError]
{.noRaise.}
## Find the transaction where the vertex with ID `vid` exists and has
## the Merkle hash key `key`. If there is no transaction available,
## search in the filter and then in the backend.
##
## If the above procedure succeeds, a new descriptor is forked with
## exactly one transaction which contains all the bottom layers up
## until the layer where the `(vid,key)` pair is found. In case the
## pair was found on the filter or the backend, this transaction is
## empty.
AristoApiGetFromJournalFn* =
proc(be: BackendRef;
fid: Option[FilterID];
earlierOK = false;
): Result[FilterIndexPair,AristoError]
{.noRaise.}
## For a positive argument `fid`, find the filter on the journal with ID
## not larger than `fid` (i.e. the resulting filter might be older.)
##
## If the argument `earlierOK` is passed `false`, the function succeeds
## only if the filter ID of the returned filter is equal to the argument
## `fid`.
##
## In case the argument `fid` is zero (i.e. `FilterID(0)`), the
## filter with the smallest filter ID (i.e. the oldest filter) is
## returned. In that case, the argument `earlierOK` is ignored.
## Fork a new descriptor obtained from parts of the argument database
## as described by arguments `db` and `backLevel`.
##
## If the argument `backLevel` is non-negative, the forked descriptor
## will provide the database view where the first `backLevel` transaction
## layers are stripped and the remaining layers are squashed into a single
## transaction.
##
## If `backLevel` is `-1`, a database descriptor with empty transaction
## layers will be provided where the `roFilter` between database and
## transaction layers is kept in place.
##
## If `backLevel` is `-2`, a database descriptor with empty transaction
## layers will be provided without an `roFilter`.
##
## The returned database descriptor will always have transaction level one.
## If there were no transactions that could be squashed, an empty
## transaction is added.
##
## If the argument flag `dontHashify` is passed `true`, the forked descriptor
## will *NOT* be hashified right after construction.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
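##
## Example (sketch only; levels as described above):
## ::
##   let squashed = ? db.forkTx(1)    # strip newest tx, squash the rest
##   let filtered = ? db.forkTx(-1)   # no txs, `roFilter` kept in place
##   let plainBe  = ? db.forkTx(-2)   # no txs, unfiltered backend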
AristoApiGetKeyRcFn* =
proc(db: AristoDbRef;
@ -207,6 +212,23 @@ type
## Getter, returns `true` if the argument `tx` refers to the current
## top level transaction.
AristoApiJournalGetInxFn* =
proc(be: BackendRef;
fid: Option[FilterID];
earlierOK = false;
): Result[JournalInx,AristoError]
{.noRaise.}
## For a positive argument `fid`, find the filter on the journal with ID
## not larger than `fid` (i.e. the resulting filter might be older.)
##
## If the argument `earlierOK` is passed `false`, the function succeeds
## only if the filter ID of the returned filter is equal to the argument
## `fid`.
##
## In case the argument `fid` is zero (i.e. `FilterID(0)`), the
## filter with the smallest filter ID (i.e. the oldest filter) is
## returned. In that case, the argument `earlierOK` is ignored.
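##
## Example (sketch; filter IDs are illustrative):
## ::
##   # oldest filter on the journal, `earlierOK` ignored
##   let fip = ? be.journalGetInx(some(FilterID(0)))
##   # filter with ID at most 7, older entries accepted
##   let old = ? be.journalGetInx(some(FilterID(7)), earlierOK=true)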
AristoApiLevelFn* =
proc(db: AristoDbRef;
): int
@ -371,16 +393,16 @@ type
delete*: AristoApiDeleteFn
delTree*: AristoApiDelTreeFn
fetchPayload*: AristoApiFetchPayloadFn
findTx*: AristoApiFindTxFn
finish*: AristoApiFinishFn
forget*: AristoApiForgetFn
forkTop*: AristoApiForkTopFn
forkWith*: AristoApiForkWithFn
getFromJournal*: AristoApiGetFromJournalFn
forkTx*: AristoApiForkTxFn
getKeyRc*: AristoApiGetKeyRcFn
hashify*: AristoApiHashifyFn
hasPath*: AristoApiHasPathFn
hikeUp*: AristoApiHikeUpFn
isTop*: AristoApiIsTopFn
journalGetInx*: AristoApiJournalGetInxFn
level*: AristoApiLevelFn
nForked*: AristoApiNForkedFn
merge*: AristoApiMergeFn
@ -404,16 +426,16 @@ type
AristoApiProfDeleteFn = "delete"
AristoApiProfDelTreeFn = "delTree"
AristoApiProfFetchPayloadFn = "fetchPayload"
AristoApiProfFindTxFn = "findTx"
AristoApiProfFinishFn = "finish"
AristoApiProfForgetFn = "forget"
AristoApiProfForkTopFn = "forkTop"
AristoApiProfForkWithFn = "forkWith"
AristoApiProfGetFromJournalFn = "getFromJournal"
AristoApiProfForkTxFn = "forkTx"
AristoApiProfGetKeyRcFn = "getKeyRc"
AristoApiProfHashifyFn = "hashify"
AristoApiProfHasPathFn = "hasPath"
AristoApiProfHikeUpFn = "hikeUp"
AristoApiProfIsTopFn = "isTop"
AristoApiProfJournalGetInxFn = "journalGetInx"
AristoApiProfLevelFn = "level"
AristoApiProfNForkedFn = "nForked"
AristoApiProfMergeFn = "merge"
@ -455,16 +477,16 @@ when AutoValidateApiHooks:
doAssert not api.delete.isNil
doAssert not api.delTree.isNil
doAssert not api.fetchPayload.isNil
doAssert not api.findTx.isNil
doAssert not api.finish.isNil
doAssert not api.forget.isNil
doAssert not api.forkTop.isNil
doAssert not api.forkWith.isNil
doAssert not api.getFromJournal.isNil
doAssert not api.forkTx.isNil
doAssert not api.getKeyRc.isNil
doAssert not api.hashify.isNil
doAssert not api.hasPath.isNil
doAssert not api.hikeUp.isNil
doAssert not api.isTop.isNil
doAssert not api.journalGetInx.isNil
doAssert not api.level.isNil
doAssert not api.nForked.isNil
doAssert not api.merge.isNil
@ -508,16 +530,16 @@ func init*(api: var AristoApiObj) =
api.delete = delete
api.delTree = delTree
api.fetchPayload = fetchPayload
api.findTx = findTx
api.finish = finish
api.forget = forget
api.forkTop = forkTop
api.forkWith = forkWith
api.getFromJournal = getFromJournal
api.forkTx = forkTx
api.getKeyRc = getKeyRc
api.hashify = hashify
api.hasPath = hasPath
api.hikeUp = hikeUp
api.isTop = isTop
api.journalGetInx = journalGetInx
api.level = level
api.nForked = nForked
api.merge = merge
@ -544,16 +566,16 @@ func dup*(api: AristoApiRef): AristoApiRef =
delete: api.delete,
delTree: api.delTree,
fetchPayload: api.fetchPayload,
findTx: api.findTx,
finish: api.finish,
forget: api.forget,
forkTop: api.forkTop,
forkWith: api.forkWith,
getFromJournal: api.getFromJournal,
forkTx: api.forkTx,
getKeyRc: api.getKeyRc,
hashify: api.hashify,
hasPath: api.hasPath,
hikeUp: api.hikeUp,
isTop: api.isTop,
journalGetInx: api.journalGetInx,
level: api.level,
nForked: api.nForked,
merge: api.merge,
@ -617,6 +639,11 @@ func init*(
AristoApiProfFetchPayloadFn.profileRunner:
result = api.fetchPayload(a, b, c)
profApi.findTx =
proc(a: AristoDbRef; b: VertexID; c: HashKey): auto =
AristoApiProfFindTxFn.profileRunner:
result = api.findTx(a, b, c)
profApi.finish =
proc(a: AristoDbRef; b = false) =
AristoApiProfFinishFn.profileRunner:
@ -627,20 +654,10 @@ func init*(
AristoApiProfForgetFn.profileRunner:
result = api.forget(a)
profApi.forkTop =
proc(a: AristoDbRef; b = false): auto =
AristoApiProfForkTopFn.profileRunner:
result = api.forkTop(a, b)
profApi.forkWith =
proc(a: AristoDbRef; b: VertexID; c: HashKey; d = false): auto =
AristoApiProfForkWithFn.profileRunner:
result = api.forkWith(a, b, c, d)
profApi.getFromJournal =
proc(a: BackendRef; b: Option[FilterID]; c = false): auto =
AristoApiProfGetFromJournalFn.profileRunner:
result = api.getFromJournal(a, b, c)
profApi.forkTx =
proc(a: AristoDbRef; b: int; c = false): auto =
AristoApiProfForkTxFn.profileRunner:
result = api.forkTx(a, b, c)
profApi.getKeyRc =
proc(a: AristoDbRef; b: VertexID): auto =
@ -667,6 +684,11 @@ func init*(
AristoApiProfIsTopFn.profileRunner:
result = api.isTop(a)
profApi.journalGetInx =
proc(a: BackendRef; b: Option[FilterID]; c = false): auto =
AristoApiProfJournalGetInxFn.profileRunner:
result = api.journalGetInx(a, b, c)
profApi.level =
proc(a: AristoDbRef): auto =
AristoApiProfLevelFn.profileRunner:

View File

@ -160,20 +160,31 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
# Check top layer cache against backend
if cache:
if 0 < db.dirty.len:
return err((VertexID(0),CheckBeCacheIsDirty))
let checkKeysOk = block:
if db.dirty.len == 0:
true
elif relax:
false
else:
return err((VertexID(0),CheckBeCacheIsDirty))
# Check structural table
for (vid,vtx) in db.layersWalkVtx:
let key = db.layersGetKey(vid).valueOr:
# A `kMap[]` entry must exist.
return err((vid,CheckBeCacheKeyMissing))
let key = block:
let rc = db.layersGetKey(vid)
if rc.isOk:
rc.value
elif checkKeysOk:
# A `kMap[]` entry must exist.
return err((vid,CheckBeCacheKeyMissing))
else:
VOID_HASH_KEY
if vtx.isValid:
# Register existing vid against backend generator state
discard vids.reduce Interval[VertexID,uint64].new(vid,vid)
else:
# Some vertex is to be deleted, the key must be empty
if key.isValid:
if checkKeysOk and key.isValid:
return err((vid,CheckBeCacheKeyNonEmpty))
# There must be a representation on the backend DB unless in a TX
if db.getVtxBE(vid).isErr and db.stack.len == 0:

View File

@ -14,7 +14,7 @@ import
std/[algorithm, sequtils, sets, tables],
eth/common,
results,
../aristo_filter/filter_scheduler,
../aristo_journal/journal_scheduler,
../aristo_walk/persistent,
".."/[aristo_desc, aristo_blobify]

View File

@ -17,7 +17,7 @@ import
stew/[byteutils, interval_set],
./aristo_desc/desc_backend,
./aristo_init/[memory_db, memory_only, rocks_db],
./aristo_filter/filter_scheduler,
./aristo_journal/journal_scheduler,
"."/[aristo_constants, aristo_desc, aristo_hike, aristo_layers]
# ------------------------------------------------------------------------------

View File

@ -17,15 +17,11 @@
import
std/[sets, typetraits],
chronicles,
eth/[common, trie/nibbles],
results,
"."/[aristo_desc, aristo_get, aristo_hike, aristo_layers, aristo_path,
aristo_utils, aristo_vid]
logScope:
topics = "aristo-delete"
type
SaveToVaeVidFn =
proc(err: AristoError): (VertexID,AristoError) {.gcsafe, raises: [].}

View File

@ -68,7 +68,9 @@ type
# Path function `hikeUp()`
HikeBranchMissingEdge
HikeBranchTailEmpty
HikeDanglingEdge
HikeEmptyPath
HikeExtMissingEdge
HikeExtTailEmpty
HikeExtTailMismatch
HikeLeafUnexpected
@ -261,7 +263,8 @@ type
TxArgStaleTx
TxArgsUseless
TxBackendNotWritable
TxGarbledSpan
TxLevelTooDeep
TxLevelUseless
TxNoPendingTx
TxNotFound
TxNotTopTx

View File

@ -128,11 +128,6 @@ type
final*: LayerFinalRef ## Stored as latest version
txUid*: uint ## Transaction identifier if positive
FilterIndexPair* = object
## Helper structure for fetching filters from the journal.
inx*: int ## Non negative journal index. latest=`0`
fil*: FilterRef ## Valid filter
# ----------------------
QidLayoutRef* = ref object
@ -151,6 +146,11 @@ type
ctx*: QidLayoutRef ## Organisation of the FIFO
state*: seq[(QueueID,QueueID)] ## Current fill state
JournalInx* = tuple
## Helper structure for fetching filters from the journal.
inx: int ## Non negative journal index. latest=`0`
fil: FilterRef ## Valid filter
const
DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64)

View File

@ -94,7 +94,7 @@ proc hikeUp*(
return err((VertexID(0),HikeEmptyPath,hike))
var vid = root
while vid.isValid:
while true:
var leg = Leg(wp: VidVtxPair(vid: vid), nibble: -1)
# Fetch next vertex
@ -103,19 +103,25 @@ proc hikeUp*(
return err((vid,error,hike))
if hike.legs.len == 0:
return err((vid,HikeNoLegs,hike))
break
# The vertex ID `vid` was a follow up from a parent vertex, but there is
# no child vertex on the database. So `vid` is a dangling link which is
# allowed only if there is a partial trie (e.g. with `snap` sync.)
return err((vid,HikeDanglingEdge,hike))
case leg.wp.vtx.vType:
of Leaf:
# This must be the last vertex, so there cannot be any `tail` left.
if hike.tail.len == hike.tail.sharedPrefixLen(leg.wp.vtx.lPfx):
# Bingo, got full path
hike.legs.add leg
hike.tail = EmptyNibbleSeq
# This is the only loop exit
break
return err((vid,HikeLeafUnexpected,hike))
of Branch:
# There must be some more data (aka `tail`) after a `Branch` vertex.
if hike.tail.len == 0:
hike.legs.add leg
return err((vid,HikeBranchTailEmpty,hike))
@ -133,6 +139,7 @@ proc hikeUp*(
vid = nextVid
of Extension:
# There must be some more data (aka `tail`) after an `Extension` vertex.
if hike.tail.len == 0:
hike.legs.add leg
hike.tail = EmptyNibbleSeq
@ -141,9 +148,13 @@ proc hikeUp*(
if leg.wp.vtx.ePfx.len != hike.tail.sharedPrefixLen(leg.wp.vtx.ePfx):
return err((vid,HikeExtTailMismatch,hike)) # Need to branch from here
let nextVid = leg.wp.vtx.eVid
if not nextVid.isValid:
return err((vid,HikeExtMissingEdge,hike))
hike.legs.add leg
hike.tail = hike.tail.slice(leg.wp.vtx.ePfx.len)
vid = leg.wp.vtx.eVid
vid = nextVid
ok hike

View File

@ -8,8 +8,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Aristo DB -- Patricia Trie filter management
## =============================================
## Aristo DB -- Filter and journal management
## ==========================================
##
import
@ -18,14 +18,15 @@ import
results,
"."/[aristo_desc, aristo_get, aristo_vid],
./aristo_desc/desc_backend,
./aristo_filter/[
filter_fifos, filter_helpers, filter_merge, filter_reverse, filter_siblings]
./aristo_journal/[
filter_state_root, filter_merge, filter_reverse, filter_siblings,
journal_get, journal_ops]
# ------------------------------------------------------------------------------
# Public functions, construct filters
# ------------------------------------------------------------------------------
proc fwdFilter*(
proc journalFwdFilter*(
db: AristoDbRef; # Database
layer: LayerRef; # Layer to derive filter from
chunkedMpt = false; # Relax for snap/proof scenario
@ -64,7 +65,7 @@ proc fwdFilter*(
# Public functions, apply/install filters
# ------------------------------------------------------------------------------
proc merge*(
proc journalMerge*(
db: AristoDbRef; # Database
filter: FilterRef; # Filter to apply to database
): Result[void,(VertexID,AristoError)] =
@ -93,12 +94,12 @@ proc merge*(
ok()
proc canResolveBackendFilter*(db: AristoDbRef): bool =
proc journalUpdateOk*(db: AristoDbRef): bool =
## Check whether the read-only filter can be merged into the backend
not db.backend.isNil and db.isCentre
proc resolveBackendFilter*(
proc journalUpdate*(
db: AristoDbRef; # Database
nxtFid = none(FilterID); # Next filter ID (if any)
reCentreOk = false;
@ -143,16 +144,16 @@ proc resolveBackendFilter*(
defer: updateSiblings.rollback()
# Figure out how to save the reverse filter on a cascaded slots queue
var instr: FifoInstr
var instr: JournalOpsMod
if not be.journal.isNil: # Otherwise ignore
block getInstr:
# Compile instruction for updating filters on the cascaded fifos
if db.roFilter.isValid:
let ovLap = be.getJournalOverlap db.roFilter
let ovLap = be.journalGetOverlap db.roFilter
if 0 < ovLap:
instr = ? be.fifosDelete ovLap # Revert redundant entries
instr = ? be.journalOpsDeleteSlots ovLap # Revert redundant entries
break getInstr
instr = ? be.fifosStore(
instr = ? be.journalOpsPushSlot(
updateSiblings.rev, # Store reverse filter
nxtFid) # Set filter ID (if any)
@ -178,7 +179,7 @@ proc resolveBackendFilter*(
ok()
proc forkByJournal*(
proc journalFork*(
db: AristoDbRef;
episode: int;
): Result[AristoDbRef,AristoError] =
@ -195,19 +196,19 @@ proc forkByJournal*(
if episode < 0:
return err(FilNegativeEpisode)
let
instr = ? be.fifosFetch(backSteps = episode+1)
instr = ? be.journalOpsFetchSlots(backSteps = episode+1)
clone = ? db.fork(noToplayer = true)
clone.top = LayerRef.init()
clone.top.final.vGen = instr.fil.vGen
clone.roFilter = instr.fil
ok clone
proc forkByJournal*(
proc journalFork*(
db: AristoDbRef;
fid: Option[FilterID];
earlierOK = false;
): Result[AristoDbRef,AristoError] =
## Variant of `forkByJounal()` for forking to a particular filter ID (or the
## Variant of `journalFork()` for forking to a particular filter ID (or the
## nearest predecessor if `earlierOK` is passed `true`) if there is some
## filter ID `fid`.
##
@ -218,8 +219,8 @@ proc forkByJournal*(
if be.isNil:
return err(FilBackendMissing)
let fip = ? be.getFromJournal(fid, earlierOK)
db.forkByJournal fip.inx
let fip = ? be.journalGetInx(fid, earlierOK)
db.journalFork fip.inx
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,77 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/tables,
eth/common,
results,
".."/[aristo_desc, aristo_get]
type
LayerStateRoot* = tuple
## Helper structure for analysing state roots.
be: Hash256 ## Backend state root
fg: Hash256 ## Layer or filter implied state root
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getLayerStateRoots*(
db: AristoDbRef;
delta: LayerDeltaRef;
chunkedMpt: bool;
): Result[LayerStateRoot,AristoError] =
## Get the Merkle hash key for target state root to arrive at after this
## reverse filter was applied.
##
var spr: LayerStateRoot
let sprBeKey = block:
let rc = db.getKeyBE VertexID(1)
if rc.isOk:
rc.value
elif rc.error == GetKeyNotFound:
VOID_HASH_KEY
else:
return err(rc.error)
spr.be = sprBeKey.to(Hash256)
spr.fg = block:
let key = delta.kMap.getOrVoid VertexID(1)
if key.isValid:
key.to(Hash256)
else:
EMPTY_ROOT_HASH
if spr.fg.isValid:
return ok(spr)
if not delta.kMap.hasKey(VertexID(1)) and
not delta.sTab.hasKey(VertexID(1)):
# This layer is unusable, need both: vertex and key
return err(FilPrettyPointlessLayer)
elif not delta.sTab.getOrVoid(VertexID(1)).isValid:
# Root key and vertex have been deleted
return ok(spr)
if chunkedMpt:
if sprBeKey == delta.kMap.getOrVoid VertexID(1):
spr.fg = spr.be
return ok(spr)
if delta.sTab.len == 0 and
delta.kMap.len == 0:
return err(FilPrettyPointlessLayer)
err(FilStateRootMismatch)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -9,76 +9,21 @@
# except according to those terms.
import
std/[options, tables],
std/options,
eth/common,
results,
".."/[aristo_desc, aristo_desc/desc_backend, aristo_get],
./filter_scheduler
type
StateRootPair* = object
## Helper structure for analysing state roots.
be*: Hash256 ## Backend state root
fg*: Hash256 ## Layer or filter implied state root
".."/[aristo_desc, aristo_desc/desc_backend],
./journal_scheduler
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getLayerStateRoots*(
db: AristoDbRef;
delta: LayerDeltaRef;
chunkedMpt: bool;
): Result[StateRootPair,AristoError] =
## Get the Merkle hash key for target state root to arrive at after this
## reverse filter was applied.
##
var spr: StateRootPair
let sprBeKey = block:
let rc = db.getKeyBE VertexID(1)
if rc.isOk:
rc.value
elif rc.error == GetKeyNotFound:
VOID_HASH_KEY
else:
return err(rc.error)
spr.be = sprBeKey.to(Hash256)
spr.fg = block:
let key = delta.kMap.getOrVoid VertexID(1)
if key.isValid:
key.to(Hash256)
else:
EMPTY_ROOT_HASH
if spr.fg.isValid:
return ok(spr)
if not delta.kMap.hasKey(VertexID(1)) and
not delta.sTab.hasKey(VertexID(1)):
# This layer is unusable, need both: vertex and key
return err(FilPrettyPointlessLayer)
elif not delta.sTab.getOrVoid(VertexID(1)).isValid:
# Root key and vertex have been deleted
return ok(spr)
if chunkedMpt:
if sprBeKey == delta.kMap.getOrVoid VertexID(1):
spr.fg = spr.be
return ok(spr)
if delta.sTab.len == 0 and
delta.kMap.len == 0:
return err(FilPrettyPointlessLayer)
err(FilStateRootMismatch)
proc getFromJournal*(
proc journalGetInx*(
be: BackendRef;
fid = none(FilterID);
earlierOK = false;
): Result[FilterIndexPair,AristoError] =
): Result[JournalInx,AristoError] =
## If there is some argument `fid`, find the filter on the journal with ID
## not larger than `fid` (i.e. the resulting filter must not be more recent.)
##
@ -112,7 +57,7 @@ proc getFromJournal*(
if not qid.isValid:
return err(FilFilterNotFound)
var fip = FilterIndexPair()
var fip: JournalInx
fip.fil = block:
if cache[0] == qid:
cache[1]
@ -127,15 +72,17 @@ proc getFromJournal*(
ok fip
proc getJournalOverlap*(
proc journalGetOverlap*(
be: BackendRef;
filter: FilterRef;
): int =
## Return the number of journal filters in the leading chain that is
## reverted by the argument `filter`. A heuristic approach is used here
## for an argument `filter` with a valid filter ID when the chain is
## longer than one item. Only single step filter overlaps are guaranteed
## to be found.
## This function will find the overlap of an argument `filter` which is
## composed of some recent filter slots from the journal.
##
## The function returns the number of most recent journal filters that are
## reverted by the argument `filter`. This requires that `src`, `trg`, and
## `fid` of the argument `filter` are properly calculated (e.g. using
## `journalOpsFetchSlots()`.)
##
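## Example (sketch): if the argument `filter` is exactly the reverse of
## the most recent journal slot, the function returns `1`; if it spans
## the two most recent slots, `2`; if there is no match, `0`.
##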
# Check against the top-fifo entry.
let qid = be.journal[0]
@ -155,7 +102,7 @@ proc getJournalOverlap*(
# Check against some stored filter IDs
if filter.isValid:
let fp = be.getFromJournal(some(filter.fid), earlierOK=true).valueOr:
let fp = be.journalGetInx(some(filter.fid), earlierOK=true).valueOr:
return 0
if filter.trg == fp.fil.trg:
return 1 + fp.inx

View File

@ -12,17 +12,18 @@ import
std/[options, tables],
results,
".."/[aristo_desc, aristo_desc/desc_backend],
"."/[filter_merge, filter_scheduler]
"."/[filter_merge, journal_scheduler]
type
FifoInstr* = object
## Database backend instructions for storing or deleting filters.
JournalOpsMod* = object
## Database journal instructions for storing or deleting filters.
put*: seq[(QueueID,FilterRef)]
scd*: QidSchedRef
FetchInstr* = object
JournalOpsFetch* = object
## Database journal instructions for merge-fetching slots.
fil*: FilterRef
del*: FifoInstr
del*: JournalOpsMod
# ------------------------------------------------------------------------------
# Private functions
@ -63,14 +64,16 @@ template getNextFidOrReturn(be: BackendRef; fid: Option[FilterID]): FilterID =
# Public functions
# ------------------------------------------------------------------------------
proc fifosStore*(
proc journalOpsPushSlot*(
be: BackendRef; # Database backend
filter: FilterRef; # Filter to save
filter: FilterRef; # Filter to store
fid: Option[FilterID]; # Next filter ID (if any)
): Result[FifoInstr,AristoError] =
): Result[JournalOpsMod,AristoError] =
## Calculate backend instructions for storing the argument `filter` on the
## argument backend `be`.
##
## The journal is not modified by this function.
##
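## Example usage (sketch, mirroring `journalUpdate()` in this diff):
## ::
##   instr = ? be.journalOpsPushSlot(
##     updateSiblings.rev,          # store reverse filter
##     nxtFid)                      # set filter ID (if any)
##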
if be.journal.isNil:
return err(FilQuSchedDisabled)
@ -81,7 +84,7 @@ proc fifosStore*(
# Update journal filters and calculate database update
var
instr = FifoInstr(scd: upd.fifo)
instr = JournalOpsMod(scd: upd.journal)
dbClear: seq[QueueID]
hold: seq[FilterRef]
saved = false
@ -138,14 +141,16 @@ proc fifosStore*(
ok instr
proc fifosFetch*(
proc journalOpsFetchSlots*(
be: BackendRef; # Database backend
backSteps: int; # Backstep this many filters
): Result[FetchInstr,AristoError] =
): Result[JournalOpsFetch,AristoError] =
## This function returns the single filter obtained by squash merging the
## topmost `backSteps` filters on the backend journal fifo. Also, backend
## instructions are calculated and returned for deleting the merged journal
## filters on the fifo.
## instructions are calculated and returned for deleting the extracted
## journal slots.
##
## The journal is not modified by this function.
##
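## Example (sketch): with slots `F0` (latest), `F1`, `F2` on the journal,
## `backSteps = 2` returns `fil` as the squash merge of `F0` and `F1`,
## while `del` holds the instructions that would remove those two slots.
##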
if be.journal.isNil:
return err(FilQuSchedDisabled)
@ -154,7 +159,7 @@ proc fifosFetch*(
# Get instructions
let fetch = be.journal.fetchItems backSteps
var instr = FetchInstr(del: FifoInstr(scd: fetch.fifo))
var instr = JournalOpsFetch(del: JournalOpsMod(scd: fetch.journal))
# Follow `HoldQid` instructions and combine journal filters for sub-queues
# and push intermediate results on the `hold` stack
@ -186,11 +191,16 @@ proc fifosFetch*(
ok instr
proc fifosDelete*(
proc journalOpsDeleteSlots*(
be: BackendRef; # Database backend
backSteps: int; # Backstep this many filters
): Result[FifoInstr,AristoError] =
## Variant of `fetch()` for calculating the deletion part only.
): Result[JournalOpsMod,AristoError] =
## Calculate backend instructions for deleting the most recent `backSteps`
## slots on the journal. This is basically the deletion calculator part
## from `journalOpsFetchSlots()`.
##
## The journal is not modified by this function.
##
if be.journal.isNil:
return err(FilQuSchedDisabled)
if backSteps <= 0:
@ -198,7 +208,7 @@ proc fifosDelete*(
# Get instructions
let fetch = be.journal.fetchItems backSteps
var instr = FifoInstr(scd: fetch.fifo)
var instr = JournalOpsMod(scd: fetch.journal)
# Follow `HoldQid` instructions for producing the list of entries that
# need to be deleted

View File

@ -279,12 +279,12 @@ func capacity*(
func addItem*(
fifo: QidSchedRef; # Cascaded fifos descriptor
): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
journal: QidSchedRef; # Cascaded fifos descriptor
): tuple[exec: seq[QidAction], journal: QidSchedRef] =
## Get the instructions for adding a new slot to the cascaded queues. The
## argument `fifo` is a complete state of the addresses of a cascaded *FIFO*
## when applied to a database. Only the *FIFO* queue addresses are needed
## in order to describe how to add another item.
## argument `journal` is a complete state of the addresses of a cascaded
## *FIFO* when applied to a database. Only the *FIFO* queue addresses are
## needed in order to describe how to add another item.
##
## The function returns a list of instructions for what to do when adding a new
## item and the new state of the cascaded *FIFO*. The following instructions
@ -310,9 +310,9 @@ func addItem*(
## -- another item.
##
let
ctx = fifo.ctx.q
ctx = journal.ctx.q
var
state = fifo.state
state = journal.state
deferred: seq[QidAction] # carry over to next sub-queue
revActions: seq[QidAction] # instructions in reverse order
@ -370,15 +370,15 @@ func addItem*(
op: DelQid,
qid: deferred[0].qid)
(revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state))
(revActions.reversed, QidSchedRef(ctx: journal.ctx, state: state))
func fetchItems*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
size: int; # Leading items to merge
): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
): tuple[exec: seq[QidAction], journal: QidSchedRef] =
## Get the instructions for extracting the latest `size` items from the
## cascaded queues. argument `fifo` is a complete state of the addresses of
## cascaded queues. The argument `journal` is a complete state of the addresses of
## a cascaded *FIFO* when applied to a database. Only the *FIFO* queue
## addresses are used in order to describe how to fetch the items.
##
@ -395,13 +395,13 @@ func fetchItems*(
## The extracted items will then be available from the hold queue.
var
actions: seq[QidAction]
state = fifo.state
state = journal.state
if 0 < size:
var size = size.uint64
for n in 0 ..< fifo.state.len:
let q = fifo.state[n]
for n in 0 ..< journal.state.len:
let q = journal.state[n]
if q[0] == 0:
discard
@ -447,7 +447,7 @@ func fetchItems*(
# | :
# | wrap
let
wrap = fifo.ctx.q[n].wrap
wrap = journal.ctx.q[n].wrap
qSize1 = q[1] - QueueID(0)
if size <= qSize1:
@ -490,35 +490,35 @@ func fetchItems*(
state[n] = (QueueID(0), QueueID(0))
(actions, QidSchedRef(ctx: fifo.ctx, state: state))
(actions, QidSchedRef(ctx: journal.ctx, state: state))
func lengths*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
): seq[int] =
## Return the list of lengths for all cascaded sub-fifos.
for n in 0 ..< fifo.state.len:
result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int
for n in 0 ..< journal.state.len:
result.add journal.state[n].fifoLen(journal.ctx.q[n].wrap).int
func len*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
): int =
## Size of the fifo
fifo.lengths.foldl(a + b, 0)
## Size of the journal
journal.lengths.foldl(a + b, 0)
func `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
inx: int; # Index into latest items
): QueueID =
## Get the queue ID of the `inx`-th `fifo` entry where index `0` refers to
## Get the queue ID of the `inx`-th `journal` entry where index `0` refers to
## the entry most recently added, `1` the one before, etc. If there is no
## such entry `QueueID(0)` is returned.
if 0 <= inx:
var inx = inx.uint64
for n in 0 ..< fifo.state.len:
let q = fifo.state[n]
for n in 0 ..< journal.state.len:
let q = journal.state[n]
if q[0] == 0:
discard
@ -551,22 +551,22 @@ func `[]`*(
inx -= qInxMax1 + 1 # Otherwise continue
let
wrap = fifo.ctx.q[n].wrap
wrap = journal.ctx.q[n].wrap
qInxMax0 = wrap - q[0]
if inx <= qInxMax0:
return n.globalQid(wrap - inx)
inx -= qInxMax0 + 1 # Otherwise continue
func `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
bix: BackwardsIndex; # Index into latest items
): QueueID =
## Variant of `[]` for providing `[^bix]`.
fifo[fifo.len - bix.distinctBase]
journal[journal.len - bix.distinctBase]
func `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
qid: QueueID; # Index into latest items
): int =
## Inverse of the integer indexed `[]`: return the journal position of the
## argument `qid`, or `-1` if `qid` is not on the journal.
@ -575,12 +575,12 @@ func `[]`*(
chn = (qid.uint64 shr 62).int
qid = (qid.uint64 and 0x3fff_ffff_ffff_ffffu64).QueueID
if chn < fifo.state.len:
if chn < journal.state.len:
var offs = 0
for n in 0 ..< chn:
offs += fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int
offs += journal.state[n].fifoLen(journal.ctx.q[n].wrap).int
let q = fifo.state[chn]
let q = journal.state[chn]
if q[0] <= q[1]:
# Single file
# ::
@ -606,14 +606,14 @@ func `[]`*(
return offs + (q[1] - qid).int
if q[0] <= qid:
let wrap = fifo.ctx.q[chn].wrap
let wrap = journal.ctx.q[chn].wrap
if qid <= wrap:
return offs + (q[1] - QueueID(0)).int + (wrap - qid).int
-1
proc le*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
fid: FilterID; # Upper (or right) bound
fn: QuFilMap; # QueueID/FilterID mapping
forceEQ = false; # Check for strict equality
@ -627,7 +627,7 @@ proc le*(
##
var
left = 0
right = fifo.len - 1
right = journal.len - 1
template toFid(qid: QueueID): FilterID =
fn(qid).valueOr:
@ -639,30 +639,30 @@ proc le*(
if 0 <= right:
# Check left fringe
let
maxQid = fifo[left]
maxQid = journal[left]
maxFid = maxQid.toFid
if maxFid <= fid:
if forceEQ and maxFid != fid:
return QueueID(0)
return maxQid
# So `fid < fifo[left]`
# So `fid < journal[left]`
# Check right fringe
let
minQid = fifo[right]
minQid = journal[right]
minFid = minQid.toFid
if fid <= minFid:
if minFid == fid:
return minQid
return QueueID(0)
# So `fifo[right] < fid`
# So `journal[right] < fid`
# Bisection
var rightQid = minQid # Might be used as end result
while 1 < right - left:
let
pivot = (left + right) div 2
pivQid = fifo[pivot]
pivQid = journal[pivot]
pivFid = pivQid.toFid
#
# Example:
@ -671,19 +671,20 @@ proc le*(
# inx: left ... pivot ... right
# fid: 77
#
# with `fifo[left].toFid > fid > fifo[right].toFid`
# with `journal[left].toFid > fid > journal[right].toFid`
#
if pivFid < fid: # fid >= fifo[half].toFid:
if pivFid < fid: # fid >= journal[half].toFid:
right = pivot
rightQid = pivQid
elif fid < pivFid: # fifo[half].toFid > fid
elif fid < pivFid: # journal[half].toFid > fid
left = pivot
else:
return pivQid
# Now: `fifo[right].toFid < fid < fifo[left].toFid` (and `right == left+1`).
# Now: `journal[right].toFid < fid < journal[left].toFid`
# (and `right == left+1`).
if not forceEQ:
# Make sure that `fifo[right].toFid` exists
# Make sure that `journal[right].toFid` exists
if fn(rightQid).isOk:
return rightQid
@ -691,12 +692,12 @@ proc le*(
proc eq*(
fifo: QidSchedRef; # Cascaded fifos descriptor
journal: QidSchedRef; # Cascaded fifos descriptor
fid: FilterID; # Filter ID to search for
fn: QuFilMap; # QueueID/FilterID mapping
): QueueID =
## Variant of `le()` for strict equality.
fifo.le(fid, fn, forceEQ = true)
journal.le(fid, fn, forceEQ = true)
# ------------------------------------------------------------------------------
# End

View File

@ -26,7 +26,6 @@
import
std/[algorithm, sequtils, strutils, sets, tables, typetraits],
chronicles,
eth/[common, trie/nibbles],
results,
stew/keyed_queue,
@ -34,9 +33,6 @@ import
"."/[aristo_desc, aristo_get, aristo_hike, aristo_layers,
aristo_path, aristo_serialise, aristo_utils, aristo_vid]
logScope:
topics = "aristo-merge"
type
LeafTiePayload* = object
## Generalised key-value pair for a sub-trie. The main trie is the
@ -186,17 +182,15 @@ proc insertBranch(
return err(MergeNonBranchProofModeLock)
if linkVtx.vType == Leaf:
# Update vertex path lookup
let
path = hike.legsTo(NibblesSeq) & linkVtx.lPfx
rc = path.pathToTag()
if rc.isErr:
debug "Branch link leaf path garbled", linkID, path
# Double check path prefix
if 64 < hike.legsTo(NibblesSeq).len + linkVtx.lPfx.len:
return err(MergeBranchLinkLeafGarbled)
let local = db.vidFetch(pristine = true)
db.setVtxAndKey(hike.root, local, linkVtx)
linkVtx.lPfx = linkVtx.lPfx.slice(1+n)
let
local = db.vidFetch(pristine = true)
linkDup = linkVtx.dup
db.setVtxAndKey(hike.root, local, linkDup)
linkDup.lPfx = linkDup.lPfx.slice(1+n)
forkVtx.bVid[linkInx] = local
elif linkVtx.ePfx.len == n + 1:
@ -204,9 +198,11 @@ proc insertBranch(
forkVtx.bVid[linkInx] = linkVtx.eVid
else:
let local = db.vidFetch
db.setVtxAndKey(hike.root, local, linkVtx)
linkVtx.ePfx = linkVtx.ePfx.slice(1+n)
let
local = db.vidFetch
linkDup = linkVtx.dup
db.setVtxAndKey(hike.root, local, linkDup)
linkDup.ePfx = linkDup.ePfx.slice(1+n)
forkVtx.bVid[linkInx] = local
block:
@ -285,13 +281,14 @@ proc concatBranchAndLeaf(
# Append leaf vertex
let
brDup = brVtx.dup
vid = db.vidFetch(pristine = true)
vtx = VertexRef(
vType: Leaf,
lPfx: hike.tail.slice(1),
lData: payload)
brVtx.bVid[nibble] = vid
db.setVtxAndKey(hike.root, brVid, brVtx)
brDup.bVid[nibble] = vid
db.setVtxAndKey(hike.root, brVid, brDup)
db.setVtxAndKey(hike.root, vid, vtx)
okHike.legs.add Leg(wp: VidVtxPair(vtx: vtx, vid: vid), nibble: -1)
@ -330,8 +327,12 @@ proc topIsBranchAddLeaf(
#
if db.pPrf.len == 0:
# Not much else that can be done here
debug "Dangling leaf link, reused", branch=hike.legs[^1].wp.vid,
nibble, linkID, leafPfx=hike.tail
raiseAssert "Dangling edge:" &
" pfx=" & $hike.legsTo(hike.legs.len-1,NibblesSeq) &
" branch=" & $parent &
" nibble=" & $nibble &
" edge=" & $linkID &
" tail=" & $hike.tail
# Reuse placeholder entry in table
let vtx = VertexRef(
@ -413,15 +414,16 @@ proc topIsExtAddLeaf(
return err(MergeBranchProofModeLock)
let
brDup = brVtx.dup
vid = db.vidFetch(pristine = true)
vtx = VertexRef(
vType: Leaf,
lPfx: hike.tail.slice(1),
lData: payload)
brVtx.bVid[nibble] = vid
db.setVtxAndKey(hike.root, brVid, brVtx)
brDup.bVid[nibble] = vid
db.setVtxAndKey(hike.root, brVid, brDup)
db.setVtxAndKey(hike.root, vid, vtx)
okHike.legs.add Leg(wp: VidVtxPair(vtx: brVtx, vid: brVid), nibble: nibble)
okHike.legs.add Leg(wp: VidVtxPair(vtx: brDup, vid: brVid), nibble: nibble)
okHike.legs.add Leg(wp: VidVtxPair(vtx: vtx, vid: vid), nibble: -1)
ok okHike
@ -447,17 +449,18 @@ proc topIsEmptyAddLeaf(
return err(MergeBranchProofModeLock)
let
rootDup = rootVtx.dup
leafVid = db.vidFetch(pristine = true)
leafVtx = VertexRef(
vType: Leaf,
lPfx: hike.tail.slice(1),
lData: payload)
rootVtx.bVid[nibble] = leafVid
db.setVtxAndKey(hike.root, hike.root, rootVtx)
rootDup.bVid[nibble] = leafVid
db.setVtxAndKey(hike.root, hike.root, rootDup)
db.setVtxAndKey(hike.root, leafVid, leafVtx)
return ok Hike(
root: hike.root,
legs: @[Leg(wp: VidVtxPair(vtx: rootVtx, vid: hike.root), nibble: nibble),
legs: @[Leg(wp: VidVtxPair(vtx: rootDup, vid: hike.root), nibble: nibble),
Leg(wp: VidVtxPair(vtx: leafVtx, vid: leafVid), nibble: -1)])
db.insertBranch(hike, hike.root, rootVtx, payload)

View File

@ -49,7 +49,7 @@ func pathAsBlob*(tag: PathID): Blob =
if 64 <= tag.length:
return key
else:
return key[0 .. (tag.length + 1) div 2]
return key[0 .. (tag.length - 1) div 2]
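# Illustration: `tag.length` counts nibbles, two nibbles per byte. For
# `tag.length == 3` the payload occupies two bytes, i.e. `key[0 .. 1]`.
# The old bound `(3 + 1) div 2 == 2` sliced `key[0 .. 2]`, one byte past
# the payload; the corrected bound `(3 - 1) div 2 == 1` yields exactly
# `key[0 .. 1]`.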
func pathAsHEP*(tag: PathID; isLeaf = false): Blob =
## Convert the `tag` argument to a hex encoded partial path as used in `eth`

View File

@ -14,126 +14,10 @@
{.push raises: [].}
import
std/[options, tables],
std/options,
results,
"."/[aristo_desc, aristo_filter, aristo_get, aristo_layers, aristo_hashify]
func isTop*(tx: AristoTxRef): bool {.gcsafe.}
func level*(db: AristoDbRef): int {.gcsafe.}
proc txBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] {.gcsafe.}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] =
if not tx.isTop():
return err(TxNotTopTx)
let db = tx.db
if tx.level != db.stack.len:
return err(TxStackGarbled)
ok db
proc getTxUid(db: AristoDbRef): uint =
if db.txUidGen == high(uint):
db.txUidGen = 0
db.txUidGen.inc
db.txUidGen
iterator txWalk(tx: AristoTxRef): (AristoTxRef,LayerRef,AristoError) =
## Walk down the transaction chain.
let db = tx.db
var tx = tx
block body:
# Start at top layer if tx refers to that
if tx.level == db.stack.len:
if tx.txUid != db.top.txUid:
yield (tx,db.top,TxStackGarbled)
break body
# Yield the top level
yield (tx,db.top,AristoError(0))
# Walk down the transaction stack
for level in (tx.level-1).countDown(1):
tx = tx.parent
if tx.isNil or tx.level != level:
yield (tx,LayerRef(nil),TxStackGarbled)
break body
var layer = db.stack[level]
if tx.txUid != layer.txUid:
yield (tx,layer,TxStackGarbled)
break body
yield (tx,layer,AristoError(0))
# ---------
proc stowImpl(
db: AristoDbRef; # Database
nxtFid: Option[FilterID]; # Next filter ID (zero is OK)
persistent: bool; # Stage only unless `true`
chunkedMpt: bool; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## Worker for `stow()` variants.
##
if not db.txRef.isNil:
return err(TxPendingTx)
if 0 < db.stack.len:
return err(TxStackGarbled)
if persistent and not db.canResolveBackendFilter():
return err(TxBackendNotWritable)
# Update Merkle hashes (unless disabled)
db.hashify().isOkOr:
return err(error[1])
let fwd = db.fwdFilter(db.top, chunkedMpt).valueOr:
return err(error[1])
if fwd.isValid:
# Merge `top` layer into `roFilter`
db.merge(fwd).isOkOr:
return err(error[1])
# Special treatment for `snap` proofs (aka `chunkedMpt`)
let final =
if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp)
else: LayerFinalRef()
# New empty top layer (probably with `snap` proofs and `vGen` carry over)
db.top = LayerRef(
delta: LayerDeltaRef(),
final: final)
if db.roFilter.isValid:
db.top.final.vGen = db.roFilter.vGen
else:
let rc = db.getIdgUbe()
if rc.isOk:
db.top.final.vGen = rc.value
else:
# It is OK if there was no `Idg`. Otherwise something serious happened
# and there is no way to recover easily.
doAssert rc.error == GetIdgNotFound
if persistent:
# Merge `roFiler` into persistent tables
? db.resolveBackendFilter nxtFid
db.roFilter = FilterRef(nil)
# Special treatment for `snap` proofs (aka `chunkedMpt`)
let final =
if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp)
else: LayerFinalRef(vGen: db.vGen)
# New empty top layer (probably with `snap` proofs carry over)
db.top = LayerRef(
delta: LayerDeltaRef(),
final: final,
txUid: db.top.txUid)
ok()
./aristo_tx/[tx_fork, tx_frame, tx_stow],
"."/[aristo_desc, aristo_get]
# ------------------------------------------------------------------------------
# Public functions, getters
@ -141,24 +25,20 @@ proc stowImpl(
func txTop*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
## Getter, returns top level transaction if there is any.
if db.txRef.isNil:
err(TxNoPendingTx)
else:
ok(db.txRef)
db.txFrameTop()
func isTop*(tx: AristoTxRef): bool =
## Getter, returns `true` if the argument `tx` refers to the current top
## level transaction.
tx.db.txRef == tx and tx.db.top.txUid == tx.txUid
tx.txFrameIsTop()
func level*(tx: AristoTxRef): int =
## Getter, positive nesting level of transaction argument `tx`
tx.level
tx.txFrameLevel()
func level*(db: AristoDbRef): int =
## Getter, non-negative nesting level (i.e. number of pending transactions)
if not db.txRef.isNil:
result = db.txRef.level
db.txFrameLevel()
# ------------------------------------------------------------------------------
# Public functions
@ -170,146 +50,88 @@ func to*(tx: AristoTxRef; T: type[AristoDbRef]): T =
proc forkTx*(
tx: AristoTxRef; # Transaction descriptor
db: AristoDbRef;
backLevel: int; # Backward location of transaction
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Clone a transaction into a new DB descriptor accessing the same backend
## database (if any) as the argument `db`. The new descriptor is linked to
## the transaction parent and is fully functional as a forked instance (see
## comments on `aristo_desc.reCentre()` for details.)
## Fork a new descriptor obtained from parts of the argument database
## as described by arguments `db` and `backLevel`.
##
## Input situation:
## ::
## tx -> db0 with tx is top transaction, tx.level > 0
## If the argument `backLevel` is non-negative, the forked descriptor will
## provide the database view where the first `backLevel` transaction layers
## are stripped and the remaining layers are squashed into a single transaction.
##
## Output situation:
## ::
## tx -> db0 \
## > share the same backend
## tx1 -> db1 /
## If `backLevel` is `-1`, a database descriptor with empty transaction
## layers will be provided where the `roFilter` between database and
## transaction layers is kept in place.
##
## where `tx.level > 0`, `db1.level == 1` and `db1` is returned. The
## transaction `tx1` can be retrieved via `db1.txTop()`.
## If `backLevel` is `-2`, a database descriptor with empty transaction
## layers will be provided without an `roFilter`.
##
## The new DB descriptor will contain a copy of the argument transaction
## `tx` as top layer of level 1 (i.e. this is the only transaction.) Rolling
## back will end up at the backend layer (incl. backend filter.)
## The returned database descriptor will always have transaction level one.
## If there were no transactions that could be squashed, an empty
## transaction is added.
##
## If the arguent flag `dontHashify` is passed `true`, the clone descriptor
## If the argument flag `dontHashify` is passed `true`, the forked descriptor
## will *NOT* be hashified right after construction.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
let db = tx.db
# Fork top layer (with or without pending transaction)?
if backLevel == 0:
return db.txForkTop dontHashify
# Verify `tx` argument
if db.txRef == tx:
if db.top.txUid != tx.txUid:
return err(TxArgStaleTx)
elif db.stack.len <= tx.level:
return err(TxArgStaleTx)
elif db.stack[tx.level].txUid != tx.txUid:
return err(TxArgStaleTx)
# Fork bottom layer (=> 0 < db.stack.len)
if backLevel == db.stack.len:
return db.txForkBase dontHashify
# Provide new empty stack layer
let stackLayer = block:
let rc = db.getIdgBE()
if rc.isOk:
LayerRef(
delta: LayerDeltaRef(),
final: LayerFinalRef(vGen: rc.value))
elif rc.error == GetIdgNotFound:
LayerRef.init()
else:
return err(rc.error)
# Inspect transaction stack
if 0 < backLevel:
var tx = db.txRef
if tx.isNil or db.stack.len < backLevel:
return err(TxLevelTooDeep)
# Set up clone associated to `db`
let txClone = ? db.fork(noToplayer = true, noFilter = false)
txClone.top = db.layersCc tx.level # Provide tx level 1 stack
txClone.stack = @[stackLayer] # Zero level stack
txClone.top.txUid = 1
txClone.txUidGen = 1
# Fetch tx of level `backLevel` (seed to skip some items)
for _ in 0 ..< backLevel:
tx = tx.parent
if tx.isNil:
return err(TxStackGarbled)
return tx.txFork dontHashify
# Install transaction similar to `tx` on clone
txClone.txRef = AristoTxRef(
db: txClone,
txUid: 1,
level: 1)
# Plain fork, include `roFilter`
if backLevel == -1:
let xb = ? db.fork(noFilter=false)
discard xb.txFrameBegin()
return ok(xb)
if not dontHashify:
txClone.hashify().isOkOr:
discard txClone.forget()
return err(error[1])
# Plain fork, unfiltered backend
if backLevel == -2:
let xb = ? db.fork(noFilter=true)
discard xb.txFrameBegin()
return ok(xb)
ok(txClone)
err(TxLevelUseless)
proc forkTop*(
db: AristoDbRef;
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Variant of `forkTx()` for the top transaction if there is any. Otherwise
## the top layer is cloned, and an empty transaction is set up. After
## successful fork the returned descriptor has transaction level 1.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
if db.txRef.isNil:
let dbClone = ? db.fork(noToplayer=true, noFilter=false)
dbClone.top = db.layersCc # Is a deep copy
if not dontHashify:
dbClone.hashify().isOkOr:
discard dbClone.forget()
return err(error[1])
discard dbClone.txBegin
return ok(dbClone)
# End if()
db.txRef.forkTx dontHashify
proc forkBase*(
db: AristoDbRef;
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Variant of `forkTx()`, sort of the opposite of `forkTop()`. This is the
## equivalent of top layer forking after all transactions have been rolled
## back.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
if not db.txRef.isNil:
let dbClone = ? db.fork(noToplayer=true, noFilter=false)
dbClone.top = db.layersCc 0
if not dontHashify:
dbClone.hashify().isOkOr:
discard dbClone.forget()
return err(error[1])
discard dbClone.txBegin
return ok(dbClone)
# End if()
db.forkTop dontHashify
proc forkWith*(
proc findTx*(
db: AristoDbRef;
vid: VertexID; # Pivot vertex (typically `VertexID(1)`)
key: HashKey; # Hash key of pivot verte
dontHashify = true; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
key: HashKey; # Hash key of pivot vertex
): Result[int,AristoError] =
## Find the transaction where the vertex with ID `vid` exists and has the
## Merkle hash key `key`. If there is no transaction available, search in
## the filter and then in the backend.
##
## If the above procedure succeeds, a new descriptor is forked with exactly
## one transaction which contains the all the bottom layers up until the
## layer where the `(vid,key)` pair is found. In case the pair was found on
## the filter or the backend, this transaction is empty.
## If the above procedure succeeds, an integer indicating the transaction
## level is returned:
##
## * `0` -- top level, current layer
## * `1`, `2`, ... -- some transaction level further down the stack
## * `-1` -- the filter between transaction stack and database backend
## * `-2` -- the database backend
##
## A successful return code might be used for the `forkTx()` call for
## creating a forked descriptor that provides the pair `(vid,key)`.
##
if not vid.isValid or
not key.isValid:
@ -319,38 +141,33 @@ proc forkWith*(
# Try `(vid,key)` on top layer
let topKey = db.top.delta.kMap.getOrVoid vid
if topKey == key:
return db.forkTop dontHashify
return ok(0)
else:
# Find `(vid,key)` on transaction layers
for (tx,layer,error) in db.txRef.txWalk:
var n = 0
for (n,tx,layer,error) in db.txRef.txFrameWalk:
if error != AristoError(0):
return err(error)
if layer.delta.kMap.getOrVoid(vid) == key:
return tx.forkTx dontHashify
return ok(n)
# Try bottom layer
let botKey = db.stack[0].delta.kMap.getOrVoid vid
if botKey == key:
return db.forkBase dontHashify
return ok(db.stack.len)
# Try `(vid,key)` on filter
# Try `(vid,key)` on roFilter
if not db.roFilter.isNil:
let roKey = db.roFilter.kMap.getOrVoid vid
if roKey == key:
let rc = db.fork(noFilter = false)
if rc.isOk:
discard rc.value.txBegin
return rc
return ok(-1)
# Try `(vid,key)` on unfiltered backend
block:
let beKey = db.getKeyUbe(vid).valueOr: VOID_HASH_KEY
if beKey == key:
let rc = db.fork(noFilter = true)
if rc.isOk:
discard rc.value.txBegin
return rc
return ok(-2)
err(TxNotFound)
@ -369,23 +186,7 @@ proc txBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
## ... continue using db ...
## tx.commit()
##
if db.level != db.stack.len:
return err(TxStackGarbled)
db.stack.add db.top
db.top = LayerRef(
delta: LayerDeltaRef(),
final: db.top.final.dup,
txUid: db.getTxUid)
db.txRef = AristoTxRef(
db: db,
txUid: db.top.txUid,
parent: db.txRef,
level: db.stack.len)
ok db.txRef
db.txFrameBegin()
proc rollback*(
tx: AristoTxRef; # Top transaction on database
@ -394,15 +195,7 @@ proc rollback*(
## performed for this transaction. The previous transaction is returned if
## there was any.
##
let db = ? tx.getDbDescFromTopTx()
# Roll back to previous layer.
db.top = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.txRef = db.txRef.parent
ok()
tx.txFrameRollback()
proc commit*(
tx: AristoTxRef; # Top transaction on database
@ -411,32 +204,7 @@ proc commit*(
## performed through this handle and merges it to the previous layer. The
## previous transaction is returned if there was any.
##
let db = ? tx.getDbDescFromTopTx()
db.hashify().isOkOr:
return err(error[1])
# Pop layer from stack and merge database top layer onto it
let merged = block:
if db.top.delta.sTab.len == 0 and
db.top.delta.kMap.len == 0:
# Avoid `layersMergeOnto()`
db.top.delta = db.stack[^1].delta
db.stack.setLen(db.stack.len-1)
db.top
else:
let layer = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.top.layersMergeOnto layer[]
layer
# Install `merged` stack top layer and update stack
db.top = merged
db.txRef = tx.parent
if 0 < db.stack.len:
db.txRef.txUid = db.getTxUid
db.top.txUid = db.txRef.txUid
ok()
tx.txFrameCommit()
proc collapse*(
tx: AristoTxRef; # Top transaction on database
@ -447,20 +215,10 @@ proc collapse*(
## ::
## while true:
## discard tx.commit() # ditto for rollback()
## if db.topTx.isErr: break
## tx = db.topTx.value
## if db.txTop.isErr: break
## tx = db.txTop.value
##
let db = ? tx.getDbDescFromTopTx()
if commit:
# For commit, hashify the current layer if requested and install it
db.hashify().isOkOr:
return err(error[1])
db.top.txUid = 0
db.stack.setLen(0)
db.txRef = AristoTxRef(nil)
ok()
tx.txFrameCollapse commit
# ------------------------------------------------------------------------------
# Public functions: save to database
@ -491,7 +249,7 @@ proc persist*(
## In this case, the `chunkedMpt` argument must be set `true` (see also
## `fwdFilter()`.)
##
db.stowImpl(nxtFid, persistent=true, chunkedMpt=chunkedMpt)
db.txStow(nxtFid, persistent=true, chunkedMpt=chunkedMpt)
proc stow*(
db: AristoDbRef; # Database
@ -510,7 +268,7 @@ proc stow*(
## In this case, the `chunkedMpt` argument must be set `true` (see also
## `fwdFilter()`.)
##
db.stowImpl(nxtFid=none(FilterID), persistent=false, chunkedMpt=chunkedMpt)
db.txStow(nxtFid=none(FilterID), persistent=false, chunkedMpt=chunkedMpt)
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,152 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Aristo DB -- Transaction fork helpers
## =====================================
##
{.push raises: [].}
import
results,
./tx_frame,
".."/[aristo_desc, aristo_get, aristo_layers, aristo_hashify]
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc txFork*(
tx: AristoTxRef; # Transaction descriptor
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Clone a transaction into a new DB descriptor accessing the same backend
## database (if any) as the argument `db`. The new descriptor is linked to
## the transaction parent and is fully functional as a forked instance (see
## comments on `aristo_desc.reCentre()` for details.)
##
## Input situation:
## ::
## tx -> db0 with tx is top transaction, tx.level > 0
##
## Output situation:
## ::
## tx -> db0 \
## > share the same backend
## tx1 -> db1 /
##
## where `tx.level > 0`, `db1.level == 1` and `db1` is returned. The
## transaction `tx1` can be retrieved via `db1.txTop()`.
##
## The new DB descriptor will contain a copy of the argument transaction
## `tx` as top layer of level 1 (i.e. this is the only transaction.) Rolling
## back will end up at the backend layer (incl. backend filter.)
##
## If the argument flag `dontHashify` is passed `true`, the clone descriptor
## will *NOT* be hashified right after construction.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
let db = tx.db
# Verify `tx` argument
if db.txRef == tx:
if db.top.txUid != tx.txUid:
return err(TxArgStaleTx)
elif db.stack.len <= tx.level:
return err(TxArgStaleTx)
elif db.stack[tx.level].txUid != tx.txUid:
return err(TxArgStaleTx)
# Provide new empty stack layer
let stackLayer = block:
let rc = db.getIdgBE()
if rc.isOk:
LayerRef(
delta: LayerDeltaRef(),
final: LayerFinalRef(vGen: rc.value))
elif rc.error == GetIdgNotFound:
LayerRef.init()
else:
return err(rc.error)
# Set up clone associated to `db`
let txClone = ? db.fork(noToplayer = true, noFilter = false)
txClone.top = db.layersCc tx.level # Provide tx level 1 stack
txClone.stack = @[stackLayer] # Zero level stack
txClone.top.txUid = 1
txClone.txUidGen = 1
# Install transaction similar to `tx` on clone
txClone.txRef = AristoTxRef(
db: txClone,
txUid: 1,
level: 1)
if not dontHashify:
txClone.hashify().isOkOr:
discard txClone.forget()
return err(error[1])
ok(txClone)
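# Hedged usage sketch (not part of the commit): `cloneAndInspect` is a
# hypothetical helper showing the intended life cycle of a forked
# descriptor -- fork, use, then dispose with `forget()`.
proc cloneAndInspect(tx: AristoTxRef): Result[void,AristoError] =
  let txClone = ? tx.txFork()
  # A non-centre clone must always be destructed after use
  defer: discard txClone.forget()
  # The clone carries exactly one transaction, regardless of `tx.level`
  doAssert txClone.txFrameLevel == 1
  ok()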
proc txForkTop*(
db: AristoDbRef;
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Variant of `forkTx()` for the top transaction if there is any. Otherwise
## the top layer is cloned, and an empty transaction is set up. After
## successful fork the returned descriptor has transaction level 1.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
if db.txRef.isNil:
let txClone = ? db.fork(noToplayer=true, noFilter=false)
txClone.top = db.layersCc # Is a deep copy
if not dontHashify:
txClone.hashify().isOkOr:
discard txClone.forget()
return err(error[1])
discard txClone.txFrameBegin()
return ok(txClone)
# End if()
db.txRef.txFork dontHashify
proc txForkBase*(
db: AristoDbRef;
dontHashify = false; # Process/fix MPT hashes
): Result[AristoDbRef,AristoError] =
## Variant of `forkTx()`, sort of the opposite of `txForkTop()`. This is
## the equivalent of top layer forking after all transactions have been
## rolled back.
##
## Use `aristo_desc.forget()` to clean up this descriptor.
##
if db.txRef.isNil:
return db.txForkTop dontHashify
let txClone = ? db.fork(noToplayer=true, noFilter=false)
txClone.top = db.layersCc 0
if not dontHashify:
txClone.hashify().isOkOr:
discard txClone.forget()
return err(error[1])
discard txClone.txFrameBegin()
ok(txClone)
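# Hedged sketch (assumption, not part of the commit): `snapshotEnds` is a
# hypothetical helper contrasting the two variants -- `txForkTop()` clones
# the state including all pending transaction effects, `txForkBase()` the
# state as it was before any transaction started.
proc snapshotEnds(db: AristoDbRef): Result[void,AristoError] =
  let topView = ? db.txForkTop()    # top layer, one synthetic level-1 tx
  defer: discard topView.forget()
  let baseView = ? db.txForkBase()  # bottom layer, all txs rolled back
  defer: discard baseView.forget()
  ok()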
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,209 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Aristo DB -- Transaction frames helper
## ======================================
##
{.push raises: [].}
import
std/tables,
results,
".."/[aristo_desc, aristo_layers, aristo_hashify]
func txFrameIsTop*(tx: AristoTxRef): bool
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] =
if not tx.txFrameIsTop():
return err(TxNotTopTx)
let db = tx.db
if tx.level != db.stack.len:
return err(TxStackGarbled)
ok db
proc getTxUid(db: AristoDbRef): uint =
if db.txUidGen == high(uint):
db.txUidGen = 0
db.txUidGen.inc
db.txUidGen
# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------
func txFrameTop*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
## Getter, returns top level transaction if there is any.
if db.txRef.isNil:
err(TxNoPendingTx)
else:
ok(db.txRef)
func txFrameIsTop*(tx: AristoTxRef): bool =
## Getter, returns `true` if the argument `tx` refers to the current top
## level transaction.
tx.db.txRef == tx and tx.db.top.txUid == tx.txUid
func txFrameLevel*(tx: AristoTxRef): int =
## Getter, positive nesting level of transaction argument `tx`
tx.level
func txFrameLevel*(db: AristoDbRef): int =
## Getter, non-negative nesting level (i.e. number of pending transactions)
if not db.txRef.isNil:
result = db.txRef.level
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc txFrameBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
## Starts a new transaction.
##
## Example:
## ::
## proc doSomething(db: AristoDbRef) =
## let tx = db.txFrameBegin().value
## defer: discard tx.txFrameRollback()
## ... continue using db ...
## discard tx.txFrameCommit()
##
if db.txFrameLevel != db.stack.len:
return err(TxStackGarbled)
db.stack.add db.top
db.top = LayerRef(
delta: LayerDeltaRef(),
final: db.top.final.dup,
txUid: db.getTxUid)
db.txRef = AristoTxRef(
db: db,
txUid: db.top.txUid,
parent: db.txRef,
level: db.stack.len)
ok db.txRef
proc txFrameRollback*(
tx: AristoTxRef; # Top transaction on database
): Result[void,AristoError] =
## Given a *top level* handle, this function discards all database operations
## performed for this transaction. The previous transaction (if any) becomes
## the new top transaction.
##
let db = ? tx.getDbDescFromTopTx()
# Roll back to previous layer.
db.top = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.txRef = db.txRef.parent
ok()
proc txFrameCommit*(
tx: AristoTxRef; # Top transaction on database
): Result[void,AristoError] =
## Given a *top level* handle, this function accepts all database operations
## performed through this handle and merges them into the previous layer.
## The previous transaction (if any) becomes the new top transaction.
##
let db = ? tx.getDbDescFromTopTx()
db.hashify().isOkOr:
return err(error[1])
# Pop layer from stack and merge database top layer onto it
let merged = block:
if db.top.delta.sTab.len == 0 and
db.top.delta.kMap.len == 0:
# Avoid `layersMergeOnto()`
db.top.delta = db.stack[^1].delta
db.stack.setLen(db.stack.len-1)
db.top
else:
let layer = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.top.layersMergeOnto layer[]
layer
# Install `merged` stack top layer and update stack
db.top = merged
db.txRef = tx.parent
if 0 < db.stack.len:
db.txRef.txUid = db.getTxUid
db.top.txUid = db.txRef.txUid
ok()
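# Hedged usage sketch (assumption): the canonical frame life cycle using the
# `txFrame*` names defined in this module, mirroring the example given with
# `txFrameBegin()` above.
proc withFrame(db: AristoDbRef): Result[void,AristoError] =
  let tx = ? db.txFrameBegin()
  # ... apply changes to `db` here ...
  tx.txFrameCommit()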
proc txFrameCollapse*(
tx: AristoTxRef; # Top transaction on database
commit: bool; # Commit if `true`, otherwise roll back
): Result[void,AristoError] =
## Iterated application of `commit()` or `rollback()`, performing something
## similar to
## ::
## while true:
## discard tx.commit() # ditto for rollback()
## if db.txTop.isErr: break
## tx = db.txTop.value
##
let db = ? tx.getDbDescFromTopTx()
if commit:
# For commit, hashify the current layer if requested and install it
db.hashify().isOkOr:
return err(error[1])
db.top.txUid = 0
db.stack.setLen(0)
db.txRef = AristoTxRef(nil)
ok()
# ------------------------------------------------------------------------------
# Public iterators
# ------------------------------------------------------------------------------
iterator txFrameWalk*(tx: AristoTxRef): (int,AristoTxRef,LayerRef,AristoError) =
## Walk down the transaction stack chain.
let db = tx.db
var tx = tx
block body:
# Start at top layer if tx refers to that
if tx.level == db.stack.len:
if tx.txUid != db.top.txUid:
yield (-1,tx,db.top,TxStackGarbled)
break body
# Yield the top level
yield (0,tx,db.top,AristoError(0))
# Walk down the transaction stack
for level in (tx.level-1).countDown(1):
tx = tx.parent
if tx.isNil or tx.level != level:
yield (-1,tx,LayerRef(nil),TxStackGarbled)
break body
var layer = db.stack[level]
if tx.txUid != layer.txUid:
yield (-1,tx,layer,TxStackGarbled)
break body
yield (db.stack.len-level,tx,layer,AristoError(0))
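# Hedged usage sketch (assumption): `stackSane` is a hypothetical checker
# relying on the error slot of the walker tuples defined above.
proc stackSane(tx: AristoTxRef): bool =
  ## Return `false` if the walker flags a garbled transaction stack.
  for (_, _, _, error) in tx.txFrameWalk:
    if error != AristoError(0):
      return false
  true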
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,91 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Aristo DB -- Transaction stow/save helper
## =========================================
##
{.push raises: [].}
import
std/options,
results,
".."/[aristo_desc, aristo_get, aristo_journal, aristo_layers, aristo_hashify]
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc txStow*(
db: AristoDbRef; # Database
nxtFid: Option[FilterID]; # Next filter ID (zero is OK)
persistent: bool; # Stage only unless `true`
chunkedMpt: bool; # Partial data (e.g. from `snap`)
): Result[void,AristoError] =
## Worker for `stow()` and `persist()` variants.
##
if not db.txRef.isNil:
return err(TxPendingTx)
if 0 < db.stack.len:
return err(TxStackGarbled)
if persistent and not db.journalUpdateOk():
return err(TxBackendNotWritable)
# Update Merkle hashes (unless disabled)
db.hashify().isOkOr:
return err(error[1])
let fwd = db.journalFwdFilter(db.top, chunkedMpt).valueOr:
return err(error[1])
if fwd.isValid:
# Merge `top` layer into `roFilter`
db.journalMerge(fwd).isOkOr:
return err(error[1])
# Special treatment for `snap` proofs (aka `chunkedMpt`)
let final =
if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp)
else: LayerFinalRef()
# New empty top layer (probably with `snap` proofs and `vGen` carry over)
db.top = LayerRef(
delta: LayerDeltaRef(),
final: final)
if db.roFilter.isValid:
db.top.final.vGen = db.roFilter.vGen
else:
let rc = db.getIdgUbe()
if rc.isOk:
db.top.final.vGen = rc.value
else:
# It is OK if there was no `Idg`. Otherwise something serious happened
# and there is no way to recover easily.
doAssert rc.error == GetIdgNotFound
if persistent:
# Merge `roFilter` into persistent tables
? db.journalUpdate nxtFid
db.roFilter = FilterRef(nil)
# Special treatment for `snap` proofs (aka `chunkedMpt`)
let final =
if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp)
else: LayerFinalRef(vGen: db.vGen)
# New empty top layer (probably with `snap` proofs carry over)
db.top = LayerRef(
delta: LayerDeltaRef(),
final: final,
txUid: db.top.txUid)
ok()
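# Hedged sketch (assumption, not from the commit): `saveToBackend` is a
# hypothetical wrapper spelling out the precondition coded above -- all
# transactions must be resolved before the database state can be made
# persistent.
proc saveToBackend(db: AristoDbRef; fid: Option[FilterID]): Result[void,AristoError] =
  if not db.txRef.isNil:
    return err(TxPendingTx)  # commit or collapse pending transactions first
  db.txStow(fid, persistent=true, chunkedMpt=false)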
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -29,7 +29,7 @@ iterator aristoReplicate[T](
root = dsc.rootID
mpt = dsc.to(AristoDbRef)
api = dsc.to(AristoApiRef)
p = api.forkTop(mpt).valueOrApiError "aristoReplicate()"
p = api.forkTx(mpt,0).valueOrApiError "aristoReplicate()"
defer: discard api.forget(p)
for (vid,key,vtx,node) in T.replicate(p):
if key.len == 32:

View File

@ -314,9 +314,11 @@ proc accMethods(cAcc: AristoCoreDxAccRef): CoreDbAccFns =
colType: CtAccounts)
proc accCloneMpt(): CoreDbRc[CoreDxMptRef] =
ok(AristoCoreDxMptRef(
var xpt = AristoCoreDxMptRef(
base: base,
mptRoot: AccountsVID))
mptRoot: AccountsVID)
xpt.methods = xpt.mptMethods
ok(db.bless xpt)
proc accFetch(address: EthAddress): CoreDbRc[CoreDbAccount] =
const info = "acc/fetchFn()"
@ -570,7 +572,7 @@ func toVoidRc*[T](
proc getFromJournal*(base: AristoBaseRef; fid: Option[FilterID]): FilterRef =
let be = base.ctx.mpt.backend
if not be.isNil:
let fp = base.api.getFromJournal(be, fid, earlierOK=true).valueOr:
let fp = base.api.journalGetInx(be, fid, earlierOK=true).valueOr:
return FilterRef(nil)
return fp.fil
@ -730,9 +732,16 @@ proc init*(
vid = VertexID(colType)
key = colState.to(HashKey)
# Find `(vid,key)` on transaction stack
inx = block:
let rc = api.findTx(base.ctx.mpt, vid, key)
if rc.isErr:
return err(rc.error.toError(base, info))
rc.value
# Fork MPT descriptor that provides `(vid,key)`
newMpt = block:
let rc = api.forkWith(base.ctx.mpt, vid, key)
let rc = api.forkTx(base.ctx.mpt, inx)
if rc.isErr:
return err(rc.error.toError(base, info))
rc.value

View File

@ -279,7 +279,7 @@ func init*(T: type KvtBaseRef; db: CoreDbRef; kdb: KvtDbRef): T =
base: result,
kvt: kdb)
dsc.methods = dsc.kvtMethods()
result.cache = KvtCoreDxKvtRef(db.bless dsc)
result.cache = db.bless dsc
when CoreDbEnableApiProfiling:
let profApi = KvtApiProfRef.init(result.api, kdb.backend)

View File

@ -15,7 +15,7 @@ import
chronos,
eth/p2p,
../../db/aristo/aristo_desc,
../../db/aristo/aristo_filter/filter_scheduler,
../../db/aristo/aristo_journal/journal_scheduler,
".."/[protocol, sync_desc],
../handlers/eth,
../misc/[best_pivot, block_queue, sync_ctrl, ticker],

View File

@ -22,10 +22,10 @@ import
aristo_debug,
aristo_desc,
aristo_desc/desc_backend,
aristo_filter,
aristo_filter/filter_fifos,
aristo_filter/filter_scheduler,
aristo_get,
aristo_journal,
aristo_journal/journal_ops,
aristo_journal/journal_scheduler,
aristo_layers,
aristo_merge,
aristo_persistent,
@ -191,7 +191,7 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
check rc.error == 0
return
let dx = [db, db.forkTop.value, db.forkTop.value]
let dx = [db, db.forkTx(0).value, db.forkTx(0).value]
xCheck dx[0].nForked == 2
# Reduce unwanted tx layers
@ -417,7 +417,7 @@ proc storeFilter(
): bool =
## ..
let instr = block:
let rc = be.fifosStore(filter, none(FilterID))
let rc = be.journalOpsPushSlot(filter, none(FilterID))
xCheckRc rc.error == 0
rc.value
@ -450,11 +450,11 @@ proc fetchDelete(
## ...
let
vfyInst = block:
let rc = be.fifosDelete(backSteps = backSteps)
let rc = be.journalOpsDeleteSlots(backSteps = backSteps)
xCheckRc rc.error == 0
rc.value
instr = block:
let rc = be.fifosFetch(backSteps = backSteps)
let rc = be.journalOpsFetchSlots(backSteps = backSteps)
xCheckRc rc.error == 0
rc.value
qid = be.journal.le(instr.fil.fid, be.qid2fidFn)
@ -842,7 +842,7 @@ proc testFilterBacklog*(
# Realign to earlier state
xb = block:
let rc = db.forkByJournal(episode = episode)
let rc = db.journalFork(episode = episode)
xCheckRc rc.error == 0
rc.value
block:
@ -851,7 +851,7 @@ proc testFilterBacklog*(
# Store this state backend database (temporarily re-centre)
block:
let rc = xb.resolveBackendFilter(reCentreOk = true)
let rc = xb.journalUpdate(reCentreOk = true)
xCheckRc rc.error == 0
xCheck db.isCentre
block:
@ -863,7 +863,7 @@ proc testFilterBacklog*(
# Restore previous database state
block:
let rc = db.resolveBackendFilter()
let rc = db.journalUpdate()
xCheckRc rc.error == 0
block:
let rc = db.check(relax=false)

View File

@ -13,7 +13,7 @@ import
eth/common,
rocksdb,
../../nimbus/db/aristo/[
aristo_debug, aristo_desc, aristo_delete, aristo_filter/filter_scheduler,
aristo_debug, aristo_desc, aristo_delete, aristo_journal/journal_scheduler,
aristo_hashify, aristo_hike, aristo_merge],
../../nimbus/db/kvstore_rocksdb,
../../nimbus/sync/protocol/snap/snap_types,

View File

@ -21,7 +21,7 @@ import
../../nimbus/db/aristo/[
aristo_check, aristo_debug, aristo_desc, aristo_blobify, aristo_layers,
aristo_vid],
../../nimbus/db/aristo/aristo_filter/filter_scheduler,
../../nimbus/db/aristo/aristo_journal/journal_scheduler,
../replay/xcheck,
./test_helpers
@ -396,7 +396,7 @@ proc testQidScheduler*(
let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=false)
xCheck execOk
scd[] = w.fifo[]
scd[] = w.journal[]
let validateOk = list.validate(scd, serial=n, relax=false)
xCheck validateOk:
show(serial=n, exec=w.exec)
@ -436,7 +436,7 @@ proc testQidScheduler*(
list.del qid
xCheck delIDs.len == 0
scd[] = fetch.fifo[]
scd[] = fetch.journal[]
# -------------------
@ -445,7 +445,7 @@ proc testQidScheduler*(
let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=true)
xCheck execOk
scd[] = w.fifo[]
scd[] = w.journal[]
let validateOk = list.validate(scd, serial=n, relax=true)
xCheck validateOk:
show(serial=n, exec=w.exec)
@ -455,7 +455,7 @@ proc testQidScheduler*(
let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=false)
xCheck execOk
scd[] = w.fifo[]
scd[] = w.journal[]
let validateOk = list.validate(scd, serial=n, relax=false)
xCheck validateOk