mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-28 11:50:45 +00:00
Simplify txFrame protocol, improve persist performance (#3077)
* Simplify txFrame protocol, improve persist performance To prepare forked-layers for further surgery to avoid the nesting tax, the commit/rollback style of interacting must first be adjusted, since it does not provide a point in time where the frame is "done" and goes from being actively written to, to simply waiting to be persisted or discarded. A collateral benefit of this change is that the scheme removes some complexity from the process by moving the "last saved block number" into txframe along with the actual state changes thus reducing the risk that they go "out of sync" and removing the "commit" consolidation responsibility from ForkedChain. * commit/rollback become checkpoint/dispose - since these are pure in-memory constructs, there's less error handling and there's no real "rollback" involved - dispose better implies that the instance cannot be used and we can more aggressively clear the memory it uses * simplified block number handling that moves to become part of txFrame just like the data that the block number references * avoid reparenting step by replacing the base instead of keeping a singleton instance * persist builds the set of changes from the bottom which helps avoid moving changes in the top layers through each ancestor level of the frame stack * when using an in-memory database in tests, allow the instance to be passed around to enable testing persist and reload logic
This commit is contained in:
parent
c8e6247a16
commit
caca11b30b
@ -2,16 +2,16 @@ TracerTests
|
||||
===
|
||||
## TracerTests
|
||||
```diff
|
||||
block46147.json Skip
|
||||
block46400.json Skip
|
||||
block46402.json Skip
|
||||
block47205.json Skip
|
||||
block48712.json Skip
|
||||
block48915.json Skip
|
||||
block49018.json Skip
|
||||
block97.json Skip
|
||||
+ block46147.json OK
|
||||
+ block46400.json OK
|
||||
+ block46402.json OK
|
||||
+ block47205.json OK
|
||||
+ block48712.json OK
|
||||
+ block48915.json OK
|
||||
+ block49018.json OK
|
||||
+ block97.json OK
|
||||
```
|
||||
OK: 0/8 Fail: 0/8 Skip: 8/8
|
||||
OK: 8/8 Fail: 0/8 Skip: 0/8
|
||||
|
||||
---TOTAL---
|
||||
OK: 0/8 Fail: 0/8 Skip: 8/8
|
||||
OK: 8/8 Fail: 0/8 Skip: 0/8
|
||||
|
@ -43,7 +43,7 @@ const
|
||||
proc processBlock(c: ForkedChainRef,
|
||||
parent: Header,
|
||||
txFrame: CoreDbTxRef,
|
||||
blk: Block): Result[seq[Receipt], string] =
|
||||
blk: Block, blkHash: Hash32): Result[seq[Receipt], string] =
|
||||
template header(): Header =
|
||||
blk.header
|
||||
|
||||
@ -62,11 +62,7 @@ proc processBlock(c: ForkedChainRef,
|
||||
|
||||
# We still need to write header to database
|
||||
# because validateUncles still need it
|
||||
let blockHash = header.blockHash()
|
||||
?txFrame.persistHeader(
|
||||
blockHash,
|
||||
header,
|
||||
c.com.startOfHistory)
|
||||
?txFrame.persistHeader(blkHash, header, c.com.startOfHistory)
|
||||
|
||||
# update currentBlock *after* we persist it
|
||||
# so the rpc return consistent result
|
||||
@ -93,7 +89,7 @@ func updateBranch(c: ForkedChainRef,
|
||||
c.activeBranch = newBranch
|
||||
|
||||
proc writeBaggage(c: ForkedChainRef,
|
||||
blk: Block,
|
||||
blk: Block, blkHash: Hash32,
|
||||
txFrame: CoreDbTxRef,
|
||||
receipts: openArray[Receipt]) =
|
||||
template header(): Header =
|
||||
@ -140,13 +136,16 @@ proc validateBlock(c: ForkedChainRef,
|
||||
requestsHash: blk.header.requestsHash,
|
||||
)
|
||||
|
||||
var res = c.processBlock(parent.header, txFrame, blk)
|
||||
if res.isErr:
|
||||
txFrame.rollback()
|
||||
return err(res.error)
|
||||
var receipts = c.processBlock(parent.header, txFrame, blk, blkHash).valueOr:
|
||||
txFrame.dispose()
|
||||
return err(error)
|
||||
|
||||
c.writeBaggage(blk, txFrame, res.value)
|
||||
c.updateBranch(parent, blk, blkHash, txFrame, move(res.value))
|
||||
c.writeBaggage(blk, blkHash, txFrame, receipts)
|
||||
|
||||
# Block fully written to txFrame, mark it as such
|
||||
txFrame.checkpoint(blk.header.number)
|
||||
|
||||
c.updateBranch(parent, blk, blkHash, txFrame, move(receipts))
|
||||
|
||||
for i, tx in blk.transactions:
|
||||
c.txRecords[rlpHash(tx)] = (blkHash, uint64(i))
|
||||
@ -266,15 +265,11 @@ func calculateNewBase(
|
||||
|
||||
doAssert(false, "Unreachable code, finalized block outside canonical chain")
|
||||
|
||||
proc removeBlockFromCache(c: ForkedChainRef, bd: BlockDesc, commit = false) =
|
||||
proc removeBlockFromCache(c: ForkedChainRef, bd: BlockDesc) =
|
||||
c.hashToBlock.del(bd.hash)
|
||||
for tx in bd.blk.transactions:
|
||||
c.txRecords.del(rlpHash(tx))
|
||||
if commit:
|
||||
if bd.txFrame != c.baseTxFrame:
|
||||
bd.txFrame.commit()
|
||||
else:
|
||||
bd.txFrame.dispose()
|
||||
bd.txFrame.dispose()
|
||||
|
||||
proc updateHead(c: ForkedChainRef, head: BlockPos) =
|
||||
## Update head if the new head is different from current head.
|
||||
@ -374,10 +369,10 @@ proc updateBase(c: ForkedChainRef, newBase: BlockPos) =
|
||||
# Cleanup in-memory blocks starting from newBase backward
|
||||
# e.g. B3 backward. Switch to parent branch if needed.
|
||||
|
||||
template commitBlocks(number, branch) =
|
||||
template disposeBlocks(number, branch) =
|
||||
let tailNumber = branch.tailNumber
|
||||
while number >= tailNumber:
|
||||
c.removeBlockFromCache(branch.blocks[number - tailNumber], commit = true)
|
||||
c.removeBlockFromCache(branch.blocks[number - tailNumber])
|
||||
inc count
|
||||
|
||||
if number == 0:
|
||||
@ -385,10 +380,6 @@ proc updateBase(c: ForkedChainRef, newBase: BlockPos) =
|
||||
break
|
||||
dec number
|
||||
|
||||
proc commitBase(c: ForkedChainRef, bd: BlockDesc) =
|
||||
if bd.txFrame != c.baseTxFrame:
|
||||
bd.txFrame.commit()
|
||||
|
||||
let
|
||||
# Cache to prevent crash after we shift
|
||||
# the blocks
|
||||
@ -401,10 +392,11 @@ proc updateBase(c: ForkedChainRef, newBase: BlockPos) =
|
||||
|
||||
let nextIndex = int(newBase.number - branch.tailNumber)
|
||||
|
||||
# Commit base block but don't remove from FC
|
||||
c.commitBase(branch.blocks[nextIndex])
|
||||
# Persist the new base block - this replaces the base tx in coredb!
|
||||
c.com.db.persist(newBase.txFrame)
|
||||
c.baseTxFrame = newBase.txFrame
|
||||
|
||||
commitBlocks(number, branch)
|
||||
disposeBlocks(number, branch)
|
||||
|
||||
# Update base if it indeed changed
|
||||
if nextIndex > 0:
|
||||
@ -424,7 +416,7 @@ proc updateBase(c: ForkedChainRef, newBase: BlockPos) =
|
||||
# Older branches will gone
|
||||
branch = branch.parent
|
||||
while not branch.isNil:
|
||||
commitBlocks(number, branch)
|
||||
disposeBlocks(number, branch)
|
||||
|
||||
for i, brc in c.branches:
|
||||
if brc == branch:
|
||||
@ -454,12 +446,6 @@ proc updateBase(c: ForkedChainRef, newBase: BlockPos) =
|
||||
baseNumber = c.baseBranch.tailNumber,
|
||||
baseHash = c.baseBranch.tailHash.short
|
||||
|
||||
# Update base txFrame
|
||||
if c.baseBranch.blocks[0].txFrame != c.baseTxFrame:
|
||||
c.baseBranch.blocks[0].txFrame = c.baseTxFrame
|
||||
if c.baseBranch.len > 1:
|
||||
c.baseBranch.blocks[1].txFrame.reparent(c.baseTxFrame)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -565,11 +551,6 @@ proc forkChoice*(c: ForkedChainRef,
|
||||
doAssert(newBaseNumber <= finalized.number)
|
||||
c.updateBase(newBase)
|
||||
|
||||
# Save and record the block number before the last saved block state.
|
||||
if newBaseNumber > 0:
|
||||
c.com.db.persist(newBaseNumber).isOkOr:
|
||||
return err("Failed to save state: " & $$error)
|
||||
|
||||
ok()
|
||||
|
||||
func haveBlockAndState*(c: ForkedChainRef, blockHash: Hash32): bool =
|
||||
|
@ -30,8 +30,7 @@ proc fcKvtPersist*(c: ForkedChainRef) =
|
||||
## should not be any.)
|
||||
##
|
||||
let db = c.com.db
|
||||
db.persist(c.baseTxFrame.getSavedStateBlockNumber()).isOkOr:
|
||||
raiseAssert "fcKvtPersist: persistent() failed: " & $$error
|
||||
db.persist(c.baseTxFrame)
|
||||
|
||||
proc fcKvtHasKey*(c: ForkedChainRef, key: openArray[byte]): bool =
|
||||
## Check whether the argument `key` exists on the `kvt` table (i.e. `get()`
|
||||
|
@ -64,8 +64,10 @@ proc getVmState(
|
||||
if p.vmState == nil:
|
||||
let
|
||||
vmState = BaseVMState()
|
||||
txFrame = p.c.db.baseTxFrame()
|
||||
parent = ?txFrame.getBlockHeader(header.parentHash)
|
||||
txFrame = p.c.db.baseTxFrame.txFrameBegin()
|
||||
parent = ?txFrame.getBlockHeader(header.parentHash)
|
||||
|
||||
doAssert txFrame.getSavedStateBlockNumber() == parent.number
|
||||
vmState.init(parent, header, p.c.com, txFrame, storeSlotHash = storeSlotHash)
|
||||
p.vmState = vmState
|
||||
else:
|
||||
@ -78,14 +80,15 @@ proc getVmState(
|
||||
ok(p.vmState)
|
||||
|
||||
proc dispose*(p: var Persister) =
|
||||
p.c.db.baseTxFrame().rollback()
|
||||
p.vmState.ledger.txFrame.dispose()
|
||||
p.vmState = nil
|
||||
|
||||
proc init*(T: type Persister, c: ChainRef, flags: PersistBlockFlags): T =
|
||||
T(c: c, flags: flags)
|
||||
|
||||
proc checkpoint*(p: var Persister): Result[void, string] =
|
||||
if NoValidation notin p.flags:
|
||||
let stateRoot = p.c.db.baseTxFrame().getStateRoot().valueOr:
|
||||
let stateRoot = p.vmState.ledger.txFrame.getStateRoot().valueOr:
|
||||
return err($$error)
|
||||
|
||||
if p.parent.stateRoot != stateRoot:
|
||||
@ -100,9 +103,10 @@ proc checkpoint*(p: var Persister): Result[void, string] =
|
||||
"stateRoot mismatch, expect: " & $p.parent.stateRoot & ", got: " & $stateRoot
|
||||
)
|
||||
|
||||
# Save and record the block number before the last saved block state.
|
||||
p.c.db.persist(p.parent.number).isOkOr:
|
||||
return err("Failed to save state: " & $$error)
|
||||
# Move in-memory state to disk
|
||||
p.c.db.persist(p.vmState.ledger.txFrame)
|
||||
# Get a new frame since the DB assumes ownership
|
||||
p.vmState.ledger.txFrame = p.c.db.baseTxFrame().txFrameBegin()
|
||||
|
||||
ok()
|
||||
|
||||
@ -170,6 +174,8 @@ proc persistBlock*(p: var Persister, blk: Block): Result[void, string] =
|
||||
p.stats.txs += blk.transactions.len
|
||||
p.stats.gas += blk.header.gasUsed
|
||||
|
||||
txFrame.checkpoint(header.number)
|
||||
|
||||
assign(p.parent, header)
|
||||
|
||||
ok()
|
||||
|
@ -39,14 +39,11 @@ when AristoPersistentBackendOk:
|
||||
{.pragma: noRaise, gcsafe, raises: [].}
|
||||
|
||||
type
|
||||
AristoApiCommitFn* =
|
||||
AristoApiCheckpointFn* =
|
||||
proc(tx: AristoTxRef;
|
||||
): Result[void,AristoError]
|
||||
{.noRaise.}
|
||||
## Given a *top level* handle, this function accepts all database
|
||||
## operations performed through this handle and merges it to the
|
||||
## previous layer. The previous transaction is returned if there
|
||||
## was any.
|
||||
blockNumber: uint64
|
||||
) {.noRaise.}
|
||||
## Update the txFrame to the given checkpoint "identifier", or block number
|
||||
|
||||
AristoApiDeleteAccountRecordFn* =
|
||||
proc(db: AristoTxRef;
|
||||
@ -78,9 +75,9 @@ type
|
||||
## Variant of `deleteStorageData()` for purging the whole storage tree
|
||||
## associated to the account argument `accPath`.
|
||||
|
||||
AristoApiFetchLastSavedStateFn* =
|
||||
AristoApiFetchLastCheckpointFn* =
|
||||
proc(db: AristoTxRef
|
||||
): Result[SavedState,AristoError]
|
||||
): Result[uint64,AristoError]
|
||||
{.noRaise.}
|
||||
## The function returns the state of the last saved state. This is a
|
||||
## Merkle hash tag for vertex with ID 1 and a bespoke `uint64` identifier
|
||||
@ -249,41 +246,25 @@ type
|
||||
proc(
|
||||
db: AristoDbRef;
|
||||
batch: PutHdlRef;
|
||||
nxtSid = 0u64;
|
||||
txFrame: AristoTxRef;
|
||||
) {.noRaise.}
|
||||
## Persistently store data onto backend database. If the system is
|
||||
## running without a database backend, the function returns immediately
|
||||
## with an error. The same happens if there is a pending transaction.
|
||||
##
|
||||
## The function merges all staged data from the top layer cache onto the
|
||||
## backend stage area. After that, the top layer cache is cleared.
|
||||
##
|
||||
## Finally, the staged data are merged into the physical backend
|
||||
## database and the staged data area is cleared.
|
||||
##
|
||||
## The argument `nxtSid` will be the ID for the next saved state record.
|
||||
## Persistently store the cumulative set of changes that `txFrame`
|
||||
## represents to the database. `txFrame` becomes the new base after this
|
||||
## operation.
|
||||
|
||||
AristoApiRollbackFn* =
|
||||
AristoApiDisposeFn* =
|
||||
proc(tx: AristoTxRef;
|
||||
): Result[void,AristoError]
|
||||
{.noRaise.}
|
||||
## Given a *top level* handle, this function discards all database
|
||||
## operations performed for this transactio. The previous transaction
|
||||
## is returned if there was any.
|
||||
) {.noRaise.}
|
||||
## Release a frame releasing its associated resources. This operation
|
||||
## makes all frames built on top of it invalid - they still need to be
|
||||
## released however.
|
||||
|
||||
AristoApiTxFrameBeginFn* =
|
||||
proc(db: AristoDbRef; parent: AristoTxRef
|
||||
): Result[AristoTxRef,AristoError]
|
||||
): AristoTxRef
|
||||
{.noRaise.}
|
||||
## Starts a new transaction.
|
||||
##
|
||||
## Example:
|
||||
## ::
|
||||
## proc doSomething(db: AristoTxRef) =
|
||||
## let tx = db.begin
|
||||
## defer: tx.rollback()
|
||||
## ... continue using db ...
|
||||
## tx.commit()
|
||||
## Create a new layered transaction frame - the frame can later be
|
||||
## released or frozen and persisted.
|
||||
|
||||
AristoApiBaseTxFrameFn* =
|
||||
proc(db: AristoDbRef;
|
||||
@ -293,13 +274,13 @@ type
|
||||
AristoApiRef* = ref AristoApiObj
|
||||
AristoApiObj* = object of RootObj
|
||||
## Useful set of `Aristo` fuctions that can be filtered, stacked etc.
|
||||
commit*: AristoApiCommitFn
|
||||
checkpoint*: AristoApiCheckpointFn
|
||||
|
||||
deleteAccountRecord*: AristoApiDeleteAccountRecordFn
|
||||
deleteStorageData*: AristoApiDeleteStorageDataFn
|
||||
deleteStorageTree*: AristoApiDeleteStorageTreeFn
|
||||
|
||||
fetchLastSavedState*: AristoApiFetchLastSavedStateFn
|
||||
fetchLastCheckpoint*: AristoApiFetchLastCheckpointFn
|
||||
|
||||
fetchAccountRecord*: AristoApiFetchAccountRecordFn
|
||||
fetchStateRoot*: AristoApiFetchStateRootFn
|
||||
@ -321,7 +302,7 @@ type
|
||||
|
||||
pathAsBlob*: AristoApiPathAsBlobFn
|
||||
persist*: AristoApiPersistFn
|
||||
rollback*: AristoApiRollbackFn
|
||||
dispose*: AristoApiDisposeFn
|
||||
txFrameBegin*: AristoApiTxFrameBeginFn
|
||||
baseTxFrame*: AristoApiBaseTxFrameFn
|
||||
|
||||
@ -329,13 +310,13 @@ type
|
||||
AristoApiProfNames* = enum
|
||||
## Index/name mapping for profile slots
|
||||
AristoApiProfTotal = "total"
|
||||
AristoApiProfCommitFn = "commit"
|
||||
AristoApiProfCheckpointFn = "checkpoint"
|
||||
|
||||
AristoApiProfDeleteAccountRecordFn = "deleteAccountRecord"
|
||||
AristoApiProfDeleteStorageDataFn = "deleteStorageData"
|
||||
AristoApiProfDeleteStorageTreeFn = "deleteStorageTree"
|
||||
|
||||
AristoApiProfFetchLastSavedStateFn = "fetchLastSavedState"
|
||||
AristoApiProfFetchLastCheckpointFn = "fetchLastCheckpoint"
|
||||
|
||||
AristoApiProfFetchAccountRecordFn = "fetchAccountRecord"
|
||||
AristoApiProfFetchStateRootFn = "fetchStateRoot"
|
||||
@ -358,9 +339,9 @@ type
|
||||
|
||||
AristoApiProfPathAsBlobFn = "pathAsBlob"
|
||||
AristoApiProfPersistFn = "persist"
|
||||
AristoApiProfRollbackFn = "rollback"
|
||||
AristoApiProfTxFrameBeginFn = "txFrameBegin"
|
||||
AristoApiProfBaseTxFrameFn = "baseTxFrame"
|
||||
AristoApiProfDisposeFn = "dispose"
|
||||
AristoApiProfTxFrameBeginFn = "txFrameBegin"
|
||||
AristoApiProfBaseTxFrameFn = "baseTxFrame"
|
||||
|
||||
AristoApiProfBeGetVtxFn = "be/getVtx"
|
||||
AristoApiProfBeGetKeyFn = "be/getKey"
|
||||
@ -410,13 +391,13 @@ func init*(api: var AristoApiObj) =
|
||||
##
|
||||
when AutoValidateApiHooks:
|
||||
api.reset
|
||||
api.commit = commit
|
||||
api.checkpoint = checkpoint
|
||||
|
||||
api.deleteAccountRecord = deleteAccountRecord
|
||||
api.deleteStorageData = deleteStorageData
|
||||
api.deleteStorageTree = deleteStorageTree
|
||||
|
||||
api.fetchLastSavedState = fetchLastSavedState
|
||||
api.fetchLastCheckpoint = fetchLastCheckpoint
|
||||
|
||||
api.fetchAccountRecord = fetchAccountRecord
|
||||
api.fetchStateRoot = fetchStateRoot
|
||||
@ -439,7 +420,7 @@ func init*(api: var AristoApiObj) =
|
||||
|
||||
api.pathAsBlob = pathAsBlob
|
||||
api.persist = persist
|
||||
api.rollback = rollback
|
||||
api.dispose = dispose
|
||||
api.txFrameBegin = txFrameBegin
|
||||
api.baseTxFrame = baseTxFrame
|
||||
|
||||
@ -483,10 +464,10 @@ func init*(
|
||||
code
|
||||
data.update(n.ord, getTime() - start)
|
||||
|
||||
profApi.commit =
|
||||
profApi.checkpoint =
|
||||
proc(a: AristoTxRef): auto =
|
||||
AristoApiProfCommitFn.profileRunner:
|
||||
result = api.commit(a)
|
||||
AristoApiProfCheckpointFn.profileRunner:
|
||||
api.checkpoint(a)
|
||||
|
||||
profApi.deleteAccountRecord =
|
||||
proc(a: AristoTxRef; b: Hash32): auto =
|
||||
@ -503,10 +484,10 @@ func init*(
|
||||
AristoApiProfDeleteStorageTreeFn.profileRunner:
|
||||
result = api.deleteStorageTree(a, b)
|
||||
|
||||
profApi.fetchLastSavedState =
|
||||
profApi.fetchLastCheckpoint =
|
||||
proc(a: AristoTxRef): auto =
|
||||
AristoApiProfFetchLastSavedStateFn.profileRunner:
|
||||
result = api.fetchLastSavedState(a)
|
||||
AristoApiProfFetchLastCheckpointFn.profileRunner:
|
||||
result = api.fetchLastCheckpoint(a)
|
||||
|
||||
profApi.fetchAccountRecord =
|
||||
proc(a: AristoTxRef; b: Hash32): auto =
|
||||
@ -588,10 +569,10 @@ func init*(
|
||||
AristoApiProfPersistFn.profileRunner:
|
||||
result = api.persist(a, b)
|
||||
|
||||
profApi.rollback =
|
||||
proc(a: AristoTxRef): auto =
|
||||
AristoApiProfRollbackFn.profileRunner:
|
||||
result = api.rollback(a)
|
||||
profApi.dispose =
|
||||
proc(a: AristoTxRef) =
|
||||
AristoApiProfDisposeFn.profileRunner:
|
||||
api.dispose(a)
|
||||
|
||||
profApi.txFrameBegin =
|
||||
proc(a: AristoTxRef): auto =
|
||||
|
@ -44,6 +44,7 @@ type
|
||||
db*: AristoDbRef ## Database descriptor
|
||||
parent*: AristoTxRef ## Previous transaction
|
||||
layer*: LayerRef
|
||||
blockNumber*: Opt[uint64] ## Block number set when freezing the frame
|
||||
|
||||
AristoDbRef* = ref object
|
||||
## Three tier database object supporting distributed instances.
|
||||
@ -151,8 +152,19 @@ func hash*(db: AristoDbRef): Hash =
|
||||
# Public helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
iterator stack*(tx: AristoTxRef): AristoTxRef =
|
||||
# Stack going from base to tx
|
||||
var frames: seq[AristoTxRef]
|
||||
var tx = tx
|
||||
while tx != nil:
|
||||
frames.add tx
|
||||
tx = tx.parent
|
||||
|
||||
while frames.len > 0:
|
||||
yield frames.pop()
|
||||
|
||||
iterator rstack*(tx: AristoTxRef): (LayerRef, int) =
|
||||
# Stack in reverse order
|
||||
# Stack in reverse order, ie going from tx to base
|
||||
var tx = tx
|
||||
|
||||
var i = 0
|
||||
|
@ -125,8 +125,6 @@ type
|
||||
accLeaves*: Table[Hash32, VertexRef] ## Account path -> VertexRef
|
||||
stoLeaves*: Table[Hash32, VertexRef] ## Storage path -> VertexRef
|
||||
|
||||
cTop*: VertexID ## Last committed vertex ID
|
||||
|
||||
GetVtxFlag* = enum
|
||||
PeekCache
|
||||
## Peek into, but don't update cache - useful on work loads that are
|
||||
|
@ -190,14 +190,17 @@ proc hasStoragePayload(
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc fetchLastSavedState*(
|
||||
proc fetchLastCheckpoint*(
|
||||
db: AristoTxRef;
|
||||
): Result[SavedState,AristoError] =
|
||||
): Result[BlockNumber,AristoError] =
|
||||
## Wrapper around `getLstBe()`. The function returns the state of the last
|
||||
## saved state. This is a Merkle hash tag for vertex with ID 1 and a bespoke
|
||||
## `uint64` identifier (may be interpreted as block number.)
|
||||
# TODO store in frame!!
|
||||
db.db.getLstBe()
|
||||
if db.blockNumber.isSome():
|
||||
return ok db.blockNumber.get()
|
||||
|
||||
let state = ?db.db.getLstBe()
|
||||
ok state.serial
|
||||
|
||||
proc fetchAccountRecord*(
|
||||
db: AristoTxRef;
|
||||
|
@ -1,5 +1,5 @@
|
||||
# nimbus-eth1
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
@ -21,8 +21,8 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
./aristo_init/memory_only
|
||||
./aristo_init/[init_common, memory_only]
|
||||
export
|
||||
memory_only
|
||||
init_common, memory_only
|
||||
|
||||
# End
|
||||
|
@ -93,6 +93,37 @@ proc init*(trg: var TypedBackendObj; src: TypedBackendObj) =
|
||||
trg.txGen = src.txGen
|
||||
trg.txId = src.txId
|
||||
|
||||
proc init*(
|
||||
T: type AristoDbRef;
|
||||
backend: BackendRef
|
||||
): Result[T, AristoError] =
|
||||
let
|
||||
vTop = if backend == nil: VertexID(0) else: ?backend.getTuvFn()
|
||||
db = AristoDbRef(
|
||||
txRef: AristoTxRef(layer: LayerRef(vTop: vTop)),
|
||||
backend: backend,
|
||||
accLeaves: LruCache[Hash32, VertexRef].init(ACC_LRU_SIZE),
|
||||
stoLeaves: LruCache[Hash32, VertexRef].init(ACC_LRU_SIZE),
|
||||
)
|
||||
|
||||
db.txRef.db = db # TODO evaluate if this cyclic ref is worth the convenience
|
||||
|
||||
ok(db)
|
||||
|
||||
proc finish*(db: AristoDbRef; eradicate = false) =
|
||||
## Backend destructor. The argument `eradicate` indicates that a full
|
||||
## database deletion is requested. If set `false` the outcome might differ
|
||||
## depending on the type of backend (e.g. the `BackendMemory` backend will
|
||||
## always eradicate on close.)
|
||||
##
|
||||
## In case of distributed descriptors accessing the same backend, all
|
||||
## distributed descriptors will be destroyed.
|
||||
##
|
||||
## This distructor may be used on already *destructed* descriptors.
|
||||
##
|
||||
if not db.backend.isNil:
|
||||
db.backend.closeFn eradicate
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -27,7 +27,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[algorithm, options, sequtils, tables],
|
||||
std/[algorithm, sequtils, tables],
|
||||
results,
|
||||
../aristo_constants,
|
||||
../aristo_desc,
|
||||
@ -40,11 +40,11 @@ const
|
||||
## Enabled additional logging noise
|
||||
|
||||
type
|
||||
MemDbRef = ref object
|
||||
MemDbRef* = ref object
|
||||
## Database
|
||||
sTab: Table[RootedVertexID,seq[byte]] ## Structural vertex table making up a trie
|
||||
tUvi: Option[VertexID] ## Top used vertex ID
|
||||
lSst: Opt[SavedState] ## Last saved state
|
||||
sTab*: Table[RootedVertexID,seq[byte]] ## Structural vertex table making up a trie
|
||||
tUvi*: Opt[VertexID] ## Top used vertex ID
|
||||
lSst*: Opt[SavedState] ## Last saved state
|
||||
|
||||
MemBackendRef* = ref object of TypedBackendRef
|
||||
## Inheriting table so access can be extended for debugging purposes
|
||||
@ -52,7 +52,7 @@ type
|
||||
|
||||
MemPutHdlRef = ref object of TypedPutHdlRef
|
||||
sTab: Table[RootedVertexID,seq[byte]]
|
||||
tUvi: Option[VertexID]
|
||||
tUvi: Opt[VertexID]
|
||||
lSst: Opt[SavedState]
|
||||
|
||||
when extraTraceMessages:
|
||||
@ -109,16 +109,12 @@ proc getKeyFn(db: MemBackendRef): GetKeyFn =
|
||||
proc getTuvFn(db: MemBackendRef): GetTuvFn =
|
||||
result =
|
||||
proc(): Result[VertexID,AristoError]=
|
||||
if db.mdb.tUvi.isSome:
|
||||
return ok db.mdb.tUvi.unsafeGet
|
||||
err(GetTuvNotFound)
|
||||
db.mdb.tUvi or ok(VertexID(0))
|
||||
|
||||
proc getLstFn(db: MemBackendRef): GetLstFn =
|
||||
result =
|
||||
proc(): Result[SavedState,AristoError]=
|
||||
if db.mdb.lSst.isSome:
|
||||
return ok db.mdb.lSst.unsafeGet
|
||||
err(GetLstNotFound)
|
||||
db.mdb.lSst or err(GetLstNotFound)
|
||||
|
||||
# -------------
|
||||
|
||||
@ -143,7 +139,7 @@ proc putTuvFn(db: MemBackendRef): PutTuvFn =
|
||||
proc(hdl: PutHdlRef; vs: VertexID) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.tUvi = some(vs)
|
||||
hdl.tUvi = Opt.some(vs)
|
||||
|
||||
proc putLstFn(db: MemBackendRef): PutLstFn =
|
||||
result =
|
||||
@ -175,7 +171,7 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
|
||||
|
||||
let tuv = hdl.tUvi.get(otherwise = VertexID(0))
|
||||
if tuv.isValid:
|
||||
db.mdb.tUvi = some(tuv)
|
||||
db.mdb.tUvi = Opt.some(tuv)
|
||||
|
||||
if hdl.lSst.isSome:
|
||||
db.mdb.lSst = hdl.lSst
|
||||
@ -193,10 +189,10 @@ proc closeFn(db: MemBackendRef): CloseFn =
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc memoryBackend*(): BackendRef =
|
||||
proc memoryBackend*(mdb = MemDbRef()): BackendRef =
|
||||
let db = MemBackendRef(
|
||||
beKind: BackendMemory,
|
||||
mdb: MemDbRef())
|
||||
mdb: mdb)
|
||||
|
||||
db.getVtxFn = getVtxFn db
|
||||
db.getKeyFn = getKeyFn db
|
||||
|
@ -54,14 +54,13 @@ proc init*(
|
||||
## Memory backend constructor.
|
||||
##
|
||||
|
||||
let db =
|
||||
when B is VoidBackendRef:
|
||||
AristoDbRef(txRef: AristoTxRef(layer: LayerRef()))
|
||||
when B is VoidBackendRef:
|
||||
AristoDbRef.init(nil)[]
|
||||
|
||||
elif B is MemBackendRef:
|
||||
AristoDbRef(txRef: AristoTxRef(layer: LayerRef()), backend: memoryBackend())
|
||||
db.txRef.db = db
|
||||
db
|
||||
elif B is MemBackendRef:
|
||||
AristoDbRef.init(memoryBackend())[]
|
||||
else:
|
||||
raiseAssert "Unknown backend"
|
||||
|
||||
proc init*(
|
||||
T: type AristoDbRef; # Target type
|
||||
@ -69,21 +68,6 @@ proc init*(
|
||||
## Shortcut for `AristoDbRef.init(VoidBackendRef)`
|
||||
AristoDbRef.init VoidBackendRef
|
||||
|
||||
|
||||
proc finish*(db: AristoDbRef; eradicate = false) =
|
||||
## Backend destructor. The argument `eradicate` indicates that a full
|
||||
## database deletion is requested. If set `false` the outcome might differ
|
||||
## depending on the type of backend (e.g. the `BackendMemory` backend will
|
||||
## always eradicate on close.)
|
||||
##
|
||||
## In case of distributed descriptors accessing the same backend, all
|
||||
## distributed descriptors will be destroyed.
|
||||
##
|
||||
## This distructor may be used on already *destructed* descriptors.
|
||||
##
|
||||
if not db.backend.isNil:
|
||||
db.backend.closeFn eradicate
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -24,13 +24,13 @@ import
|
||||
../../opts,
|
||||
../aristo_desc,
|
||||
./rocks_db/rdb_desc,
|
||||
"."/[rocks_db, memory_only]
|
||||
"."/[init_common, rocks_db]
|
||||
|
||||
export
|
||||
AristoDbRef,
|
||||
RdbBackendRef,
|
||||
RdbWriteEventCb,
|
||||
memory_only,
|
||||
init_common,
|
||||
aristo_desc
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -45,22 +45,10 @@ proc init*(
|
||||
): Result[T, AristoError] =
|
||||
let
|
||||
be = rocksDbBackend(opts, baseDb)
|
||||
vTop = block:
|
||||
let rc = be.getTuvFn()
|
||||
if rc.isErr:
|
||||
be.closeFn(eradicate = false)
|
||||
return err(rc.error)
|
||||
rc.value
|
||||
db = AristoDbRef(
|
||||
txRef: AristoTxRef(layer: LayerRef(vTop: vTop, cTop: vTop)),
|
||||
backend: be,
|
||||
accLeaves: LruCache[Hash32, VertexRef].init(ACC_LRU_SIZE),
|
||||
stoLeaves: LruCache[Hash32, VertexRef].init(ACC_LRU_SIZE),
|
||||
)
|
||||
|
||||
db.txRef.db = db # TODO evaluate if this cyclic ref is worth the convenience
|
||||
|
||||
ok(db)
|
||||
db = AristoDbRef.init(be).valueOr:
|
||||
be.closeFn(eradicate = false)
|
||||
return err(error)
|
||||
ok db
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -1,5 +1,5 @@
|
||||
# nimbus-eth1
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
@ -21,26 +21,11 @@ import
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc persist*(
|
||||
db: AristoDbRef; # Database
|
||||
db: AristoDbRef;
|
||||
batch: PutHdlRef;
|
||||
nxtSid = 0u64; # Next state ID (aka block number)
|
||||
txFrame: AristoTxRef;
|
||||
) =
|
||||
## Persistently store data onto backend database. If the system is running
|
||||
## without a database backend, the function returns immediately with an
|
||||
## error.
|
||||
##
|
||||
## The function merges all staged data from the top layer cache onto the
|
||||
## backend stage area. After that, the top layer cache is cleared.
|
||||
##
|
||||
## Finally, the staged data are merged into the physical backend database
|
||||
## and the staged data area is cleared. Wile performing this last step,
|
||||
## the recovery journal is updated (if available.)
|
||||
##
|
||||
## If the argument `nxtSid` is passed non-zero, it will be the ID for the
|
||||
## next recovery journal record. If non-zero, this ID must be greater than
|
||||
## all previous IDs (e.g. block number when stowing after block execution.)
|
||||
##
|
||||
db.txFramePersist(batch, nxtSid)
|
||||
db.txFramePersist(batch, txFrame)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -21,18 +21,7 @@ import
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc txFrameBegin*(db: AristoDbRef, parent: AristoTxRef): Result[AristoTxRef,AristoError] =
|
||||
## Starts a new transaction.
|
||||
##
|
||||
## Example:
|
||||
## ::
|
||||
## proc doSomething(db: AristoDbRef) =
|
||||
## let tx = db.begin
|
||||
## defer: tx.rollback()
|
||||
## ... continue using db ...
|
||||
## tx.commit()
|
||||
##
|
||||
|
||||
proc txFrameBegin*(db: AristoDbRef, parent: AristoTxRef): AristoTxRef =
|
||||
let parent = if parent == nil:
|
||||
db.txRef
|
||||
else:
|
||||
@ -40,9 +29,9 @@ proc txFrameBegin*(db: AristoDbRef, parent: AristoTxRef): Result[AristoTxRef,Ari
|
||||
|
||||
let
|
||||
vTop = parent.layer.vTop
|
||||
layer = LayerRef(vTop: vTop, cTop: vTop)
|
||||
layer = LayerRef(vTop: vTop)
|
||||
|
||||
ok AristoTxRef(
|
||||
AristoTxRef(
|
||||
db: db,
|
||||
parent: parent,
|
||||
layer: layer)
|
||||
@ -50,66 +39,60 @@ proc txFrameBegin*(db: AristoDbRef, parent: AristoTxRef): Result[AristoTxRef,Ari
|
||||
proc baseTxFrame*(db: AristoDbRef): AristoTxRef=
|
||||
db.txRef
|
||||
|
||||
proc rollback*(
|
||||
tx: AristoTxRef; # Top transaction on database
|
||||
): Result[void,AristoError] =
|
||||
## Given a *top level* handle, this function discards all database operations
|
||||
## performed for this transaction.
|
||||
# TODO Everyone using this txref should repoint their parent field
|
||||
proc dispose*(
|
||||
tx: AristoTxRef;
|
||||
) =
|
||||
tx[].reset()
|
||||
|
||||
let vTop = tx.layer[].cTop
|
||||
tx.layer[] = Layer(vTop: vTop, cTop: vTop)
|
||||
|
||||
ok()
|
||||
|
||||
proc commit*(
|
||||
tx: AristoTxRef; # Top transaction on database
|
||||
): Result[void,AristoError] =
|
||||
## This function pushes all changes done in this frame to its parent
|
||||
##
|
||||
# TODO Everyone using this txref should repoint their parent field
|
||||
doAssert tx.parent != nil, "should not commit the base tx"
|
||||
|
||||
# A rollback after commit should reset to the new vTop!
|
||||
tx.layer[].cTop = tx.layer[].vTop
|
||||
|
||||
mergeAndReset(tx.parent.layer[], tx.layer[])
|
||||
|
||||
ok()
|
||||
proc checkpoint*(
|
||||
tx: AristoTxRef;
|
||||
blockNumber: uint64;
|
||||
) =
|
||||
tx.blockNumber = Opt.some(blockNumber)
|
||||
|
||||
proc txFramePersist*(
|
||||
db: AristoDbRef; # Database
|
||||
batch: PutHdlRef;
|
||||
nxtSid = 0u64; # Next state ID (aka block number)
|
||||
txFrame: AristoTxRef;
|
||||
) =
|
||||
## Persistently store data onto backend database. If the system is running
|
||||
## without a database backend, the function returns immediately with an
|
||||
## error.
|
||||
##
|
||||
## The function merges all data staged in `txFrame` and merges it onto the
|
||||
## backend database. `txFrame` becomes the new `baseTxFrame`.
|
||||
##
|
||||
## Any parent frames of `txFrame` become invalid after this operation.
|
||||
##
|
||||
## If the argument `nxtSid` is passed non-zero, it will be the ID for the
|
||||
## next recovery journal record. If non-zero, this ID must be greater than
|
||||
## all previous IDs (e.g. block number when stowing after block execution.)
|
||||
##
|
||||
|
||||
if txFrame == db.txRef and txFrame.layer.sTab.len == 0:
|
||||
# No changes in frame - no `checkpoint` requirement - nothing to do here
|
||||
return
|
||||
|
||||
let be = db.backend
|
||||
doAssert not be.isNil, "Persisting to backend requires ... a backend!"
|
||||
|
||||
let lSst = SavedState(
|
||||
key: emptyRoot, # placeholder for more
|
||||
serial: nxtSid)
|
||||
serial: txFrame.blockNumber.expect("`checkpoint` before persisting frame"))
|
||||
|
||||
# Squash all changes up to the base
|
||||
if txFrame != db.txRef:
|
||||
# Consolidate the changes from the old to the new base going from the
|
||||
# bottom of the stack to avoid having to cascade each change through
|
||||
# the full stack
|
||||
assert txFrame.parent != nil
|
||||
for frame in txFrame.stack():
|
||||
if frame == db.txRef:
|
||||
continue
|
||||
mergeAndReset(db.txRef.layer[], frame.layer[])
|
||||
db.txRef.blockNumber = frame.blockNumber
|
||||
|
||||
frame.dispose() # This will also dispose `txFrame` itself!
|
||||
|
||||
# Put the now-merged contents in txFrame and make it the new base
|
||||
swap(db.txRef[], txFrame[])
|
||||
db.txRef = txFrame
|
||||
|
||||
# Store structural single trie entries
|
||||
for rvid, vtx in db.txRef.layer.sTab:
|
||||
db.txRef.layer.kMap.withValue(rvid, key) do:
|
||||
for rvid, vtx in txFrame.layer.sTab:
|
||||
txFrame.layer.kMap.withValue(rvid, key) do:
|
||||
be.putVtxFn(batch, rvid, vtx, key[])
|
||||
do:
|
||||
be.putVtxFn(batch, rvid, vtx, default(HashKey))
|
||||
|
||||
be.putTuvFn(batch, db.txRef.layer.vTop)
|
||||
be.putTuvFn(batch, txFrame.layer.vTop)
|
||||
be.putLstFn(batch, lSst)
|
||||
|
||||
# TODO above, we only prepare the changes to the database but don't actually
|
||||
@ -118,19 +101,16 @@ proc txFramePersist*(
|
||||
# in-memory and on-disk state)
|
||||
|
||||
# Copy back updated payloads
|
||||
for accPath, vtx in db.txRef.layer.accLeaves:
|
||||
for accPath, vtx in txFrame.layer.accLeaves:
|
||||
db.accLeaves.put(accPath, vtx)
|
||||
|
||||
for mixPath, vtx in db.txRef.layer.stoLeaves:
|
||||
for mixPath, vtx in txFrame.layer.stoLeaves:
|
||||
db.stoLeaves.put(mixPath, vtx)
|
||||
|
||||
# Done with txRef, all saved to backend
|
||||
db.txRef.layer.cTop = db.txRef.layer.vTop
|
||||
db.txRef.layer.sTab.clear()
|
||||
db.txRef.layer.kMap.clear()
|
||||
db.txRef.layer.accLeaves.clear()
|
||||
db.txRef.layer.stoLeaves.clear()
|
||||
|
||||
txFrame.layer.sTab.clear()
|
||||
txFrame.layer.kMap.clear()
|
||||
txFrame.layer.accLeaves.clear()
|
||||
txFrame.layer.stoLeaves.clear()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -104,13 +104,10 @@ proc `$$`*(e: CoreDbError): string =
|
||||
|
||||
proc persist*(
|
||||
db: CoreDbRef;
|
||||
blockNumber: BlockNumber;
|
||||
): CoreDbRc[void] =
|
||||
## This function stored cached data from the default context (see `ctx()`
|
||||
## below) to the persistent database.
|
||||
##
|
||||
## It also stores the argument block number `blockNumber` as a state record
|
||||
## which can be retrieved via `stateBlockNumber()`.
|
||||
txFrame: CoreDbTxRef;
|
||||
) =
|
||||
## This function persists changes up to and including the given frame to the
|
||||
## database.
|
||||
##
|
||||
db.setTrackNewApi BasePersistFn
|
||||
|
||||
@ -131,8 +128,8 @@ proc persist*(
|
||||
# kvt changes written to memory but not to disk because of an aristo
|
||||
# error), we have to panic instead.
|
||||
|
||||
CoreDbKvtRef(db.ctx).call(persist, db.ctx.kvt, kvtBatch[])
|
||||
CoreDbAccRef(db.ctx).call(persist, db.ctx.mpt, mptBatch[], blockNumber)
|
||||
CoreDbKvtRef(db.ctx).call(persist, db.ctx.kvt, kvtBatch[], txFrame.kTx)
|
||||
CoreDbAccRef(db.ctx).call(persist, db.ctx.mpt, mptBatch[], txFrame.aTx)
|
||||
|
||||
db.defCtx.kvt.backend.putEndFn(kvtBatch[]).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
@ -140,7 +137,6 @@ proc persist*(
|
||||
db.defCtx.mpt.backend.putEndFn(mptBatch[]).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
|
||||
result = ok()
|
||||
else:
|
||||
discard kvtBatch.expect($api & ": should always be able to create batch")
|
||||
discard mptBatch.expect($api & ": should always be able to create batch")
|
||||
@ -153,9 +149,9 @@ proc stateBlockNumber*(db: CoreDbTxRef): BlockNumber =
|
||||
##
|
||||
db.setTrackNewApi BaseStateBlockNumberFn
|
||||
result = block:
|
||||
let rc = db.ctx.parent.ariApi.call(fetchLastSavedState, db.aTx)
|
||||
let rc = db.ctx.parent.ariApi.call(fetchLastCheckpoint, db.aTx)
|
||||
if rc.isOk:
|
||||
rc.value.serial.BlockNumber
|
||||
rc.value.BlockNumber
|
||||
else:
|
||||
0u64
|
||||
db.ifTrackNewApi: debug logTxt, api, elapsed, result
|
||||
@ -605,48 +601,28 @@ proc txFrameBegin*(ctx: CoreDbCtxRef, parent: CoreDbTxRef): CoreDbTxRef =
|
||||
##
|
||||
ctx.setTrackNewApi BaseNewTxFn
|
||||
let
|
||||
kTx = CoreDbKvtRef(ctx).call(txFrameBegin, ctx.kvt, if parent != nil: parent.kTx else: nil).valueOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
aTx = CoreDbAccRef(ctx).call(txFrameBegin, ctx.mpt, if parent != nil: parent.aTx else: nil).valueOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
kTx = CoreDbKvtRef(ctx).call(txFrameBegin, ctx.kvt, if parent != nil: parent.kTx else: nil)
|
||||
aTx = CoreDbAccRef(ctx).call(txFrameBegin, ctx.mpt, if parent != nil: parent.aTx else: nil)
|
||||
|
||||
result = ctx.bless CoreDbTxRef(kTx: kTx, aTx: aTx)
|
||||
ctx.ifTrackNewApi:
|
||||
let newLevel = CoreDbAccRef(ctx).call(level, ctx.mpt)
|
||||
debug logTxt, api, elapsed, newLevel
|
||||
|
||||
proc commit*(tx: CoreDbTxRef) =
|
||||
proc checkpoint*(tx: CoreDbTxRef, blockNumber: BlockNumber) =
|
||||
tx.setTrackNewApi TxCommitFn:
|
||||
let prvLevel {.used.} = CoreDbAccRef(tx.ctx).call(level, tx.aTx)
|
||||
CoreDbAccRef(tx.ctx).call(commit, tx.aTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
CoreDbKvtRef(tx.ctx).call(commit, tx.kTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
tx.ifTrackNewApi: debug logTxt, api, elapsed, prvLevel
|
||||
|
||||
proc rollback*(tx: CoreDbTxRef) =
|
||||
tx.setTrackNewApi TxRollbackFn:
|
||||
let prvLevel {.used.} = CoreDbAccRef(tx.ctx).call(level, tx.aTx)
|
||||
CoreDbAccRef(tx.ctx).call(rollback, tx.aTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
CoreDbKvtRef(tx.ctx).call(rollback, tx.kTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
CoreDbAccRef(tx.ctx).call(checkpoint, tx.aTx, blockNumber)
|
||||
tx.ifTrackNewApi: debug logTxt, api, elapsed, prvLevel
|
||||
|
||||
proc dispose*(tx: CoreDbTxRef) =
|
||||
tx.setTrackNewApi TxDisposeFn:
|
||||
tx.setTrackNewApi TxRollbackFn:
|
||||
let prvLevel {.used.} = CoreDbAccRef(tx.ctx).call(level, tx.aTx)
|
||||
# if CoreDbAccRef(tx.ctx).call(isTop, tx.aTx):
|
||||
CoreDbAccRef(tx.ctx).call(rollback, tx.aTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
# if CoreDbKvtRef(tx.ctx).call(isTop, tx.kTx):
|
||||
CoreDbKvtRef(tx.ctx).call(rollback, tx.kTx).isOkOr:
|
||||
raiseAssert $api & ": " & $error
|
||||
CoreDbAccRef(tx.ctx).call(dispose, tx.aTx)
|
||||
CoreDbKvtRef(tx.ctx).call(dispose, tx.kTx)
|
||||
tx[].reset()
|
||||
tx.ifTrackNewApi: debug logTxt, api, elapsed, prvLevel
|
||||
|
||||
func reparent*(tx: CoreDbTxRef, parent: CoreDbTxRef) =
|
||||
tx.aTx.parent = parent.aTx
|
||||
tx.kTx.parent = parent.kTx
|
||||
|
||||
proc txFrameBegin*(tx: CoreDbTxRef): CoreDbTxRef =
|
||||
tx.ctx.txFrameBegin(tx)
|
||||
|
||||
|
@ -40,7 +40,6 @@ type
|
||||
KvtDbProfData* = AristoDbProfData
|
||||
## Borrowed from `aristo_profile`
|
||||
|
||||
KvtApiCommitFn* = proc(tx: KvtTxRef): Result[void,KvtError] {.noRaise.}
|
||||
KvtApiDelFn* = proc(db: KvtTxRef,
|
||||
key: openArray[byte]): Result[void,KvtError] {.noRaise.}
|
||||
KvtApiFinishFn* = proc(db: KvtDbRef, eradicate = false) {.noRaise.}
|
||||
@ -53,24 +52,23 @@ type
|
||||
key: openArray[byte]): Result[bool,KvtError] {.noRaise.}
|
||||
KvtApiPutFn* = proc(db: KvtTxRef,
|
||||
key, data: openArray[byte]): Result[void,KvtError] {.noRaise.}
|
||||
KvtApiRollbackFn* = proc(tx: KvtTxRef): Result[void,KvtError] {.noRaise.}
|
||||
KvtApiPersistFn* = proc(db: KvtDbRef, batch: PutHdlRef) {.noRaise.}
|
||||
KvtApiDisposeFn* = proc(tx: KvtTxRef) {.noRaise.}
|
||||
KvtApiPersistFn* = proc(db: KvtDbRef, batch: PutHdlRef, txFrame: KvtTxRef) {.noRaise.}
|
||||
KvtApiToKvtDbRefFn* = proc(tx: KvtTxRef): KvtDbRef {.noRaise.}
|
||||
KvtApiTxFrameBeginFn* = proc(db: KvtDbRef, parent: KvtTxRef): Result[KvtTxRef,KvtError] {.noRaise.}
|
||||
KvtApiTxFrameBeginFn* = proc(db: KvtDbRef, parent: KvtTxRef): KvtTxRef {.noRaise.}
|
||||
KvtApiBaseTxFrameFn* = proc(db: KvtDbRef): KvtTxRef {.noRaise.}
|
||||
|
||||
KvtApiRef* = ref KvtApiObj
|
||||
KvtApiObj* = object of RootObj
|
||||
## Useful set of `Kvt` fuctions that can be filtered, stacked etc. Note
|
||||
## that this API is modelled after a subset of the `Aristo` API.
|
||||
commit*: KvtApiCommitFn
|
||||
del*: KvtApiDelFn
|
||||
finish*: KvtApiFinishFn
|
||||
get*: KvtApiGetFn
|
||||
len*: KvtApiLenFn
|
||||
hasKeyRc*: KvtApiHasKeyRcFn
|
||||
put*: KvtApiPutFn
|
||||
rollback*: KvtApiRollbackFn
|
||||
dispose*: KvtApiDisposeFn
|
||||
persist*: KvtApiPersistFn
|
||||
txFrameBegin*: KvtApiTxFrameBeginFn
|
||||
baseTxFrame*: KvtApiBaseTxFrameFn
|
||||
@ -80,14 +78,13 @@ type
|
||||
## index/name mapping for profile slots
|
||||
KvtApiProfTotal = "total"
|
||||
|
||||
KvtApiProfCommitFn = "commit"
|
||||
KvtApiProfDelFn = "del"
|
||||
KvtApiProfFinishFn = "finish"
|
||||
KvtApiProfGetFn = "get"
|
||||
KvtApiProfLenFn = "len"
|
||||
KvtApiProfHasKeyRcFn = "hasKeyRc"
|
||||
KvtApiProfPutFn = "put"
|
||||
KvtApiProfRollbackFn = "rollback"
|
||||
KvtApiProfDisposeFn = "dispose"
|
||||
KvtApiProfPersistFn = "persist"
|
||||
KvtApiProfTxFrameBeginFn = "txFrameBegin"
|
||||
KvtApiProfBaseTxFrameFn = "baseTxFrame"
|
||||
@ -134,14 +131,13 @@ proc dup(be: BackendRef): BackendRef =
|
||||
func init*(api: var KvtApiObj) =
|
||||
when AutoValidateApiHooks:
|
||||
api.reset
|
||||
api.commit = commit
|
||||
api.del = del
|
||||
api.finish = finish
|
||||
api.get = get
|
||||
api.len = len
|
||||
api.hasKeyRc = hasKeyRc
|
||||
api.put = put
|
||||
api.rollback = rollback
|
||||
api.dispose = dispose
|
||||
api.persist = persist
|
||||
api.txFrameBegin = txFrameBegin
|
||||
api.baseTxFrame = baseTxFrame
|
||||
@ -185,11 +181,6 @@ func init*(
|
||||
code
|
||||
data.update(n.ord, getTime() - start)
|
||||
|
||||
profApi.commit =
|
||||
proc(a: KvtTxRef): auto =
|
||||
KvtApiProfCommitFn.profileRunner:
|
||||
result = api.commit(a)
|
||||
|
||||
profApi.del =
|
||||
proc(a: KvtDbRef; b: openArray[byte]): auto =
|
||||
KvtApiProfDelFn.profileRunner:
|
||||
@ -220,10 +211,10 @@ func init*(
|
||||
KvtApiProfPutFn.profileRunner:
|
||||
result = api.put(a, b, c)
|
||||
|
||||
profApi.rollback =
|
||||
profApi.dispose =
|
||||
proc(a: KvtTxRef): auto =
|
||||
KvtApiProfRollbackFn.profileRunner:
|
||||
result = api.rollback(a)
|
||||
KvtApiProfDisposeFn.profileRunner:
|
||||
result = api.dispose(a)
|
||||
|
||||
profApi.persist =
|
||||
proc(a: KvtDbRef): auto =
|
||||
@ -231,9 +222,9 @@ func init*(
|
||||
result = api.persist(a)
|
||||
|
||||
profApi.txFrameBegin =
|
||||
proc(a: KvtDbRef): auto =
|
||||
proc(a: KvtDbRef) =
|
||||
KvtApiProfTxFrameBeginFn.profileRunner:
|
||||
result = api.txFrameBegin(a)
|
||||
api.txFrameBegin(a)
|
||||
|
||||
let beDup = be.dup()
|
||||
if beDup.isNil:
|
||||
|
@ -68,6 +68,17 @@ func isValid*(layer: LayerRef): bool =
|
||||
# Don't put in a hash!
|
||||
func hash*(db: KvtDbRef): Hash {.error.}
|
||||
|
||||
iterator stack*(tx: KvtTxRef): KvtTxRef =
|
||||
# Stack going from base to tx
|
||||
var frames: seq[KvtTxRef]
|
||||
var tx = tx
|
||||
while tx != nil:
|
||||
frames.add tx
|
||||
tx = tx.parent
|
||||
|
||||
while frames.len > 0:
|
||||
yield frames.pop()
|
||||
|
||||
iterator rstack*(tx: KvtTxRef): LayerRef =
|
||||
var tx = tx
|
||||
# Stack in reverse order
|
||||
|
@ -21,21 +21,11 @@ import
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc persist*(
|
||||
db: KvtDbRef; # Database
|
||||
db: KvtDbRef;
|
||||
batch: PutHdlRef;
|
||||
txFrame: KvtTxRef
|
||||
) =
|
||||
## Persistently store data onto backend database. If the system is running
|
||||
## without a database backend, the function returns immediately with an
|
||||
## error.
|
||||
##
|
||||
## The function merges all staged data from the top layer cache onto the
|
||||
## backend stage area. After that, the top layer cache is cleared.
|
||||
##
|
||||
## Finally, the staged data are merged into the physical backend database
|
||||
## and the staged data area is cleared. Wile performing this last step,
|
||||
## the recovery journal is updated (if available.)
|
||||
##
|
||||
db.txFramePersist(batch)
|
||||
db.txFramePersist(batch, txFrame)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -14,7 +14,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
results,
|
||||
./[kvt_desc, kvt_layers]
|
||||
|
||||
|
||||
@ -22,7 +21,7 @@ import
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc txFrameBegin*(db: KvtDbRef, parent: KvtTxRef): Result[KvtTxRef,KvtError] =
|
||||
proc txFrameBegin*(db: KvtDbRef, parent: KvtTxRef): KvtTxRef =
|
||||
## Starts a new transaction.
|
||||
##
|
||||
## Example:
|
||||
@ -35,7 +34,7 @@ proc txFrameBegin*(db: KvtDbRef, parent: KvtTxRef): Result[KvtTxRef,KvtError] =
|
||||
##
|
||||
|
||||
let parent = if parent == nil: db.txRef else: parent
|
||||
ok KvtTxRef(
|
||||
KvtTxRef(
|
||||
db: db,
|
||||
layer: LayerRef(),
|
||||
parent: parent,
|
||||
@ -44,59 +43,44 @@ proc txFrameBegin*(db: KvtDbRef, parent: KvtTxRef): Result[KvtTxRef,KvtError] =
|
||||
proc baseTxFrame*(db: KvtDbRef): KvtTxRef =
|
||||
db.txRef
|
||||
|
||||
proc rollback*(
|
||||
tx: KvtTxRef; # Top transaction on database
|
||||
): Result[void,KvtError] =
|
||||
## Given a *top level* handle, this function discards all database operations
|
||||
## performed for this transactio. The previous transaction is returned if
|
||||
## there was any.
|
||||
##
|
||||
proc dispose*(
|
||||
tx: KvtTxRef;
|
||||
) =
|
||||
|
||||
tx.layer[] = Layer()
|
||||
ok()
|
||||
|
||||
proc commit*(
|
||||
tx: KvtTxRef; # Top transaction on database
|
||||
): Result[void,KvtError] =
|
||||
## Given a *top level* handle, this function accepts all database operations
|
||||
## performed through this handle and merges it to the previous layer. The
|
||||
## previous transaction is returned if there was any.
|
||||
##
|
||||
doAssert tx.parent != nil, "don't commit base tx"
|
||||
|
||||
mergeAndReset(tx.parent.layer[], tx.layer[])
|
||||
|
||||
ok()
|
||||
tx[].reset()
|
||||
|
||||
proc txFramePersist*(
|
||||
db: KvtDbRef; # Database
|
||||
db: KvtDbRef;
|
||||
batch: PutHdlRef;
|
||||
txFrame: KvtTxRef;
|
||||
) =
|
||||
## Persistently store data onto backend database. If the system is running
|
||||
## without a database backend, the function returns immediately with an
|
||||
## error.
|
||||
##
|
||||
## The function merges all staged data from the top layer cache onto the
|
||||
## backend stage area. After that, the top layer cache is cleared.
|
||||
##
|
||||
## Finally, the staged data are merged into the physical backend database
|
||||
## and the staged data area is cleared. Wile performing this last step,
|
||||
## the recovery journal is updated (if available.)
|
||||
##
|
||||
let be = db.backend
|
||||
doAssert not be.isNil, "Persisting to backend requires ... a backend!"
|
||||
|
||||
# Store structural single trie entries
|
||||
for k,v in db.txRef.layer.sTab:
|
||||
be.putKvpFn(batch, k, v)
|
||||
if txFrame != db.txRef:
|
||||
# Consolidate the changes from the old to the new base going from the
|
||||
# bottom of the stack to avoid having to cascade each change through
|
||||
# the full stack
|
||||
assert txFrame.parent != nil
|
||||
for frame in txFrame.stack():
|
||||
if frame == db.txRef:
|
||||
continue
|
||||
mergeAndReset(db.txRef.layer[], frame.layer[])
|
||||
frame.dispose()
|
||||
|
||||
# Put the now-merged contents in txFrame and make it the new base
|
||||
swap(db.txRef[], txFrame[])
|
||||
db.txRef = txFrame
|
||||
|
||||
# Store structural single trie entries
|
||||
for k,v in txFrame.layer.sTab:
|
||||
be.putKvpFn(batch, k, v)
|
||||
# TODO above, we only prepare the changes to the database but don't actually
|
||||
# write them to disk - the code below that updates the frame should
|
||||
# really run after things have been written (to maintain sync betweeen
|
||||
# in-memory and on-disk state)
|
||||
|
||||
# Done with txRef, all saved to backend
|
||||
db.txRef.layer.sTab.clear()
|
||||
txFrame.layer.sTab.clear()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -69,8 +69,6 @@ proc processBlock(
|
||||
let clearEmptyAccount = com.isSpuriousOrLater(header.number)
|
||||
db.persist(clearEmptyAccount)
|
||||
|
||||
vmState.ledger.txFrame.commit()
|
||||
|
||||
ok()
|
||||
|
||||
proc getVmState(c: ChainRef, header: Header, txFrame: CoreDbTxRef):
|
||||
@ -85,8 +83,7 @@ proc getVmState(c: ChainRef, header: Header, txFrame: CoreDbTxRef):
|
||||
# intended to accepts invalid block
|
||||
proc setBlock*(c: ChainRef; blk: Block): Result[void, string] =
|
||||
template header: Header = blk.header
|
||||
let txFrame = c.db.ctx.txFrameBegin(nil)
|
||||
defer: txFrame.dispose()
|
||||
let txFrame = c.db.ctx.txFrameBegin(c.db.baseTxFrame())
|
||||
|
||||
# Needed for figuring out whether KVT cleanup is due (see at the end)
|
||||
let
|
||||
@ -101,19 +98,18 @@ proc setBlock*(c: ChainRef; blk: Block): Result[void, string] =
|
||||
if blk.withdrawals.isSome:
|
||||
txFrame.persistWithdrawals(header.withdrawalsRoot.get, blk.withdrawals.get)
|
||||
|
||||
# update currentBlock *after* we persist it
|
||||
# so the rpc return consistent result
|
||||
# between eth_blockNumber and eth_syncing
|
||||
c.com.syncCurrent = header.number
|
||||
|
||||
txFrame.commit()
|
||||
txFrame.checkpoint(header.number)
|
||||
|
||||
# For the `Aristo` database, this code position is only reached if the
|
||||
# the parent state of the first block (as registered in `headers[0]`) was
|
||||
# the canonical state before updating. So this state will be saved with
|
||||
# `persistent()` together with the respective block number.
|
||||
c.db.persist(header.number - 1).isOkOr:
|
||||
return err($error)
|
||||
c.db.persist(txFrame)
|
||||
|
||||
# update currentBlock *after* we persist it
|
||||
# so the rpc return consistent result
|
||||
# between eth_blockNumber and eth_syncing
|
||||
c.com.syncCurrent = header.number
|
||||
|
||||
ok()
|
||||
|
||||
|
@ -18,13 +18,9 @@ import
|
||||
unittest2,
|
||||
../execution_chain/db/aristo/aristo_desc,
|
||||
./replay/pp,
|
||||
./test_aristo/test_blobify,
|
||||
./test_aristo/test_merge_proof,
|
||||
./test_aristo/test_nibbles,
|
||||
./test_aristo/test_portal_proof,
|
||||
./test_aristo/test_compute,
|
||||
./test_aristo/[
|
||||
test_helpers, test_samples_xx, test_tx,
|
||||
test_blobify, test_compute, test_helpers, test_merge_proof, test_nibbles,
|
||||
test_portal_proof, test_samples_xx, test_tx, test_tx_frame,
|
||||
undump_accounts, undump_storages]
|
||||
|
||||
const
|
||||
|
@ -21,6 +21,7 @@ import
|
||||
aristo_merge,
|
||||
aristo_desc,
|
||||
aristo_init,
|
||||
aristo_persist,
|
||||
aristo_tx_frame,
|
||||
]
|
||||
|
||||
@ -124,9 +125,10 @@ suite "Aristo compute":
|
||||
for (k, v, r) in samples[^1]:
|
||||
check:
|
||||
txFrame.mergeAccountRecord(k, v) == Result[bool, AristoError].ok(true)
|
||||
txFrame.checkpoint(1)
|
||||
|
||||
let batch = db.backend.putBegFn()[]
|
||||
db.txFramePersist(batch, 1)
|
||||
db.persist(batch, txFrame)
|
||||
check db.backend.putEndFn(batch).isOk()
|
||||
|
||||
check txFrame.computeKeys(root).isOk()
|
||||
|
85
tests/test_aristo/test_tx_frame.nim
Normal file
85
tests/test_aristo/test_tx_frame.nim
Normal file
@ -0,0 +1,85 @@
|
||||
# Nimbus
|
||||
# Copyright (c) 2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or
|
||||
# distributed except according to those terms.
|
||||
|
||||
{.used.}
|
||||
|
||||
import
|
||||
unittest2,
|
||||
stew/endians2,
|
||||
results,
|
||||
eth/common/hashes,
|
||||
../../execution_chain/db/aristo/[
|
||||
aristo_delete,
|
||||
aristo_desc,
|
||||
aristo_fetch,
|
||||
aristo_tx_frame,
|
||||
aristo_init,
|
||||
aristo_init/memory_db,
|
||||
aristo_merge,
|
||||
aristo_persist,
|
||||
]
|
||||
|
||||
proc makeAccount(i: uint64): (Hash32, AristoAccount) =
|
||||
var path: Hash32
|
||||
path.data()[0 .. 7] = i.toBytesBE()
|
||||
(path, AristoAccount(balance: i.u256, codeHash: EMPTY_CODE_HASH))
|
||||
|
||||
const
|
||||
acc1 = makeAccount(1)
|
||||
acc2 = makeAccount(2)
|
||||
|
||||
suite "Aristo TxFrame":
|
||||
setup:
|
||||
let
|
||||
mdb = MemDbRef()
|
||||
db = AristoDbRef.init(memoryBackend(mdb)).expect("working memory backend")
|
||||
|
||||
test "Frames should independently keep data":
|
||||
let
|
||||
tx0 = db.txFrameBegin(db.baseTxFrame())
|
||||
tx1 = db.txFrameBegin(tx0)
|
||||
tx2 = db.txFrameBegin(tx1)
|
||||
tx2b = db.txFrameBegin(tx1)
|
||||
|
||||
check:
|
||||
tx0.mergeAccountRecord(acc1[0], acc1[1]).isOk()
|
||||
tx1.mergeAccountRecord(acc2[0], acc2[1]).isOk()
|
||||
tx2.deleteAccountRecord(acc2[0]).isOk()
|
||||
tx2b.deleteAccountRecord(acc1[0]).isOk()
|
||||
|
||||
check:
|
||||
tx0.fetchAccountRecord(acc1[0]).isOk()
|
||||
tx0.fetchAccountRecord(acc2[0]).isErr() # Doesn't exist in tx0
|
||||
tx1.fetchAccountRecord(acc1[0]).isOk()
|
||||
tx1.fetchAccountRecord(acc1[0]).isOk()
|
||||
tx2.fetchAccountRecord(acc1[0]).isOk()
|
||||
tx2.fetchAccountRecord(acc2[0]).isErr() # Doesn't exist in tx2
|
||||
tx2b.fetchAccountRecord(acc1[0]).isErr() # Doesn't exist in tx2b
|
||||
|
||||
tx0.fetchAccountRecord(acc1[0]) == tx2.fetchAccountRecord(acc1[0])
|
||||
|
||||
tx0.fetchStateRoot() != tx1.fetchStateRoot()
|
||||
tx0.fetchStateRoot() == tx2.fetchStateRoot()
|
||||
|
||||
tx2.checkpoint(1)
|
||||
let batch = db.backend.putBegFn().expect("working batch")
|
||||
db.persist(batch, tx2)
|
||||
check:
|
||||
db.backend.putEndFn(batch).isOk()
|
||||
|
||||
db.finish()
|
||||
|
||||
block:
|
||||
let
|
||||
db2 = AristoDbRef.init(memoryBackend(mdb)).expect("working backend")
|
||||
tx = db2.baseTxFrame()
|
||||
check:
|
||||
tx.fetchAccountRecord(acc1[0]).isOk()
|
||||
tx.fetchAccountRecord(acc2[0]).isErr() # Doesn't exist in tx2
|
Loading…
x
Reference in New Issue
Block a user