Jacek Sieka 41cf81f80b
Fix dboptions init (#2391)
For the block cache to be shared between column families, the options
instance must be shared between the various column families being
created. This also ensures that there is only one source of truth for
configuration options instead of having two different sets depending on
how the tables were initialized.

This PR also removes the re-opening mechanism which can double startup
time - every time the database is opened, the log is replayed - a large
log file will take a long time to open.

Finally, several options got correclty implemented as column family
options, including an one that puts a hash index in the SST files.
2024-06-19 10:55:57 +02:00

331 lines
10 KiB
Nim

# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocksdb backend for Kvt DB
## ==========================
##
## The iterators provided here are currently available only by direct
## backend access
## ::
## import
## kvt/kvt_init,
## kvt/kvt_init/kvt_rocksdb
##
## let rc = KvtDb.init(BackendRocksDB, "/var/tmp")
## if rc.isOk:
## let be = rc.value.to(RdbBackendRef)
## for (n, key, vtx) in be.walkVtx:
## ...
##
{.push raises: [].}
import
chronicles,
eth/common,
rocksdb,
results,
../../aristo/aristo_init/persistent,
../../opts,
../kvt_desc,
../kvt_desc/desc_backend,
../kvt_tx/tx_stow,
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
const
extraTraceMessages = false or true
## Enabled additional logging noise
type
RdbBackendRef* = ref object of TypedBackendRef
rdb: RdbInst ## Allows low level access to database
RdbPutHdlRef = ref object of TypedPutHdlRef
logScope:
topics = "kvt-backend"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB " & info
proc newSession(db: RdbBackendRef): RdbPutHdlRef =
new result
result.TypedPutHdlRef.beginSession db
proc getSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.verifySession db
hdl.RdbPutHdlRef
proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.finishSession db
hdl.RdbPutHdlRef
# ------------------------------------------------------------------------------
# Private functions: standard interface
# ------------------------------------------------------------------------------
proc getKvpFn(db: RdbBackendRef): GetKvpFn =
result =
proc(key: openArray[byte]): Result[Blob,KvtError] =
# Get data record
var data = db.rdb.get(key).valueOr:
when extraTraceMessages:
debug logTxt "getKvpFn() failed", key, error=error[0], info=error[1]
return err(error[0])
# Return if non-empty
if 0 < data.len:
return ok(move(data))
err(GetNotFound)
# -------------
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): Result[PutHdlRef,KvtError] =
db.rdb.begin()
ok db.newSession()
proc putKvpFn(db: RdbBackendRef): PutKvpFn =
result =
proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
let hdl = hdl.getSession db
if hdl.error == KvtError(0):
# Collect batch session arguments
db.rdb.put(kvps).isOkOr:
hdl.error = error[1]
hdl.info = error[2]
return
proc putEndFn(db: RdbBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
when extraTraceMessages:
debug logTxt "putEndFn: failed", error=hdl.error, info=hdl.info
db.rdb.rollback()
return err(hdl.error)
# Commit session
db.rdb.commit().isOkOr:
when extraTraceMessages:
trace logTxt "putEndFn: failed", error=($error[0]), info=error[1]
return err(error[0])
ok()
proc closeFn(db: RdbBackendRef): CloseFn =
result =
proc(eradicate: bool) =
db.rdb.destroy(eradicate)
proc canModFn(db: RdbBackendRef): CanModFn =
result =
proc(): Result[void,KvtError] =
ok()
proc setWrReqFn(db: RdbBackendRef): SetWrReqFn =
result =
proc(kvt: RootRef): Result[void,KvtError] =
err(RdbBeHostNotApplicable)
# ------------------------------------------------------------------------------
# Private functions: triggered interface changes
# ------------------------------------------------------------------------------
proc putBegTriggeredFn(db: RdbBackendRef): PutBegFn =
## Variant of `putBegFn()` for piggyback write batch
result =
proc(): Result[PutHdlRef,KvtError] =
# Check whether somebody else initiated the rocksdb write batch/session
if db.rdb.session.isNil:
const error = RdbBeDelayedNotReady
when extraTraceMessages:
debug logTxt "putBegTriggeredFn: failed", error
return err(error)
ok db.newSession()
proc putEndTriggeredFn(db: RdbBackendRef): PutEndFn =
## Variant of `putEndFn()` for piggyback write batch
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
# There is no commit()/rollback() here as we do not own the backend.
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
when extraTraceMessages:
debug logTxt "putEndTriggeredFn: failed",
error=hdl.error, info=hdl.info
# The error return code will signal a problem to the `txStow()`
# function which was called by `writeEvCb()` below.
return err(hdl.error)
# Commit the session. This will be acknowledged by the `txStow()`
# function which was called by `writeEvCb()` below.
ok()
proc closeTriggeredFn(db: RdbBackendRef): CloseFn =
## Variant of `closeFn()` for piggyback write batch
result =
proc(eradicate: bool) =
# Nothing to do here as we do not own the backend
discard
proc canModTriggeredFn(db: RdbBackendRef): CanModFn =
## Variant of `canModFn()` for piggyback write batch
result =
proc(): Result[void,KvtError] =
# Deny modifications/changes if there is a pending write request
if not db.rdb.delayedPersist.isNil:
return err(RdbBeDelayedLocked)
ok()
proc setWrReqTriggeredFn(db: RdbBackendRef): SetWrReqFn =
result =
proc(kvt: RootRef): Result[void,KvtError] =
if db.rdb.delayedPersist.isNil:
db.rdb.delayedPersist = KvtDbRef(kvt)
ok()
else:
err(RdbBeDelayedAlreadyRegistered)
# ------------------------------------------------------------------------------
# Private function: trigger handler
# ------------------------------------------------------------------------------
proc writeEvCb(db: RdbBackendRef): RdbWriteEventCb =
## Write session event handler
result =
proc(ws: WriteBatchRef): bool =
# Only do something if a write session request was queued
if not db.rdb.delayedPersist.isNil:
defer:
# Clear session environment when leaving. This makes sure that the
# same session can only be run once.
db.rdb.session = WriteBatchRef(nil)
db.rdb.delayedPersist = KvtDbRef(nil)
# Publish session argument
db.rdb.session = ws
# Execute delayed session. Note the the `txStow()` function is located
# in `tx_stow.nim`. This module `tx_stow.nim` is also imported by
# `kvt_tx.nim` which contains `persist() `. So the logic goes:
# ::
# kvt_tx.persist() --> registers a delayed write request rather
# than excuting tx_stow.txStow()
#
# // the backend owner (i.e. Aristo) will start a write cycle and
# // invoke the envent handler rocks_db.writeEvCb()
# rocks_db.writeEvCb() --> calls tx_stow.txStow()
#
# tx_stow.txStow() --> calls rocks_db.putBegTriggeredFn()
# calls rocks_db.putKvpFn()
# calls rocks_db.putEndTriggeredFn()
#
let rc = db.rdb.delayedPersist.txStow(persistent=true)
if rc.isErr:
error "writeEventCb(): persist() failed", error=rc.error
return false
true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rocksDbKvtBackend*(
path: string;
dbOpts: DbOptionsRef;
cfOpts: ColFamilyOptionsRef;
): Result[BackendRef,(KvtError,string)] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
db.rdb.init(path, dbOpts, cfOpts).isOkOr:
when extraTraceMessages:
trace logTxt "constructor failed", error=error[0], info=error[1]
return err(error)
db.getKvpFn = getKvpFn db
db.putBegFn = putBegFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndFn db
db.closeFn = closeFn db
db.canModFn = canModFn db
db.setWrReqFn = setWrReqFn db
ok db
proc rocksDbKvtTriggeredBackend*(
adb: AristoDbRef;
oCfs: openArray[ColFamilyReadWrite];
): Result[BackendRef,(KvtError,string)] =
let db = RdbBackendRef(
beKind: BackendRdbTriggered)
# Initialise RocksDB piggy-backed on `Aristo` backend.
db.rdb.init(oCfs).isOkOr:
when extraTraceMessages:
trace logTxt "constructor failed", error=error[0], info=error[1]
return err(error)
# Register write session event handler
adb.activateWrTrigger(db.writeEvCb()).isOkOr:
return err((RdbBeHostError,$error))
db.getKvpFn = getKvpFn db
db.putBegFn = putBegTriggeredFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndTriggeredFn db
db.closeFn = closeTriggeredFn db
db.canModFn = canModTriggeredFn db
db.setWrReqFn = setWrReqTriggeredFn db
ok db
proc dup*(db: RdbBackendRef): RdbBackendRef =
new result
init_common.init(result[], db[])
result.rdb = db.rdb
# ------------------------------------------------------------------------------
# Public iterators (needs direct backend access)
# ------------------------------------------------------------------------------
iterator walk*(
be: RdbBackendRef;
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
##
for (k,v) in be.rdb.walk:
yield (k,v)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------