557 lines
17 KiB
Nim
557 lines
17 KiB
Nim
# nimbus-eth1
|
|
# Copyright (c) 2021 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed
|
|
# except according to those terms.
|
|
|
|
## Aristo DB -- Transaction interface
|
|
## ==================================
|
|
##
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[sequtils, tables],
|
|
results,
|
|
"."/[aristo_desc, aristo_filter, aristo_get, aristo_hashify]
|
|
|
|
type
|
|
DoSpanPrepFn =
|
|
proc(db: AristoDbRef; flg: bool): Result[void,AristoError] {.gcsafe.}
|
|
|
|
DoSpanExecFn =
|
|
proc(db: AristoDbRef) {.gcsafe.}
|
|
|
|
func isTop*(tx: AristoTxRef): bool
|
|
func level*(db: AristoDbRef): int
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
func fromVae(err: (VertexID,AristoError)): AristoError =
|
|
## Map error pair to error reason component
|
|
err[1]
|
|
|
|
func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] =
|
|
if not tx.isTop():
|
|
return err(TxNotTopTx)
|
|
let db = tx.db
|
|
if tx.level != db.stack.len:
|
|
return err(TxStackGarbled)
|
|
ok db
|
|
|
|
proc getTxUid(db: AristoDbRef): uint =
|
|
if db.txUidGen == high(uint):
|
|
db.txUidGen = 0
|
|
db.txUidGen.inc
|
|
db.txUidGen
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions: Single descriptor transaction frame
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc txBeginPrepImpl(db: AristoDbRef): Result[void,AristoError] =
|
|
## Starts a new transaction.
|
|
##
|
|
if db.level != db.stack.len:
|
|
return err(TxStackGarbled)
|
|
ok()
|
|
|
|
proc txBeginExecImpl(db: AristoDbRef) =
|
|
## Starts a new transaction.
|
|
##
|
|
db.stack.add db.top.dup # push (save and use top later)
|
|
db.top.txUid = db.getTxUid()
|
|
|
|
db.txRef = AristoTxRef(
|
|
db: db,
|
|
txUid: db.top.txUid,
|
|
parent: db.txRef,
|
|
level: db.stack.len)
|
|
|
|
# ---------------
|
|
|
|
proc rollbackImpl(db: AristoDbRef) =
|
|
## Roll back to previous layer.
|
|
##
|
|
db.top = db.stack[^1]
|
|
db.stack.setLen(db.stack.len-1)
|
|
db.txRef = db.txRef.parent # `db.txRef` needs to be checked by caller
|
|
|
|
# ---------------
|
|
|
|
proc commitPrepImpl(
|
|
db: AristoDbRef; # Top transaction on database
|
|
dontHashify: bool; # Process/fix MPT hashes
|
|
): Result[void,AristoError] =
|
|
## Commit transaction layer.
|
|
##
|
|
if db.top.dirty and not dontHashify:
|
|
discard ? db.hashify().mapErr fromVae
|
|
ok()
|
|
|
|
proc commitExecImpl(db: AristoDbRef) =
|
|
## Commit transaction layer.
|
|
##
|
|
# Keep top and discard layer below
|
|
db.top.txUid = db.stack[^1].txUid
|
|
db.stack.setLen(db.stack.len-1)
|
|
db.txRef = db.txRef.parent # `db.txRef` needs to be checked by caller
|
|
|
|
# ---------------
|
|
|
|
proc collapseCommitPrepImpl(
|
|
db: AristoDbRef;
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[void,AristoError] =
|
|
# For commit, hashify the current layer, otherwise the stack bottom layer.
|
|
# install the stack bottom.
|
|
if db.top.dirty and not dontHashify:
|
|
discard ? db.hashify().mapErr fromVae
|
|
ok()
|
|
|
|
proc collapseRollbackPrepImpl(
|
|
db: AristoDbRef;
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[void,AristoError] =
|
|
# Rollback hashify the current layer, otherwise the stack bottom layer.
|
|
# install the stack bottom.
|
|
if db.top.dirty and not dontHashify:
|
|
db.stack[0].swap db.top
|
|
defer: db.stack[0].swap db.top
|
|
discard ? db.hashify().mapErr fromVae
|
|
ok()
|
|
|
|
|
|
proc collapseCommitExecImpl(db: AristoDbRef) =
|
|
# If commit, then leave the current layer and clear the stack, oterwise
|
|
# install the stack bottom.
|
|
db.top.txUid = 0
|
|
db.stack.setLen(0)
|
|
db.txRef = AristoTxRef(nil)
|
|
|
|
proc collapseRollbackExecImpl(db: AristoDbRef) =
|
|
db.stack[0].swap db.top
|
|
db.top.txUid = 0
|
|
db.stack.setLen(0)
|
|
db.txRef = AristoTxRef(nil)
|
|
|
|
# ---------------
|
|
|
|
proc doSpan(
|
|
db: AristoDbRef; # Top transaction on database
|
|
prepFn = DoSpanPrepFn(nil); # Optional preparation layer
|
|
prepFlag = false; # `prepFn` argument
|
|
execFn: DoSpanExecFn; # Mandatory execution layer
|
|
): Result[void,AristoError]
|
|
{.gcsafe.} =
|
|
## Common execution framework for `rollbackImpl()` or `commitImpl()` over
|
|
## all descriptors in the transaction span.
|
|
##
|
|
if not prepFn.isNil:
|
|
var
|
|
revert: Table[AristoDbRef,LayerRef]
|
|
defer:
|
|
# Restore previous layer
|
|
for (dude,top) in revert.pairs:
|
|
dude.top = top
|
|
|
|
for dude in db.txSpan:
|
|
if dude.stack.len == 0 or
|
|
dude.stack.len != dude.txRef.level or
|
|
dude.top.txUid != dude.txRef.txUid:
|
|
return err(TxStackGarbled)
|
|
let keep = db.top
|
|
? dude.prepFn prepFlag # Preparation function
|
|
revert[dude] = keep
|
|
revert.clear # Done, no restoring
|
|
|
|
for dude in db.txSpan:
|
|
dude.execFn() # Commit function
|
|
|
|
if db.level == 0:
|
|
db.txSpanClear()
|
|
|
|
ok()
|
|
|
|
proc doThisPrep(
|
|
db: AristoDbRef; # Top transaction on database
|
|
prepFn = DoSpanPrepFn(nil); # Mandatory preparation layer function
|
|
prepFlag = false; # `prepFn` argument
|
|
): Result[void,AristoError]
|
|
{.gcsafe.} =
|
|
## ..
|
|
let
|
|
keep = db.top
|
|
rc = db.prepFn prepFlag
|
|
if rc.isErr:
|
|
db.top = keep
|
|
return err(rc.error)
|
|
ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, getters
|
|
# ------------------------------------------------------------------------------
|
|
|
|
func txTop*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
|
|
## Getter, returns top level transaction if there is any.
|
|
if db.txRef.isNil:
|
|
err(TxNoPendingTx)
|
|
else:
|
|
ok(db.txRef)
|
|
|
|
func isTop*(tx: AristoTxRef): bool =
|
|
## Getter, returns `true` if the argument `tx` referes to the current top
|
|
## level transaction.
|
|
tx.db.txRef == tx and tx.db.top.txUid == tx.txUid
|
|
|
|
func level*(tx: AristoTxRef): int =
|
|
## Getter, positive nesting level of transaction argument `tx`
|
|
tx.level
|
|
|
|
func level*(db: AristoDbRef): int =
|
|
## Getter, non-negative nesting level (i.e. number of pending transactions)
|
|
if not db.txRef.isNil:
|
|
result = db.txRef.level
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
func to*(tx: AristoTxRef; T: type[AristoDbRef]): T =
|
|
## Getter, retrieves the parent database descriptor from argument `tx`
|
|
tx.db
|
|
|
|
|
|
proc forkTx*(
|
|
tx: AristoTxRef; # Transaction descriptor
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[AristoDbRef,AristoError] =
|
|
## Clone a transaction into a new DB descriptor accessing the same backend
|
|
## (if any) database as the argument `db`. The new descriptor is linked to
|
|
## the transaction parent and is fully functional as a forked instance (see
|
|
## comments on `aristo_desc.reCentre()` for details.)
|
|
##
|
|
## The new DB descriptor will contain a copy of the argument transaction
|
|
## `tx` as top layer of level 1 (i.e. this is he only transaction.) Rolling
|
|
## back will end up at the backend layer (incl. backend filter.)
|
|
##
|
|
## If the arguent flag `dontHashify` is passed `true`, the clone descriptor
|
|
## will *NOT* be hashified right after construction.
|
|
##
|
|
## Use `aristo_desc.forget()` to clean up this descriptor.
|
|
##
|
|
let db = tx.db
|
|
|
|
# Provide new top layer
|
|
var topLayer: LayerRef
|
|
if db.txRef == tx:
|
|
topLayer = db.top.dup
|
|
elif tx.level < db.stack.len:
|
|
topLayer = db.stack[tx.level].dup
|
|
else:
|
|
return err(TxArgStaleTx)
|
|
if topLayer.txUid != tx.txUid:
|
|
return err(TxArgStaleTx)
|
|
topLayer.txUid = 1
|
|
|
|
# Empty stack
|
|
let stackLayer = block:
|
|
let rc = db.getIdgBE()
|
|
if rc.isOk:
|
|
LayerRef(vGen: rc.value)
|
|
elif rc.error == GetIdgNotFound:
|
|
LayerRef()
|
|
else:
|
|
return err(rc.error)
|
|
|
|
let txClone = ? db.fork(rawToplayer = true)
|
|
|
|
# Set up clone associated to `db`
|
|
txClone.top = topLayer # is a deep copy
|
|
txClone.stack = @[stackLayer]
|
|
txClone.roFilter = db.roFilter # no need to copy contents (done when updated)
|
|
txClone.backend = db.backend
|
|
txClone.txUidGen = 1
|
|
|
|
# Install transaction similar to `tx` on clone
|
|
txClone.txRef = AristoTxRef(
|
|
db: txClone,
|
|
txUid: 1,
|
|
level: 1)
|
|
|
|
if db.top.dirty and not dontHashify:
|
|
let rc = txClone.hashify()
|
|
if rc.isErr:
|
|
discard txClone.forget()
|
|
return err(rc.error.fromVae)
|
|
|
|
ok(txClone)
|
|
|
|
|
|
proc forkTop*(
|
|
db: AristoDbRef;
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[AristoDbRef,AristoError] =
|
|
## Variant of `forkTx()` for the top transaction if there is any. Otherwise
|
|
## the top layer is cloned, only.
|
|
##
|
|
## Use `aristo_desc.forget()` to clean up this descriptor.
|
|
##
|
|
if db.txRef.isNil:
|
|
let dbClone = ? db.fork(rawToplayer = true)
|
|
|
|
dbClone.top = db.top.dup # is a deep copy
|
|
dbClone.roFilter = db.roFilter # no need to copy contents when updated
|
|
dbClone.backend = db.backend
|
|
|
|
if db.top.dirty and not dontHashify:
|
|
let rc = dbClone.hashify()
|
|
if rc.isErr:
|
|
discard dbClone.forget()
|
|
return err(rc.error.fromVae)
|
|
return ok(dbClone)
|
|
|
|
db.txRef.forkTx dontHashify
|
|
|
|
|
|
proc exec*(
|
|
tx: AristoTxRef;
|
|
action: AristoDbAction;
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[void,AristoError]
|
|
{.gcsafe, raises: [CatchableError].} =
|
|
## Execute function argument `action()` on a temporary `tx.forkTx()`
|
|
## transaction clone database. After return, the temporary database gets
|
|
## destroyed.
|
|
##
|
|
## If the arguent flag `dontHashify` is passed `true`, the clone database
|
|
## will *NOT* be hashified right after construction.
|
|
##
|
|
let db = ? tx.forkTx dontHashify
|
|
db.action()
|
|
? db.forget()
|
|
ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions: Transaction frame
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc txBegin*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
|
|
## Starts a new transaction.
|
|
##
|
|
## Example:
|
|
## ::
|
|
## proc doSomething(db: AristoDbRef) =
|
|
## let tx = db.begin
|
|
## defer: tx.rollback()
|
|
## ... continue using db ...
|
|
## tx.commit()
|
|
##
|
|
if not db.inTxSpan:
|
|
? db.txBeginPrepImpl()
|
|
db.txBeginExecImpl()
|
|
|
|
elif not db.isCentre:
|
|
return err(TxSpanOffCentre)
|
|
|
|
else:
|
|
for dude in db.txSpan:
|
|
? dude.txBeginPrepImpl() # Only check, no need to restore
|
|
for dude in db.txSpan:
|
|
dude.txBeginExecImpl()
|
|
|
|
ok db.txRef
|
|
|
|
proc txBeginSpan*(db: AristoDbRef): Result[AristoTxRef,AristoError] =
|
|
## Start a new transaction simultaneously on all descriptors accessing the
|
|
## same backend.
|
|
##
|
|
## This function must be run on the centre argument descriptor `db` (see
|
|
## comments on `aristo_desc.reCentre()` for details.) This function is
|
|
## effective only when there is no transaction opened, yet. Sub-transactions
|
|
## are handled by `txBegin()` accordingly.
|
|
##
|
|
## When starting sub-transactions outside a transaction span, these
|
|
## transactions are handled independently.
|
|
##
|
|
## Example:
|
|
## ::
|
|
## let
|
|
## tx = db.txBeginSpan # includes all forked descriptors
|
|
## ty = db.txBegin # includes all forked descriptors
|
|
##
|
|
## tmpDb = tx.forkTx # outside transaction span
|
|
## tz = tmpDb.txBegin # outside transaction span
|
|
##
|
|
if not db.isCentre:
|
|
return err(TxSpanOffCentre)
|
|
|
|
if 0 < db.nForked:
|
|
if db.level == 0:
|
|
if 0 < db.nTxSpan:
|
|
return err(TxGarbledSpan)
|
|
db.forked.toSeq.txSpanSet
|
|
|
|
db.txBegin
|
|
|
|
|
|
proc rollback*(
|
|
tx: AristoTxRef; # Top transaction on database
|
|
): Result[void,AristoError] =
|
|
## Given a *top level* handle, this function discards all database operations
|
|
## performed for this transactio. The previous transaction is returned if
|
|
## there was any.
|
|
##
|
|
let db = ? tx.getDbDescFromTopTx()
|
|
if not db.inTxSpan:
|
|
db.rollbackImpl()
|
|
return ok()
|
|
|
|
if not db.isCentre:
|
|
return err(TxSpanOffCentre)
|
|
|
|
db.doSpan(execFn = rollbackImpl)
|
|
|
|
|
|
proc commit*(
|
|
tx: AristoTxRef; # Top transaction on database
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[void,AristoError] =
|
|
## Given a *top level* handle, this function accepts all database operations
|
|
## performed through this handle and merges it to the previous layer. The
|
|
## previous transaction is returned if there was any.
|
|
##
|
|
## Unless the argument `dontHashify` is set `true`, the function will process
|
|
## Merkle Patricia Treee hashes unless there was no change to this layer.
|
|
## This may produce additional errors (see `hashify()`.)
|
|
##
|
|
let db = ? tx.getDbDescFromTopTx()
|
|
if not db.inTxSpan:
|
|
? db.doThisPrep(commitPrepImpl, dontHashify)
|
|
db.commitExecImpl()
|
|
return ok()
|
|
|
|
if not db.isCentre:
|
|
return err(TxSpanOffCentre)
|
|
|
|
db.doSpan(
|
|
prepFn = commitPrepImpl,
|
|
prepFlag = dontHashify,
|
|
execFn = commitExecImpl)
|
|
|
|
|
|
proc collapse*(
|
|
tx: AristoTxRef; # Top transaction on database
|
|
commit: bool; # Commit if `true`, otherwise roll back
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
): Result[void,AristoError] =
|
|
## Iterated application of `commit()` or `rollback()` performing the
|
|
## something similar to
|
|
## ::
|
|
## while true:
|
|
## discard tx.commit() # ditto for rollback()
|
|
## if db.topTx.isErr: break
|
|
## tx = db.topTx.value
|
|
##
|
|
## The `dontHashify` flag is treated as described for `commit()`
|
|
##
|
|
let db = ? tx.getDbDescFromTopTx()
|
|
if not db.inTxSpan:
|
|
if commit:
|
|
? db.doThisPrep(collapseCommitPrepImpl, dontHashify)
|
|
db.collapseCommitExecImpl()
|
|
else:
|
|
? db.doThisPrep(collapseRollbackPrepImpl, dontHashify)
|
|
db.collapseRollbackExecImpl()
|
|
return ok()
|
|
|
|
if not db.isCentre:
|
|
return err(TxSpanOffCentre)
|
|
|
|
if commit:
|
|
db.doSpan(
|
|
prepFn = collapseCommitPrepImpl,
|
|
prepFlag = dontHashify,
|
|
execFn = collapseCommitExecImpl)
|
|
else:
|
|
db.doSpan(
|
|
prepFn = collapseRollbackPrepImpl,
|
|
prepFlag = dontHashify,
|
|
execFn = collapseRollbackExecImpl)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions: save database
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc stow*(
|
|
db: AristoDbRef; # Database
|
|
persistent = false; # Stage only unless `true`
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
chunkedMpt = false; # Partial data (e.g. from `snap`)
|
|
): Result[void,AristoError] =
|
|
## If there is no backend while the `persistent` argument is set `true`,
|
|
## the function returns immediately with an error. The same happens if there
|
|
## is a pending transaction.
|
|
##
|
|
## The `dontHashify` is treated as described for `commit()`.
|
|
##
|
|
## The function then merges the data from the top layer cache into the
|
|
## backend stage area. After that, the top layer cache is cleared.
|
|
##
|
|
## Staging the top layer cache might fail withh a partial MPT when it is
|
|
## set up from partial MPT chunks as it happens with `snap` sync processing.
|
|
## In this case, the `chunkedMpt` argument must be set `true` (see alse
|
|
## `fwdFilter`.)
|
|
##
|
|
## If the argument `persistent` is set `true`, all the staged data are merged
|
|
## into the physical backend database and the staged data area is cleared.
|
|
##
|
|
if not db.txRef.isNil:
|
|
return err(TxPendingTx)
|
|
if 0 < db.stack.len:
|
|
return err(TxStackGarbled)
|
|
if persistent and not db.canResolveBackendFilter():
|
|
return err(TxBackendNotWritable)
|
|
|
|
if db.top.dirty and not dontHashify:
|
|
discard ? db.hashify().mapErr fromVae
|
|
|
|
let fwd = ? db.fwdFilter(db.top, chunkedMpt).mapErr fromVae
|
|
|
|
if fwd.isValid:
|
|
# Merge `top` layer into `roFilter`
|
|
? db.merge(fwd).mapErr fromVae
|
|
db.top = LayerRef(vGen: db.roFilter.vGen)
|
|
|
|
if persistent:
|
|
? db.resolveBackendFilter()
|
|
db.roFilter = FilterRef(nil)
|
|
|
|
# Delete or clear stack and clear top
|
|
db.stack.setLen(0)
|
|
db.top = LayerRef(vGen: db.top.vGen, txUid: db.top.txUid)
|
|
|
|
ok()
|
|
|
|
proc stow*(
|
|
db: AristoDbRef; # Database
|
|
stageLimit: int; # Policy based persistent storage
|
|
dontHashify = false; # Process/fix MPT hashes
|
|
chunkedMpt = false; # Partial data (e.g. from `snap`)
|
|
): Result[void,AristoError] =
|
|
## Variant of `stow()` with the `persistent` argument replaced by
|
|
## `stageLimit < max(db.roFilter.bulk, db.top.bulk)`.
|
|
db.stow(
|
|
persistent = (stageLimit < max(db.roFilter.bulk, db.top.bulk)),
|
|
dontHashify = dontHashify,
|
|
chunkedMpt = chunkedMpt)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|