229 lines
8.2 KiB
Nim
229 lines
8.2 KiB
Nim
# nimbus-eth1
|
|
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed
|
|
# except according to those terms.
|
|
|
|
## Aristo DB -- Filter and journal management
|
|
## ==========================================
|
|
##
|
|
|
|
import
|
|
std/[options, sequtils, sets, tables],
|
|
eth/common,
|
|
results,
|
|
"."/[aristo_desc, aristo_get, aristo_vid],
|
|
./aristo_desc/desc_backend,
|
|
./aristo_journal/[
|
|
filter_state_root, filter_merge, filter_reverse, filter_siblings,
|
|
journal_get, journal_ops]
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, construct filters
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc journalFwdFilter*(
|
|
db: AristoDbRef; # Database
|
|
layer: LayerRef; # Layer to derive filter from
|
|
chunkedMpt = false; # Relax for snap/proof scenario
|
|
): Result[FilterRef,(VertexID,AristoError)] =
|
|
## Assemble forward delta, i.e. changes to the backend equivalent to applying
|
|
## the current top layer.
|
|
##
|
|
## Typically, the `layer` layer would reflect a change of the MPT but there
|
|
## is the case of partial MPTs sent over the network when synchronising (see
|
|
## `snap` protocol.) In this case, the state root might not see a change on
|
|
## the `layer` layer which would result in an error unless the argument
|
|
## `extendOK` is set `true`
|
|
##
|
|
## This delta is taken against the current backend including optional
|
|
## read-only filter.
|
|
##
|
|
# Register the Merkle hash keys of the MPT where this reverse filter will be
|
|
# applicable: `be => fg`
|
|
let (srcRoot, trgRoot) = block:
|
|
let rc = db.getLayerStateRoots(layer.delta, chunkedMpt)
|
|
if rc.isOK:
|
|
(rc.value.be, rc.value.fg)
|
|
elif rc.error == FilPrettyPointlessLayer:
|
|
return ok FilterRef(nil)
|
|
else:
|
|
return err((VertexID(1), rc.error))
|
|
|
|
ok FilterRef(
|
|
src: srcRoot,
|
|
sTab: layer.delta.sTab,
|
|
kMap: layer.delta.kMap,
|
|
vGen: layer.final.vGen.vidReorg, # Compact recycled IDs
|
|
trg: trgRoot)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, apply/install filters
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc journalMerge*(
|
|
db: AristoDbRef; # Database
|
|
filter: FilterRef; # Filter to apply to database
|
|
): Result[void,(VertexID,AristoError)] =
|
|
## Merge the argument `filter` into the read-only filter layer. Note that
|
|
## this function has no control of the filter source. Having merged the
|
|
## argument `filter`, all the `top` and `stack` layers should be cleared.
|
|
##
|
|
let ubeRoot = block:
|
|
let rc = db.getKeyUbe VertexID(1)
|
|
if rc.isOk:
|
|
rc.value.to(Hash256)
|
|
elif rc.error == GetKeyNotFound:
|
|
EMPTY_ROOT_HASH
|
|
else:
|
|
return err((VertexID(1),rc.error))
|
|
|
|
db.roFilter = ? db.merge(filter, db.roFilter, ubeRoot)
|
|
if db.roFilter.src == db.roFilter.trg:
|
|
# Under normal conditions, the root keys cannot be the same unless the
|
|
# database is empty. This changes if there is a fixed root vertex as
|
|
# used with the `snap` sync protocol boundaty proof. In that case, there
|
|
# can be no history chain and the filter is just another cache.
|
|
if VertexID(1) notin db.top.final.pPrf:
|
|
db.roFilter = FilterRef(nil)
|
|
|
|
ok()
|
|
|
|
|
|
proc journalUpdateOk*(db: AristoDbRef): bool =
|
|
## Check whether the read-only filter can be merged into the backend
|
|
not db.backend.isNil and db.isCentre
|
|
|
|
|
|
proc journalUpdate*(
|
|
db: AristoDbRef; # Database
|
|
nxtFid = none(FilterID); # Next filter ID (if any)
|
|
reCentreOk = false;
|
|
): Result[void,AristoError] =
|
|
## Resolve (i.e. move) the backend filter into the physical backend database.
|
|
##
|
|
## This needs write permission on the backend DB for the argument `db`
|
|
## descriptor (see the function `aristo_desc.isCentre()`.) With the argument
|
|
## flag `reCentreOk` passed `true`, write permission will be temporarily
|
|
## acquired when needed.
|
|
##
|
|
## When merging the current backend filter, its reverse will be is stored as
|
|
## back log on the filter fifos (so the current state can be retrieved.)
|
|
## Also, other non-centre descriptors are updated so there is no visible
|
|
## database change for these descriptors.
|
|
##
|
|
## Caveat: This function will delete entries from the cascaded fifos if the
|
|
## current backend filter is the reverse compiled from the top item
|
|
## chain from the cascaded fifos as implied by the function
|
|
## `forkBackLog()`, for example.
|
|
##
|
|
let be = db.backend
|
|
if be.isNil:
|
|
return err(FilBackendMissing)
|
|
|
|
# Blind or missing filter
|
|
if db.roFilter.isNil:
|
|
return ok()
|
|
|
|
# Make sure that the argument `db` is at the centre so the backend is in
|
|
# read-write mode for this peer.
|
|
let parent = db.getCentre
|
|
if db != parent:
|
|
if not reCentreOk:
|
|
return err(FilBackendRoMode)
|
|
db.reCentre
|
|
# Always re-centre to `parent` (in case `reCentreOk` was set)
|
|
defer: parent.reCentre
|
|
|
|
# Initialise peer filter balancer.
|
|
let updateSiblings = ? UpdateSiblingsRef.init db
|
|
defer: updateSiblings.rollback()
|
|
|
|
# Figure out how to save the reverse filter on a cascades slots queue
|
|
var instr: JournalOpsMod
|
|
if not be.journal.isNil: # Otherwise ignore
|
|
block getInstr:
|
|
# Compile instruction for updating filters on the cascaded fifos
|
|
if db.roFilter.isValid:
|
|
let ovLap = be.journalGetOverlap db.roFilter
|
|
if 0 < ovLap:
|
|
instr = ? be.journalOpsDeleteSlots ovLap # Revert redundant entries
|
|
break getInstr
|
|
instr = ? be.journalOpsPushSlot(
|
|
updateSiblings.rev, # Store reverse filter
|
|
nxtFid) # Set filter ID (if any)
|
|
|
|
# Store structural single trie entries
|
|
let writeBatch = be.putBegFn()
|
|
be.putVtxFn(writeBatch, db.roFilter.sTab.pairs.toSeq)
|
|
be.putKeyFn(writeBatch, db.roFilter.kMap.pairs.toSeq)
|
|
be.putIdgFn(writeBatch, db.roFilter.vGen)
|
|
|
|
# Store `instr` as history journal entry
|
|
if not be.journal.isNil:
|
|
be.putFilFn(writeBatch, instr.put)
|
|
be.putFqsFn(writeBatch, instr.scd.state)
|
|
? be.putEndFn writeBatch # Finalise write batch
|
|
|
|
# Update dudes and this descriptor
|
|
? updateSiblings.update().commit()
|
|
|
|
# Finally update slot queue scheduler state (as saved)
|
|
if not be.journal.isNil:
|
|
be.journal.state = instr.scd.state
|
|
|
|
db.roFilter = FilterRef(nil)
|
|
ok()
|
|
|
|
|
|
proc journalFork*(
|
|
db: AristoDbRef;
|
|
episode: int;
|
|
): Result[AristoDbRef,AristoError] =
|
|
## Construct a new descriptor on the `db` backend which enters it through a
|
|
## set of backend filters from the casacded filter fifos. The filter used is
|
|
## addressed as `episode`, where the most recend backward filter has episode
|
|
## `0`, the next older has episode `1`, etc.
|
|
##
|
|
## Use `aristo_filter.forget()` directive to clean up this descriptor.
|
|
##
|
|
let be = db.backend
|
|
if be.isNil:
|
|
return err(FilBackendMissing)
|
|
if episode < 0:
|
|
return err(FilNegativeEpisode)
|
|
let
|
|
instr = ? be.journalOpsFetchSlots(backSteps = episode+1)
|
|
clone = ? db.fork(noToplayer = true)
|
|
clone.top = LayerRef.init()
|
|
clone.top.final.vGen = instr.fil.vGen
|
|
clone.roFilter = instr.fil
|
|
ok clone
|
|
|
|
proc journalFork*(
|
|
db: AristoDbRef;
|
|
fid: Option[FilterID];
|
|
earlierOK = false;
|
|
): Result[AristoDbRef,AristoError] =
|
|
## Variant of `journalFork()` for forking to a particular filter ID (or the
|
|
## nearest predecessot if `earlierOK` is passed `true`.) if there is some
|
|
## filter ID `fid`.
|
|
##
|
|
## Otherwise, the oldest filter is forked to (regardless of the value of
|
|
## `earlierOK`.)
|
|
##
|
|
let be = db.backend
|
|
if be.isNil:
|
|
return err(FilBackendMissing)
|
|
|
|
let fip = ? be.journalGetInx(fid, earlierOK)
|
|
db.journalFork fip.inx
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|