nimbus-eth1/nimbus/db/aristo/aristo_journal.nim
Jordan Hrycaj 54f784bef1
Kvt remodel tx and forked descriptors (#2168)
* Aristo: Generalise alien/guest interface for piggyback on database

* Aristo: Code cosmetics

* CoreDb+Kvt: Update transaction API

why:
  Use single addressable function `forkTx(backLevel: int)` as used
  in `Aristo`. So `Kvt` can be synced simultaneously to `Aristo`.

also:
  Refactored `kvt_tx.nim` in a similar fashion to `Aristo`.

* Kvt: Replace `LayerDelta` object by reference

why:
  Will be needed when introducing filters

* Kvt: Remodel backend filter facility similar to `Aristo`

why:
  This allows operating on several KVT instances simultaneously.

* CoreDb+Kvt: Fix on-disk storage

why:
  Overlooked name change: `stow()` => `persist()` for permanent storage

* Fix copyright headers
2024-05-07 19:59:27 +00:00

229 lines
8.2 KiB
Nim

# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Aristo DB -- Filter and journal management
## ==========================================
##
import
std/[options, sequtils, sets, tables],
eth/common,
results,
"."/[aristo_desc, aristo_get, aristo_vid],
./aristo_desc/desc_backend,
./aristo_journal/[
filter_state_root, filter_merge, filter_reverse, filter_siblings,
journal_get, journal_ops]
# ------------------------------------------------------------------------------
# Public functions, construct filters
# ------------------------------------------------------------------------------
proc journalFwdFilter*(
    db: AristoDbRef;                   # Database
    layer: LayerRef;                   # Layer to derive filter from
    chunkedMpt = false;                # Relax for snap/proof scenario
      ): Result[FilterRef,(VertexID,AristoError)] =
  ## Build the forward filter (delta) for the argument `layer`, i.e. the set
  ## of changes to the backend that is equivalent to applying that layer.
  ##
  ## Usually `layer` reflects a change of the MPT. The exception are partial
  ## MPTs as sent over the network when synchronising (see `snap` protocol.)
  ## There, the state root might not change on `layer`, which would result in
  ## an error unless the argument `chunkedMpt` is set `true`.
  ##
  ## The delta is taken against the current backend including the optional
  ## read-only filter.
  ##
  ## Returns:
  ## * `ok(nil)` if the state root lookup reports `FilPrettyPointlessLayer`
  ## * `err((VertexID(1), error))` for any other state root lookup error
  ## * `ok(filter)` otherwise
  ##
  # Fetch the pair of Merkle hash keys this forward filter maps between,
  # i.e. `be => fg`
  let rc = db.getLayerStateRoots(layer.delta, chunkedMpt)
  if not rc.isOk:
    if rc.error == FilPrettyPointlessLayer:
      return ok FilterRef(nil)
    return err((VertexID(1), rc.error))

  let roots = rc.value
  ok FilterRef(
    src:  roots.be,
    sTab: layer.delta.sTab,
    kMap: layer.delta.kMap,
    vGen: layer.final.vGen.vidReorg,   # Compact recycled IDs
    trg:  roots.fg)
# ------------------------------------------------------------------------------
# Public functions, apply/install filters
# ------------------------------------------------------------------------------
proc journalMerge*(
    db: AristoDbRef;                   # Database
    filter: FilterRef;                 # Filter to apply to database
      ): Result[void,(VertexID,AristoError)] =
  ## Merge the argument `filter` into the read-only filter layer
  ## `db.roFilter`. Note that this function has no control of the filter
  ## source. Having merged the argument `filter`, all the `top` and `stack`
  ## layers should be cleared (by the caller, this is not done here.)
  ##
  ## Returns an error paired with `VertexID(1)` if the key lookup for the
  ## root vertex fails for a reason other than `GetKeyNotFound`, or whatever
  ## error `merge()` reports.
  ##
  # Root key as seen by `getKeyUbe()`; a missing key counts as an empty trie.
  let ubeRoot = block:
    let rc = db.getKeyUbe VertexID(1)
    if rc.isOk:
      rc.value.to(Hash256)
    elif rc.error == GetKeyNotFound:
      EMPTY_ROOT_HASH
    else:
      return err((VertexID(1),rc.error))

  db.roFilter = ? db.merge(filter, db.roFilter, ubeRoot)

  # A `nil` filter is a legitimate value throughout this module (see
  # `journalFwdFilter()` and `journalUpdate()`), so the merged result must
  # not be dereferenced unconditionally.
  if not db.roFilter.isNil and db.roFilter.src == db.roFilter.trg:
    # Under normal conditions, the root keys cannot be the same unless the
    # database is empty. This changes if there is a fixed root vertex as
    # used with the `snap` sync protocol boundary proof. In that case, there
    # can be no history chain and the filter is just another cache.
    if VertexID(1) notin db.top.final.pPrf:
      db.roFilter = FilterRef(nil)

  ok()
proc journalUpdateOk*(db: AristoDbRef): bool =
  ## Tell whether the read-only filter can be merged into the backend, i.e.
  ## whether there is a backend at all and `db` is the centre descriptor.
  if db.backend.isNil:
    return false
  db.isCentre
proc journalUpdate*(
    db: AristoDbRef;                   # Database
    nxtFid = none(FilterID);           # Next filter ID (if any)
    reCentreOk = false;
      ): Result[void,AristoError] =
  ## Resolve (i.e. move) the backend filter `db.roFilter` into the physical
  ## backend database.
  ##
  ## This needs write permission on the backend DB for the argument `db`
  ## descriptor (see the function `aristo_desc.isCentre()`.) If the argument
  ## flag `reCentreOk` is passed `true`, write permission is temporarily
  ## acquired when needed and handed back to the previous centre on return.
  ##
  ## When merging the current backend filter, its reverse is stored as back
  ## log on the filter fifos (so the current state can be retrieved.) Also,
  ## other non-centre descriptors are updated so there is no visible database
  ## change for these descriptors.
  ##
  ## Caveat: This function will delete entries from the cascaded fifos if the
  ##         current backend filter is the reverse compiled from the top item
  ##         chain from the cascaded fifos. (The original note refers to a
  ##         function `forkBackLog()` which is not visible here -- presumably
  ##         what `journalFork()` provides now.)
  ##
  ## Returns:
  ## * `err(FilBackendMissing)` if there is no backend
  ## * `ok()` right away if there is no read-only filter to store
  ## * `err(FilBackendRoMode)` if `db` is not the centre and `reCentreOk` is
  ##   `false`
  ## * any error from the journal helpers, the write batch finalisation, or
  ##   the siblings update
  ##
  let backend = db.backend
  if backend.isNil:
    return err(FilBackendMissing)

  # Blind or missing filter => nothing to store
  if db.roFilter.isNil:
    return ok()

  # The backend is in read-write mode for the centre descriptor only, so
  # `db` must be (or temporarily become) the centre.
  let centre = db.getCentre
  if db != centre:
    if not reCentreOk:
      return err(FilBackendRoMode)
    db.reCentre
  # Unconditionally hand back to the previous centre (matters if
  # `reCentreOk` was set)
  defer:
    centre.reCentre

  # Set up the peer filter balancer, rolled back on exit.
  let siblings = ? UpdateSiblingsRef.init db
  defer:
    siblings.rollback()

  # Work out how the reverse filter goes onto the cascaded slots queue.
  # Without a journal, the instruction stays at its default value and is
  # ignored below.
  var slotInstr: JournalOpsMod
  if not backend.journal.isNil:
    block pickInstr:
      if db.roFilter.isValid:
        let nOverlap = backend.journalGetOverlap db.roFilter
        if 0 < nOverlap:
          # Revert redundant entries rather than pushing a new one
          slotInstr = ? backend.journalOpsDeleteSlots nOverlap
          break pickInstr
      slotInstr = ? backend.journalOpsPushSlot(
        siblings.rev,                  # Store reverse filter
        nxtFid)                        # Set filter ID (if any)

  # Store structural single trie entries
  let batch = backend.putBegFn()
  backend.putVtxFn(batch, toSeq(db.roFilter.sTab.pairs))
  backend.putKeyFn(batch, toSeq(db.roFilter.kMap.pairs))
  backend.putIdgFn(batch, db.roFilter.vGen)

  # Store the slot instruction as history journal entry
  if not backend.journal.isNil:
    backend.putFilFn(batch, slotInstr.put)
    backend.putFqsFn(batch, slotInstr.scd.state)

  # Finalise write batch
  ? backend.putEndFn batch

  # Update dudes and this descriptor
  ? siblings.update().commit()

  # Finally update slot queue scheduler state (as saved)
  if not backend.journal.isNil:
    backend.journal.state = slotInstr.scd.state

  db.roFilter = FilterRef(nil)
  ok()
proc journalFork*(
    db: AristoDbRef;
    episode: int;
      ): Result[AristoDbRef,AristoError] =
  ## Construct a new descriptor on the `db` backend which enters it through a
  ## set of backend filters from the cascaded filter fifos. The filter used
  ## is addressed as `episode`, where the most recent backward filter has
  ## episode `0`, the next older has episode `1`, etc.
  ##
  ## Use the `forget()` directive (the original note locates it in
  ## `aristo_filter`) to clean up this descriptor.
  ##
  ## Returns:
  ## * `err(FilBackendMissing)` if there is no backend
  ## * `err(FilNegativeEpisode)` if `episode` is negative
  ## * any error from fetching the journal slots or from forking `db`
  ##
  let backend = db.backend
  if backend.isNil:
    return err(FilBackendMissing)
  if episode < 0:
    return err(FilNegativeEpisode)

  # Episode `0` needs one step back, hence `episode + 1`
  let fetched = ? backend.journalOpsFetchSlots(backSteps = episode + 1)
  let forked = ? db.fork(noToplayer = true)

  # Start from a fresh top layer and enter the backend via the fetched filter
  forked.top = LayerRef.init()
  forked.top.final.vGen = fetched.fil.vGen
  forked.roFilter = fetched.fil
  ok(forked)
proc journalFork*(
    db: AristoDbRef;
    fid: Option[FilterID];
    earlierOK = false;
      ): Result[AristoDbRef,AristoError] =
  ## Variant of `journalFork()` for forking to a particular filter ID (or the
  ## nearest predecessor if `earlierOK` is passed `true`), provided there is
  ## some filter ID `fid`.
  ##
  ## Otherwise, the oldest filter is forked to (regardless of the value of
  ## `earlierOK`.)
  ##
  ## Returns `err(FilBackendMissing)` if there is no backend, or any error
  ## from the index lookup or from the episode based `journalFork()`.
  ##
  let backend = db.backend
  if backend.isNil:
    return err(FilBackendMissing)

  # Translate the filter ID into a journal index and fork by that index
  let located = ? backend.journalGetInx(fid, earlierOK)
  journalFork(db, located.inx)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------