nimbus-eth1/nimbus/db/aristo/aristo_init/rocks_db.nim

463 lines
14 KiB
Nim
Raw Normal View History

# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocksdb backend for Aristo DB
## =============================
##
## The iterators provided here are currently available only by direct
## backend access
## ::
## import
## aristo/aristo_init,
## aristo/aristo_init/aristo_rocksdb
##
## let rc = AristoDb.init(BackendRocksDB, "/var/tmp")
## if rc.isOk:
## let be = rc.value.to(RdbBackendRef)
## for (n, key, vtx) in be.walkVtx:
## ...
##
{.push raises: [].}
{.warning: "*** importing rocks DB which needs a linker library".}
import
eth/common,
rocksdb,
results,
../aristo_constants,
../aristo_desc,
../aristo_desc/desc_backend,
Aristo db update for short nodes key edge cases (#1887) * Aristo: Provide key-value list signature calculator detail: Simple wrappers around `Aristo` core functionality * Update new API for `CoreDb` details: + Renamed new API functions `contains()` => `hasKey()` or `hasPath()` which disables the `in` operator on non-boolean `contains()` functions + The functions `get()` and `fetch()` always return a not-found error if there is no item, available. The new functions `getOrEmpty()` and `mergeOrEmpty()` return an an empty `Blob` if there is no such key found. * Rewrite `core_apps.nim` using new API from `CoreDb` * Use `Aristo` functionality for calculating Merkle signatures details: For debugging, the `VerifyAristoForMerkleRootCalc` can be set so that `Aristo` results will be verified against the legacy versions. * Provide general interface for Merkle signing key-value tables details: Export `Aristo` wrappers * Activate `CoreDb` tests why: Now, API seems to be stable enough for general tests. * Update `toHex()` usage why: Byteutils' `toHex()` is superior to `toSeq.mapIt(it.toHex(2)).join` * Split `aristo_transcode` => `aristo_serialise` + `aristo_blobify` why: + Different modules for different purposes + `aristo_serialise`: RLP encoding/decoding + `aristo_blobify`: Aristo database encoding/decoding * Compacted representation of small nodes' links instead of Keccak hashes why: Ethereum MPTs use Keccak hashes as node links if the size of an RLP encoded node is at least 32 bytes. Otherwise, the RLP encoded node value is used as a pseudo node link (rather than a hash.) Such a node is nor stored on key-value database. Rather the RLP encoded node value is stored instead of a lode link in a parent node instead. Only for the root hash, the top level node is always referred to by the hash. This feature needed an abstraction of the `HashKey` object which is now either a hash or a blob of length at most 31 bytes. This leaves two ways of representing an empty/void `HashKey` type, either as an empty blob of zero length, or the hash of an empty blob. * Update `CoreDb` interface (mainly reducing logger noise) * Fix copyright years (to make `Lint` happy)
2023-11-08 12:18:32 +00:00
../aristo_blobify,
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
const
maxOpenFiles = 512 ## Rocks DB setup, open files limit
extraTraceMessages = false
## Enabled additional logging noise
type
RdbBackendRef* = ref object of TypedBackendRef
rdb: RdbInst ## Allows low level access to database
RdbPutHdlRef = ref object of TypedPutHdlRef
when extraTraceMessages:
import chronicles
logScope:
topics = "aristo-backend"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc newSession(db: RdbBackendRef): RdbPutHdlRef =
new result
result.TypedPutHdlRef.beginSession db
proc getSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.verifySession db
hdl.RdbPutHdlRef
proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.finishSession db
hdl.RdbPutHdlRef
# ------------------------------------------------------------------------------
# Private functions: interface
# ------------------------------------------------------------------------------
proc getVtxFn(db: RdbBackendRef): GetVtxFn =
result =
proc(vid: VertexID): Result[VertexRef,AristoError] =
# Fetch serialised data record
let data = db.rdb.getVtx(vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < data.len:
return data.deblobify VertexRef
err(GetVtxNotFound)
proc getKeyFn(db: RdbBackendRef): GetKeyFn =
result =
proc(vid: VertexID): Result[HashKey,AristoError] =
# Fetch serialised data record
let data = db.rdb.getKey(vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < data.len:
let lid = HashKey.fromBytes(data).valueOr:
Aristo db update for short nodes key edge cases (#1887) * Aristo: Provide key-value list signature calculator detail: Simple wrappers around `Aristo` core functionality * Update new API for `CoreDb` details: + Renamed new API functions `contains()` => `hasKey()` or `hasPath()` which disables the `in` operator on non-boolean `contains()` functions + The functions `get()` and `fetch()` always return a not-found error if there is no item, available. The new functions `getOrEmpty()` and `mergeOrEmpty()` return an an empty `Blob` if there is no such key found. * Rewrite `core_apps.nim` using new API from `CoreDb` * Use `Aristo` functionality for calculating Merkle signatures details: For debugging, the `VerifyAristoForMerkleRootCalc` can be set so that `Aristo` results will be verified against the legacy versions. * Provide general interface for Merkle signing key-value tables details: Export `Aristo` wrappers * Activate `CoreDb` tests why: Now, API seems to be stable enough for general tests. * Update `toHex()` usage why: Byteutils' `toHex()` is superior to `toSeq.mapIt(it.toHex(2)).join` * Split `aristo_transcode` => `aristo_serialise` + `aristo_blobify` why: + Different modules for different purposes + `aristo_serialise`: RLP encoding/decoding + `aristo_blobify`: Aristo database encoding/decoding * Compacted representation of small nodes' links instead of Keccak hashes why: Ethereum MPTs use Keccak hashes as node links if the size of an RLP encoded node is at least 32 bytes. Otherwise, the RLP encoded node value is used as a pseudo node link (rather than a hash.) Such a node is nor stored on key-value database. Rather the RLP encoded node value is stored instead of a lode link in a parent node instead. Only for the root hash, the top level node is always referred to by the hash. This feature needed an abstraction of the `HashKey` object which is now either a hash or a blob of length at most 31 bytes. This leaves two ways of representing an empty/void `HashKey` type, either as an empty blob of zero length, or the hash of an empty blob. * Update `CoreDb` interface (mainly reducing logger noise) * Fix copyright years (to make `Lint` happy)
2023-11-08 12:18:32 +00:00
return err(RdbHashKeyExpected)
return ok lid
err(GetKeyNotFound)
proc getFilFn(db: RdbBackendRef): GetFilFn =
if db.rdb.noFq:
result =
proc(qid: QueueID): Result[FilterRef,AristoError] =
err(FilQuSchedDisabled)
else:
result =
proc(qid: QueueID): Result[FilterRef,AristoError] =
# Fetch serialised data record.
let data = db.rdb.getByPfx(FilPfx, qid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFilFn: failed", qid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < data.len:
return data.deblobify FilterRef
err(GetFilNotFound)
proc getIdgFn(db: RdbBackendRef): GetIdgFn =
result =
proc(): Result[seq[VertexID],AristoError]=
# Fetch serialised data record.
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdIdg.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getIdgFn: failed", error=error[0], info=error[1]
return err(error[0])
# Decode data record
if data.len == 0:
let w = EmptyVidSeq # Must be `let`
return ok w # Compiler error with `ok(EmptyVidSeq)`
# Decode data record
data.deblobify seq[VertexID]
proc getFqsFn(db: RdbBackendRef): GetFqsFn =
if db.rdb.noFq:
result =
proc(): Result[seq[(QueueID,QueueID)],AristoError] =
err(FilQuSchedDisabled)
else:
result =
proc(): Result[seq[(QueueID,QueueID)],AristoError]=
# Fetch serialised data record.
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdFqs.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFqsFn: failed", error=error[0], info=error[1]
return err(error[0])
if data.len == 0:
let w = EmptyQidPairSeq # Must be `let`
return ok w # Compiler error with `ok(EmptyQidPairSeq)`
# Decode data record
data.deblobify seq[(QueueID,QueueID)]
# -------------
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.rdb.begin()
db.newSession()
proc putVtxFn(db: RdbBackendRef): PutVtxFn =
result =
proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (vid,vtx) in vrps:
if vtx.isValid:
let rc = vtx.blobify()
if rc.isErr:
hdl.error = TypedPutHdlErrRef(
pfx: VtxPfx,
vid: vid,
code: rc.error)
return
batch.add (vid.uint64, rc.value)
else:
batch.add (vid.uint64, EmptyBlob)
# Stash batch session data via LRU cache
db.rdb.putVtx(batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: VtxPfx,
vid: VertexID(error[0]),
code: error[1],
info: error[2])
proc putKeyFn(db: RdbBackendRef): PutKeyFn =
result =
proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (vid,key) in vkps:
if key.isValid:
Aristo avoid storage trie update race conditions (#2251) * Update TDD suite logger output format choices why: New format is not practical for TDD as it just dumps data across a wide range (considerably larder than 80 columns.) So the new format can be turned on by function argument. * Update unit tests samples configuration why: Slightly changed the way to find the `era1` directory * Remove compiler warnings (fix deprecated expressions and phrases) * Update `Aristo` debugging tools * Always update the `storageID` field of account leaf vertices why: Storage tries are weekly linked to an account leaf object in that the `storageID` field is updated by the application. Previously, `Aristo` verified that leaf objects make sense when passed to the database. As a consequence * the database was inconsistent for a short while * the burden for correctness was all on the application which led to delayed error handling which is hard to debug. So `Aristo` will internally update the account leaf objects so that there are no race conditions due to the storage trie handling * Aristo: Let `stow()`/`persist()` bail out unless there is a `VertexID(1)` why: The journal and filter logic depends on the hash of the `VertexID(1)` which is commonly known as the state root. This implies that all changes to the database are somehow related to that. * Make sure that a `Ledger` account does not overwrite the storage trie reference why: Due to the abstraction of a sub-trie (now referred to as column with a hash describing its state) there was a weakness in the `Aristo` handler where an account leaf could be overwritten though changing the validity of the database. This has been changed and the database will now reject such changes. This patch fixes the behaviour on the application layer. In particular, the column handle returned by the `CoreDb` needs to be updated by the `Aristo` database state. This mitigates the problem that a storage trie might have vanished or re-apperaed with a different vertex ID. * Fix sub-trie deletion test why: Was originally hinged on `VertexID(1)` which cannot be wholesale deleted anymore after the last Aristo update. Also, running with `VertexID(2)` needs an artificial `VertexID(1)` for making `stow()` or `persist()` work. * Cosmetics * Activate `test_generalstate_json` * Temporarily `deactivate test_tracer_json` * Fix copyright header --------- Co-authored-by: jordan <jordan@dry.pudding> Co-authored-by: Jacek Sieka <jacek@status.im>
2024-05-30 17:48:38 +00:00
batch.add (vid.uint64, @(key.data))
else:
batch.add (vid.uint64, EmptyBlob)
# Stash batch session data via LRU cache
db.rdb.putKey(batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: KeyPfx,
vid: VertexID(error[0]),
code: error[1],
info: error[2])
proc putFilFn(db: RdbBackendRef): PutFilFn =
if db.rdb.noFq:
result =
proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
hdl.error = TypedPutHdlErrRef(
pfx: FilPfx,
qid: (if 0 < vf.len: vf[0][0] else: QueueID(0)),
code: FilQuSchedDisabled)
else:
result =
proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (qid,filter) in vrps:
if filter.isValid:
let rc = filter.blobify()
if rc.isErr:
hdl.error = TypedPutHdlErrRef(
pfx: FilPfx,
qid: qid,
code: rc.error)
return
batch.add (qid.uint64, rc.value)
else:
batch.add (qid.uint64, EmptyBlob)
# Stash batch session data
db.rdb.putByPfx(FilPfx, batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: FilPfx,
qid: QueueID(error[0]),
code: error[1],
info: error[2])
proc putIdgFn(db: RdbBackendRef): PutIdgFn =
result =
proc(hdl: PutHdlRef; vs: openArray[VertexID]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
let idg = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdIdg,
code: error[1],
info: error[2])
proc putFqsFn(db: RdbBackendRef): PutFqsFn =
if db.rdb.noFq:
result =
proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
code: FilQuSchedDisabled)
else:
result =
proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Stash batch session data
let fqs = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdFqs,
code: error[1],
info: error[2])
proc putEndFn(db: RdbBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,AristoError] =
let hdl = hdl.endSession db
if not hdl.error.isNil:
when extraTraceMessages:
case hdl.error.pfx:
of VtxPfx, KeyPfx: trace logTxt "putEndFn: vtx/key failed",
pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code
of FilPfx: trace logTxt "putEndFn: filter failed",
pfx=FilPfx, qid=hdl.error.qid, error=hdl.error.code
of AdmPfx: trace logTxt "putEndFn: admin failed",
pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code
of Oops: trace logTxt "putEndFn: oops",
error=hdl.error.code
return err(hdl.error.code)
# Commit session
db.rdb.commit().isOkOr:
when extraTraceMessages:
trace logTxt "putEndFn: failed", error=($error[0]), info=error[1]
return err(error[0])
ok()
proc guestDbFn(db: RdbBackendRef): GuestDbFn =
result =
proc(instance: int): Result[RootRef,AristoError] =
let gdb = db.rdb.initGuestDb(instance).valueOr:
when extraTraceMessages:
trace logTxt "guestDbFn", error=error[0], info=error[1]
return err(error[0])
ok gdb
proc closeFn(db: RdbBackendRef): CloseFn =
result =
proc(flush: bool) =
db.rdb.destroy(flush)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rocksDbBackend*(
path: string;
qidLayout: QidLayoutRef;
): Result[BackendRef,AristoError] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
block:
let rc = db.rdb.init(path, maxOpenFiles)
if rc.isErr:
when extraTraceMessages:
trace logTxt "constructor failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
db.rdb.noFq = qidLayout.isNil
db.getVtxFn = getVtxFn db
db.getKeyFn = getKeyFn db
db.getFilFn = getFilFn db
db.getIdgFn = getIdgFn db
db.getFqsFn = getFqsFn db
db.putBegFn = putBegFn db
db.putVtxFn = putVtxFn db
db.putKeyFn = putKeyFn db
db.putFilFn = putFilFn db
db.putIdgFn = putIdgFn db
db.putFqsFn = putFqsFn db
db.putEndFn = putEndFn db
db.guestDbFn = guestDbFn db
db.closeFn = closeFn db
# Set up filter management table
if not db.rdb.noFq:
db.journal = QidSchedRef(ctx: qidLayout)
db.journal.state = block:
let rc = db.getFqsFn()
if rc.isErr:
db.closeFn(flush = false)
return err(rc.error)
rc.value
ok db
proc dup*(db: RdbBackendRef): RdbBackendRef =
## Duplicate descriptor shell as needed for API debugging
new result
init_common.init(result[], db[])
result.rdb = db.rdb
# ------------------------------------------------------------------------------
# Public iterators (needs direct backend access)
# ------------------------------------------------------------------------------
iterator walk*(
be: RdbBackendRef;
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
## Walk over all key-value pairs of the database.
##
## Non-decodable entries are stepped over while the counter `n` of the
## yield record is still incremented.
if be.rdb.noFq:
for w in be.rdb.walk:
case w.pfx:
of AdmPfx:
if w.xid == AdmTabIdFqs.uint64:
continue
of FilPfx:
break # last sub-table
else:
discard
yield w
else:
for w in be.rdb.walk:
yield w
iterator walkVtx*(
be: RdbBackendRef;
): tuple[vid: VertexID, vtx: VertexRef] =
## Variant of `walk()` iteration over the vertex sub-table.
for (xid, data) in be.rdb.walk VtxPfx:
let rc = data.deblobify VertexRef
if rc.isOk:
yield (VertexID(xid), rc.value)
iterator walkKey*(
be: RdbBackendRef;
): tuple[vid: VertexID, key: HashKey] =
## Variant of `walk()` iteration over the Markle hash sub-table.
for (xid, data) in be.rdb.walk KeyPfx:
Aristo db update for short nodes key edge cases (#1887) * Aristo: Provide key-value list signature calculator detail: Simple wrappers around `Aristo` core functionality * Update new API for `CoreDb` details: + Renamed new API functions `contains()` => `hasKey()` or `hasPath()` which disables the `in` operator on non-boolean `contains()` functions + The functions `get()` and `fetch()` always return a not-found error if there is no item, available. The new functions `getOrEmpty()` and `mergeOrEmpty()` return an an empty `Blob` if there is no such key found. * Rewrite `core_apps.nim` using new API from `CoreDb` * Use `Aristo` functionality for calculating Merkle signatures details: For debugging, the `VerifyAristoForMerkleRootCalc` can be set so that `Aristo` results will be verified against the legacy versions. * Provide general interface for Merkle signing key-value tables details: Export `Aristo` wrappers * Activate `CoreDb` tests why: Now, API seems to be stable enough for general tests. * Update `toHex()` usage why: Byteutils' `toHex()` is superior to `toSeq.mapIt(it.toHex(2)).join` * Split `aristo_transcode` => `aristo_serialise` + `aristo_blobify` why: + Different modules for different purposes + `aristo_serialise`: RLP encoding/decoding + `aristo_blobify`: Aristo database encoding/decoding * Compacted representation of small nodes' links instead of Keccak hashes why: Ethereum MPTs use Keccak hashes as node links if the size of an RLP encoded node is at least 32 bytes. Otherwise, the RLP encoded node value is used as a pseudo node link (rather than a hash.) Such a node is nor stored on key-value database. Rather the RLP encoded node value is stored instead of a lode link in a parent node instead. Only for the root hash, the top level node is always referred to by the hash. This feature needed an abstraction of the `HashKey` object which is now either a hash or a blob of length at most 31 bytes. This leaves two ways of representing an empty/void `HashKey` type, either as an empty blob of zero length, or the hash of an empty blob. * Update `CoreDb` interface (mainly reducing logger noise) * Fix copyright years (to make `Lint` happy)
2023-11-08 12:18:32 +00:00
let lid = HashKey.fromBytes(data).valueOr:
continue
yield (VertexID(xid), lid)
iterator walkFil*(
be: RdbBackendRef;
): tuple[qid: QueueID, filter: FilterRef] =
## Variant of `walk()` iteration over the filter sub-table.
if not be.rdb.noFq:
for (xid, data) in be.rdb.walk FilPfx:
let rc = data.deblobify FilterRef
if rc.isOk:
yield (QueueID(xid), rc.value)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------