Simple stupid key-value table companion for Aristo DB (#1746)

why:
  Additional tables needed for the `CoreDB` object with separate
  key-value table and MPT.

details:
+ Stripped down copy of Aristo DB to have a similar look'n feel. Otherwise
  it is just a posh way for accessing `Table` objects or `RocksDB` data.
+ No unit tests yet, will be tested on the go.
This commit is contained in:
Jordan Hrycaj 2023-09-12 19:44:45 +01:00 committed by GitHub
parent 8e46953390
commit dda049cd43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1679 additions and 0 deletions

28
nimbus/db/kvt.nim Normal file
View File

@ -0,0 +1,28 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- Standard interface
## ============================
##
{.push raises: [].}
import kvt/[
kvt_constants, kvt_init, kvt_tx, kvt_utils]
export
kvt_constants, kvt_init, kvt_tx, kvt_utils
import
kvt/kvt_desc
export
KvtDbRef,
KvtError,
isValid
# End

View File

@ -0,0 +1,17 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
../aristo/aristo_constants
export
EmptyBlob
# End

View File

@ -0,0 +1,62 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- key-value table
## =========================
##
{.push raises: [].}
import
std/tables,
eth/common,
./kvt_constants,
./kvt_desc/[desc_error, desc_structural]
from ./kvt_desc/desc_backend
import BackendRef
# Not auto-exporting backend
export
kvt_constants, desc_error, desc_structural
type
KvtTxRef* = ref object
## Transaction descriptor
db*: KvtDbRef ## Database descriptor
parent*: KvtTxRef ## Previous transaction
txUid*: uint ## Unique ID among transactions
level*: int ## Stack index for this transaction
KvtDbRef* = ref KvtDbObj
KvtDbObj* = object
## Three tier database object supporting distributed instances.
top*: LayerRef ## Database working layer, mutable
stack*: seq[LayerRef] ## Stashed immutable parent layers
backend*: BackendRef ## Backend database (may well be `nil`)
txRef*: KvtTxRef ## Latest active transaction
txUidGen*: uint ## Tx-relative unique number generator
KvtDbAction* = proc(db: KvtDbRef) {.gcsafe, raises: [CatchableError].}
## Generic call back function/closure.
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
func getOrVoid*(tab: Table[Blob,Blob]; w: Blob): Blob =
tab.getOrDefault(w, EmptyBlob)
func isValid*(key: Blob): bool =
key != EmptyBlob
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,71 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- backend data types
## ============================
##
{.push raises: [].}
import
eth/common,
results,
./desc_error
type
GetKvpFn* =
proc(key: Blob): Result[Blob,KvtError] {.gcsafe, raises: [].}
## Generic backend database retrieval function
# -------------
PutHdlRef* = ref object of RootRef
## Persistent database transaction frame handle. This handle is used to
## wrap any of `PutVtxFn`, `PutKeyFn`, and `PutIdgFn` into and atomic
## transaction frame. These transaction frames must not be interleaved
## by any library function using the backend.
PutBegFn* =
proc(): PutHdlRef {.gcsafe, raises: [].}
## Generic transaction initialisation function
PutKvpFn* =
proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) {.gcsafe, raises: [].}
## Generic backend database bulk storage function.
PutEndFn* =
proc(hdl: PutHdlRef): Result[void,KvtError] {.gcsafe, raises: [].}
## Generic transaction termination function
# -------------
CloseFn* =
proc(flush: bool) {.gcsafe, raises: [].}
## Generic destructor for the `Kvt DB` backend. The argument `flush`
## indicates that a full database deletion is requested. If passed
## `false` the outcome might differ depending on the type of backend
## (e.g. in-memory backends would flush on close.)
# -------------
BackendRef* = ref object of RootRef
## Backend interface.
getKvpFn*: GetKvpFn ## Read key-value pair
putBegFn*: PutBegFn ## Start bulk store session
putKvpFn*: PutKvpFn ## Bulk store key-value pairs
putEndFn*: PutEndFn ## Commit bulk store session
closeFn*: CloseFn ## Generic destructor
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,42 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
type
KvtError* = enum
NothingSerious = 0
GenericError
GetNotFound
KeyInvalid
DataInvalid
# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateBackupDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
RdbBeDriverDelError
RdbBeCreateSstWriter
RdbBeOpenSstWriter
RdbBeAddSstWriter
RdbBeFinishSstWriter
RdbBeIngestSstWriter
# Transaction wrappers
TxArgStaleTx
TxBackendNotWritable
TxNoPendingTx
TxPendingTx
TxNotTopTx
TxStackGarbled
TxStackUnderflow
# End

View File

@ -0,0 +1,40 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- structural data types
## ===============================
##
{.push raises: [].}
import
std/tables,
eth/common
type
LayerRef* = ref object
## Kvt database layer structures. Any layer holds the full
## change relative to the backend.
tab*: Table[Blob,Blob] ## Structural table
txUid*: uint ## Transaction identifier if positive
# ------------------------------------------------------------------------------
# Public helpers, miscellaneous functions
# ------------------------------------------------------------------------------
proc dup*(layer: LayerRef): LayerRef =
## Duplicate layer.
result = LayerRef(
txUid: layer.txUid)
for (k,v) in layer.tab.pairs:
result.tab[k] = v
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,21 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Constructors for Key-Value Table DB
## ====================================
##
{.push raises: [].}
import
./kvt_init/memory_only
export
memory_only
# End

View File

@ -0,0 +1,61 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
../../aristo/aristo_init/init_common,
../kvt_desc,
../kvt_desc/desc_backend
export
BackendType # borrowed from Aristo
const
verifyIxId = true # and false
## Enforce session tracking
type
TypedBackendRef* = ref object of BackendRef
beKind*: BackendType ## Backend type identifier
when verifyIxId:
txGen: uint ## Transaction ID generator (for debugging)
txId: uint ## Active transaction ID (for debugging)
TypedPutHdlRef* = ref object of PutHdlRef
error*: KvtError ## Track error while collecting transaction
when verifyIxId:
txId: uint ## Transaction ID (for debugging)
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
proc beginSession*(hdl: TypedPutHdlRef; db: TypedBackendRef) =
when verifyIxId:
doAssert db.txId == 0
if db.txGen == 0:
db.txGen = 1
db.txId = db.txGen
hdl.txId = db.txGen
db.txGen.inc
proc verifySession*(hdl: TypedPutHdlRef; db: TypedBackendRef) =
when verifyIxId:
doAssert db.txId == hdl.txId
proc finishSession*(hdl: TypedPutHdlRef; db: TypedBackendRef) =
when verifyIxId:
doAssert db.txId == hdl.txId
db.txId = 0
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,149 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## In-memory backend for Kvt DB
## ============================
##
## The iterators provided here are currently available only by direct
## backend access
## ::
## import
## kvt/kvt_init,
## kvt/kvt_init/kvt_memory
##
## let rc = newKvtDbRef(BackendMemory)
## if rc.isOk:
## let be = rc.value.to(MemBackendRef)
## for (n, key, vtx) in be.walkVtx:
## ...
##
{.push raises: [].}
import
std/tables,
chronicles,
eth/common,
results,
../kvt_desc,
../kvt_desc/desc_backend,
./init_common
type
MemBackendRef* = ref object of TypedBackendRef
## Inheriting table so access can be extended for debugging purposes
tab: Table[Blob,Blob] ## Structural key-value table
MemPutHdlRef = ref object of TypedPutHdlRef
tab: Table[Blob,Blob]
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"MemoryDB " & info
proc newSession(db: MemBackendRef): MemPutHdlRef =
new result
result.TypedPutHdlRef.beginSession db
proc getSession(hdl: PutHdlRef; db: MemBackendRef): MemPutHdlRef =
hdl.TypedPutHdlRef.verifySession db
hdl.MemPutHdlRef
proc endSession(hdl: PutHdlRef; db: MemBackendRef): MemPutHdlRef =
hdl.TypedPutHdlRef.finishSession db
hdl.MemPutHdlRef
# ------------------------------------------------------------------------------
# Private functions: interface
# ------------------------------------------------------------------------------
proc getKvpFn(db: MemBackendRef): GetKvpFn =
result =
proc(key: Blob): Result[Blob,KvtError] =
if key.len == 0:
return err(KeyInvalid)
let data = db.tab.getOrVoid key
if data.isValid:
return ok(data)
err(GetNotFound)
# -------------
proc putBegFn(db: MemBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.newSession()
proc putKvpFn(db: MemBackendRef): PutKvpFn =
result =
proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
let hdl = hdl.getSession db
if hdl.error == KvtError(0):
for (k,v) in kvps:
if k.isValid:
hdl.tab[k] = v
else:
hdl.error = KeyInvalid
proc putEndFn(db: MemBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
debug logTxt "putEndFn: key/value failed", error=hdl.error
return err(hdl.error)
for (k,v) in hdl.tab.pairs:
db.tab[k] = v
ok()
# -------------
proc closeFn(db: MemBackendRef): CloseFn =
result =
proc(ignore: bool) =
discard
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc memoryBackend*: BackendRef =
let db = MemBackendRef(
beKind: BackendMemory)
db.getKvpFn = getKvpFn db
db.putBegFn = putBegFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndFn db
db.closeFn = closeFn db
db
# ------------------------------------------------------------------------------
# Public iterators (needs direct backend access)
# ------------------------------------------------------------------------------
iterator walk*(
be: MemBackendRef;
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
for (k,v) in be.tab.pairs:
yield (k,v)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,88 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Non persistent constructors for Kvt DB
## ======================================
##
{.push raises: [].}
import
std/sets,
results,
../kvt_desc,
../kvt_desc/desc_backend,
"."/[init_common, memory_db]
type
VoidBackendRef* = ref object of TypedBackendRef
## Dummy descriptor type, will typically used as `nil` reference
export
BackendType,
MemBackendRef
# ------------------------------------------------------------------------------
# Public database constuctors, destructor
# ------------------------------------------------------------------------------
proc newKvtDbRef*(
backend: static[BackendType];
): KvtDbRef =
## Simplified prototype for `BackendNone` and `BackendMemory` type backend.
##
when backend == BackendVoid:
KvtDbRef(top: LayerRef())
elif backend == BackendMemory:
KvtDbRef(top: LayerRef(), backend: memoryBackend(qidLayout))
elif backend == BackendRocksDB:
{.error: "Kvt DB backend \"BackendRocksDB\" needs basePath argument".}
else:
{.error: "Unknown/unsupported Kvt DB backend".}
# -----------------
proc finish*(db: KvtDbRef; flush = false) =
## Backend destructor. The argument `flush` indicates that a full database
## deletion is requested. If set `false` the outcome might differ depending
## on the type of backend (e.g. the `BackendMemory` backend will always
## flush on close.)
##
## This distructor may be used on already *destructed* descriptors.
##
if not db.isNil:
if not db.backend.isNil:
db.backend.closeFn flush
db[] = KvtDbObj(top: LayerRef())
# -----------------
proc to*[W: TypedBackendRef|MemBackendRef|VoidBackendRef](
db: KvtDbRef;
T: type W;
): T =
## Handy helper for lew-level access to some backend functionality
db.backend.T
proc kind*(
be: BackendRef;
): BackendType =
## Retrieves the backend type symbol for a `TypedBackendRef` argument where
## `BackendVoid` is returned for the`nil` backend.
if be.isNil:
BackendVoid
else:
be.TypedBackendRef.beKind
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,64 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Persistent constructor for Kvt DB
## ====================================
##
## This module automatically pulls in the persistent backend library at the
## linking stage (e.g. `rocksdb`) which can be avoided for pure memory DB
## applications by importing `./kvt_init/memory_only` (rather than
## `./kvt_init/persistent`.)
##
{.push raises: [].}
import
results,
../kvt_desc,
"."/[init_common, rocks_db, memory_only]
export
RdbBackendRef,
memory_only
# ------------------------------------------------------------------------------
# Public database constuctors, destructor
# ------------------------------------------------------------------------------
proc newKvtDbRef*(
backend: static[BackendType];
basePath: string;
): Result[KvtDbRef,KvtError] =
## Generic constructor, `basePath` argument is ignored for `BackendNone` and
## `BackendMemory` type backend database. Also, both of these backends
## aways succeed initialising.
##
when backend == BackendRocksDB:
ok KvtDbRef(top: LayerRef(vGen: vGen), backend: ? rocksDbBackend basePath)
elif backend == BackendVoid:
{.error: "Use BackendNone.init() without path argument".}
elif backend == BackendMemory:
{.error: "Use BackendMemory.init() without path argument".}
else:
{.error: "Unknown/unsupported Kvt DB backend".}
# -----------------
proc to*[W: RdbBackendRef](
db: KvtDbRef;
T: type W;
): T =
## Handy helper for lew-level access to some backend functionality
db.backend.T
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,180 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocksdb backend for Kvt DB
## ==========================
##
## The iterators provided here are currently available only by direct
## backend access
## ::
## import
## kvt/kvt_init,
## kvt/kvt_init/kvt_rocksdb
##
## let rc = KvtDb.init(BackendRocksDB, "/var/tmp")
## if rc.isOk:
## let be = rc.value.to(RdbBackendRef)
## for (n, key, vtx) in be.walkVtx:
## ...
##
{.push raises: [].}
import
chronicles,
eth/common,
rocksdb,
results,
../kvt_desc,
../kvt_desc/desc_backend,
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
logScope:
topics = "kvt-backend"
type
RdbBackendRef* = ref object of TypedBackendRef
rdb: RdbInst ## Allows low level access to database
RdbPutHdlRef = ref object of TypedPutHdlRef
tab: Table[Blob,Blob] ## Transaction cache
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ----------
maxOpenFiles = 512 ## Rocks DB setup, open files limit
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB " & info
proc newSession(db: RdbBackendRef): RdbPutHdlRef =
new result
result.TypedPutHdlRef.beginSession db
proc getSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.verifySession db
hdl.RdbPutHdlRef
proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.finishSession db
hdl.RdbPutHdlRef
# ------------------------------------------------------------------------------
# Private functions: interface
# ------------------------------------------------------------------------------
proc getKvpFn(db: RdbBackendRef): GetKvpFn =
result =
proc(key: Blob): Result[Blob,KvtError] =
if key.len == 0:
return err(KeyInvalid)
let rc = db.rdb.get key.toOpenArray(0,key.len-1)
if rc.isErr:
debug logTxt "getKvpFn() failed", key,
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
# Decode data record
if 0 < rc.value.len:
return ok(rc.value)
err(GetNotFound)
# -------------
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.newSession()
proc putKvpFn(db: RdbBackendRef): PutKvpFn =
result =
proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
let hdl = hdl.getSession db
if hdl.error == KvtError(0):
for (k,v) in kvps:
if k.isValid:
hdl.tab[k] = v
else:
hdl.error = KeyInvalid
proc putEndFn(db: RdbBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
debug logTxt "putEndFn: key/value failed", error=hdl.error
return err(hdl.error)
let rc = db.rdb.put hdl.tab
if rc.isErr:
when extraTraceMessages:
debug logTxt "putEndFn: failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
ok()
proc closeFn(db: RdbBackendRef): CloseFn =
result =
proc(flush: bool) =
db.rdb.destroy(flush)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rocksDbBackend*(
path: string;
): Result[BackendRef,KvtError] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
block:
let rc = db.rdb.init(path, maxOpenFiles)
if rc.isErr:
when extraTraceMessages:
trace logTxt "constructor failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
db.getKvpFn = getKvpFn db
db.putBegFn = putBegFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndFn db
db.closeFn = closeFn db
ok db
# ------------------------------------------------------------------------------
# Public iterators (needs direct backend access)
# ------------------------------------------------------------------------------
iterator walk*(
be: RdbBackendRef;
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
##
for (k,v) in be.rdb.walk:
yield (k,v)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,37 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocks DB internal driver descriptor
## ===================================
{.push raises: [].}
import
rocksdb
type
RdbInst* = object
store*: RocksDBInstance ## Rocks DB database handler
basePath*: string ## Database directory
# Low level Rocks DB access for bulk store
envOpt*: rocksdb_envoptions_t
impOpt*: rocksdb_ingestexternalfileoptions_t
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "kvt" # Legacy DB has "data"
BackupFolder* = "khistory" # Legacy DB has "backups"
SstCache* = "kbulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # Not used with legacy DB (same for Aristo)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,43 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocks DB fetch data record
## ==========================
{.push raises: [].}
import
eth/common,
rocksdb,
results,
"../.."/[kvt_constants, kvt_desc],
./rdb_desc
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc get*(
rdb: RdbInst;
key: openArray[byte],
): Result[Blob,(KvtError,string)] =
var res: Blob
let onData: DataProc = proc(data: openArray[byte]) =
res = @data
let rc = rdb.store.get(key, onData)
if rc.isErr:
return err((RdbBeDriverGetError,rc.error))
if not rc.value:
res = EmptyBlob
ok res
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,107 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocksdb constructor/destructor for Kvt DB
## =========================================
{.push raises: [].}
import
std/os,
chronicles,
rocksdb,
results,
../../kvt_desc,
./rdb_desc
logScope:
topics = "kvt-backend"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB/init " & info
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
proc init*(
rdb: var RdbInst;
basePath: string;
openMax: int;
): Result[void,(KvtError,string)] =
## Constructor c ode inspired by `RocksStoreRef.init()` from
## kvstore_rocksdb.nim
let
dataDir = basePath / BaseFolder / DataFolder
backupsDir = basePath / BaseFolder / BackupFolder
tmpDir = basePath / BaseFolder / TempFolder
try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
backupsDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateBackupDir, ""))
try:
tmpDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateTmpDir, ""))
let rc = rdb.store.init(
dbPath=dataDir, dbBackuppath=backupsDir, readOnly=false,
maxOpenFiles=openMax)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, backupsDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))
# The following is a default setup (subject to change)
rdb.impOpt = rocksdb_ingestexternalfileoptions_create()
rdb.envOpt = rocksdb_envoptions_create()
rdb.basePath = basePath
ok()
proc destroy*(rdb: var RdbInst; flush: bool) =
## Destructor
rdb.envOpt.rocksdb_envoptions_destroy()
rdb.impOpt.rocksdb_ingestexternalfileoptions_destroy()
rdb.store.close()
let
base = rdb.basePath / BaseFolder
try:
(base / TempFolder).removeDir
if flush:
(base / DataFolder).removeDir
# Remove the base folder if it is empty
block done:
for w in base.walkDirRec:
# Ignore backup files
if 0 < w.len and w[^1] != '~':
break done
base.removeDir
except CatchableError:
discard
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,198 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocks DB store data record
## ==========================
{.push raises: [].}
import
std/[algorithm, os, sequtils, sets, tables],
chronicles,
eth/common,
rocksdb,
results,
../../kvt_desc,
./rdb_desc
logScope:
topics = "kvt-backend"
type
RdbPutSession = object
writer: rocksdb_sstfilewriter_t
sstPath: string
nRecords: int
const
extraTraceMessages = false or true
## Enable additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB/put " & info
proc getFileSize(fileName: string): int64 =
var f: File
if f.open fileName:
defer: f.close
try:
result = f.getFileSize
except:
discard
proc rmFileIgnExpt(fileName: string) =
try:
fileName.removeFile
except:
discard
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc destroy(rps: RdbPutSession) =
rps.writer.rocksdb_sstfilewriter_destroy()
rps.sstPath.rmFileIgnExpt
proc begin(
rdb: var RdbInst;
): Result[RdbPutSession,(KvtError,string)] =
## Begin a new bulk load session storing data into a temporary cache file
## `fileName`. When finished, this file will bi direcly imported into the
## database.
var csError: cstring
var session = RdbPutSession(
writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.store.options),
sstPath: rdb.basePath / BaseFolder / TempFolder / SstCache)
if session.writer.isNil:
return err((RdbBeCreateSstWriter, "Cannot create sst writer session"))
session.sstPath.rmFileIgnExpt
session.writer.rocksdb_sstfilewriter_open(
session.sstPath.cstring, addr csError)
if not csError.isNil:
session.destroy()
return err((RdbBeOpenSstWriter, $csError))
ok session
proc add(
session: var RdbPutSession;
key: openArray[byte];
val: openArray[byte];
): Result[void,(KvtError,string)] =
## Append a record to the SST file. Note that consecutive records must be
## strictly increasing.
##
## This function is a wrapper around `rocksdb_sstfilewriter_add()` or
## `rocksdb_sstfilewriter_put()` (stragely enough, there are two functions
## with exactly the same impementation code.)
var csError: cstring
session.writer.rocksdb_sstfilewriter_add(
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
cast[cstring](unsafeAddr val[0]), csize_t(val.len), addr csError)
if not csError.isNil:
return err((RdbBeAddSstWriter, $csError))
session.nRecords.inc
ok()
proc commit(
rdb: var RdbInst;
session: RdbPutSession;
): Result[void,(KvtError,string)] =
## Commit collected and cached data to the database. This function implies
## `destroy()` if successful. Otherwise `destroy()` must be called
## explicitely, e.g. after error analysis.
var csError: cstring
if 0 < session.nRecords:
session.writer.rocksdb_sstfilewriter_finish(addr csError)
if not csError.isNil:
return err((RdbBeFinishSstWriter, $csError))
rdb.store.db.rocksdb_ingest_external_file(
[session.sstPath].allocCStringArray, 1, rdb.impOpt, addr csError)
if not csError.isNil:
return err((RdbBeIngestSstWriter, $csError))
when extraTraceMessages:
let fileSize = session.sstPath.getFileSize
trace logTxt "finished sst", fileSize
session.destroy()
ok()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc put*(
rdb: var RdbInst;
tab: Table[Blob,Blob];
): Result[void,(KvtError,string)] =
var session = block:
let rc = rdb.begin()
if rc.isErr:
return err(rc.error)
rc.value
# Vertices with empty table values will be deleted
var delKey: HashSet[Blob]
# Compare `Blob`s as left aligned big endian numbers, right padded with zeros
proc cmpBlobs(a, b: Blob): int =
let minLen = min(a.len, b.len)
for n in 0 ..< minLen:
if a[n] != b[n]:
return a[n].cmp b[n]
if a.len < b.len:
return -1
if b.len < a.len:
return 1
for key in tab.keys.toSeq.sorted cmpBlobs:
let val = tab.getOrVoid key
if val.isValid:
let rc = session.add(key, val)
if rc.isErr:
session.destroy()
return err(rc.error)
else:
delKey.incl key
block:
let rc = rdb.commit session
if rc.isErr:
trace logTxt "commit error", error=rc.error[0], info=rc.error[1]
return err(rc.error)
# Delete vertices after successfully updating vertices with non-zero values.
for key in delKey:
let rc = rdb.store.del key
if rc.isErr:
return err((RdbBeDriverDelError,rc.error))
ok()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,54 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Rocks DB store data iterator
## ============================
##
{.push raises: [].}
import
std/sequtils,
eth/common,
rocksdb,
./rdb_desc
# ------------------------------------------------------------------------------
# Public iterators
# ------------------------------------------------------------------------------
iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
##
let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
defer: rit.rocksdb_iter_destroy()
rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
var kLen: csize_t
let kData = rit.rocksdb_iter_key(addr kLen)
if not kData.isNil and 0 < kLen:
var vLen: csize_t
let vData = rit.rocksdb_iter_value(addr vLen)
if not vData.isNil and 0 < vLen:
let
key = kData.toOpenArrayByte(0,int(kLen)-1).toSeq
data = vData.toOpenArrayByte(0,int(vLen)-1).toSeq
yield (key,data)
# Update Iterator (might overwrite kData/vdata)
rit.rocksdb_iter_next()
# End while
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,18 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
kvt_init/persistent as init_persistent,
kvt_walk/persistent as walk_persistent
export
init_persistent,
walk_persistent
# End

196
nimbus/db/kvt/kvt_tx.nim Normal file
View File

@ -0,0 +1,196 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- Transaction interface
## ===============================
##
{.push raises: [].}
import
std/[sequtils, tables],
results,
./kvt_desc/desc_backend,
./kvt_desc
func isTop*(tx: KvtTxRef): bool
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func getDbDescFromTopTx(tx: KvtTxRef): Result[KvtDbRef,KvtError] =
if not tx.isTop():
return err(TxNotTopTx)
let db = tx.db
if tx.level != db.stack.len:
return err(TxStackUnderflow)
ok db
proc getTxUid(db: KvtDbRef): uint =
if db.txUidGen == high(uint):
db.txUidGen = 0
db.txUidGen.inc
db.txUidGen
# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------
func txTop*(db: KvtDbRef): Result[KvtTxRef,KvtError] =
## Getter, returns top level transaction if there is any.
if db.txRef.isNil:
err(TxNoPendingTx)
else:
ok(db.txRef)
func isTop*(tx: KvtTxRef): bool =
## Getter, returns `true` if the argument `tx` referes to the current top
## level transaction.
tx.db.txRef == tx and tx.db.top.txUid == tx.txUid
func level*(tx: KvtTxRef): int =
## Getter, positive nesting level of transaction argument `tx`
tx.level
func level*(db: KvtDbRef): int =
## Getter, non-negative nesting level (i.e. number of pending transactions)
if not db.txRef.isNil:
result = db.txRef.level
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
func to*(tx: KvtTxRef; T: type[KvtDbRef]): T =
## Getter, retrieves the parent database descriptor from argument `tx`
tx.db
# ------------------------------------------------------------------------------
# Public functions: Transaction frame
# ------------------------------------------------------------------------------
proc txBegin*(db: KvtDbRef): Result[KvtTxRef,KvtError] =
## Starts a new transaction.
##
## Example:
## ::
## proc doSomething(db: KvtDbRef) =
## let tx = db.begin
## defer: tx.rollback()
## ... continue using db ...
## tx.commit()
##
if db.level != db.stack.len:
return err(TxStackGarbled)
db.stack.add db.top.dup # push (save and use top later)
db.top.txUid = db.getTxUid()
db.txRef = KvtTxRef(
db: db,
txUid: db.top.txUid,
parent: db.txRef,
level: db.stack.len)
ok db.txRef
proc rollback*(
tx: KvtTxRef; # Top transaction on database
): Result[void,KvtError] =
## Given a *top level* handle, this function discards all database operations
## performed for this transactio. The previous transaction is returned if
## there was any.
##
let db = ? tx.getDbDescFromTopTx()
# Roll back to previous layer.
db.top = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.txRef = tx.parent
ok()
proc commit*(
tx: KvtTxRef; # Top transaction on database
): Result[void,KvtError] =
## Given a *top level* handle, this function accepts all database operations
## performed through this handle and merges it to the previous layer. The
## previous transaction is returned if there was any.
##
let db = ? tx.getDbDescFromTopTx()
# Keep top and discard layer below
db.top.txUid = db.stack[^1].txUid
db.stack.setLen(db.stack.len-1)
db.txRef = tx.parent
ok()
proc collapse*(
tx: KvtTxRef; # Top transaction on database
commit: bool; # Commit if `true`, otherwise roll back
): Result[void,KvtError] =
## Iterated application of `commit()` or `rollback()` performing the
## something similar to
## ::
## while true:
## discard tx.commit() # ditto for rollback()
## if db.topTx.isErr: break
## tx = db.topTx.value
##
let db = ? tx.getDbDescFromTopTx()
# If commit, then leave the current layer and clear the stack, oterwise
# install the stack bottom.
if not commit:
db.stack[0].swap db.top
db.top.txUid = 0
db.stack.setLen(0)
ok()
# ------------------------------------------------------------------------------
# Public functions: save database
# ------------------------------------------------------------------------------
proc stow*(
db: KvtDbRef; # Database
): Result[void,KvtError] =
## The function saves the data from the top layer cache into the
## backend database.
##
## If there is no backend the function returns immediately with an error.
## The same happens if there is a pending transaction.
##
if not db.txRef.isNil:
return err(TxPendingTx)
if 0 < db.stack.len:
return err(TxStackGarbled)
let be = db.backend
if be.isNil:
return err(TxBackendNotWritable)
# Save structural and other table entries
let txFrame = be.putBegFn()
be.putKvpFn(txFrame, db.top.tab.pairs.toSeq)
? be.putEndFn txFrame
# Delete or clear stack and clear top
db.stack.setLen(0)
db.top = LayerRef(txUid: db.top.txUid)
ok()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,95 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Kvt DB -- Common functions
## ==========================
##
{.push raises: [].}
import
eth/common,
results,
./kvt_desc/desc_backend,
./kvt_desc
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc getBE*(
db: KvtDbRef; # Database
key: Blob; # Key of database record
): Result[Blob,KvtError] =
## For the argument `key` return the associated value from the backend
## database if available.
##
let be = db.backend
if not be.isNil:
return be.getKvpFn key
err(GetNotFound)
# ------------------------------------------------------------------------------
# Public functions, converters
# ------------------------------------------------------------------------------
proc put*(
db: KvtDbRef; # Database
key: Blob; # Key of database record to store
data: Blob; # Value of database record to store
): Result[void,KvtError] =
## For the argument `key` associated the argument `data` as value (which
## will be marked in the top layer cache.)
if key.len == 0:
return err(KeyInvalid)
if data.len == 0:
return err(DataInvalid)
db.top.tab[key] = data
ok()
proc del*(
db: KvtDbRef; # Database
key: Blob; # Key of database record to delete
): Result[void,KvtError] =
## For the argument `key` delete the associated value (which will be marked
## in the top layer cache.)
if key.len == 0:
return err(KeyInvalid)
let rc = db.getBE(key)
if rc.isOk:
db.top.tab[key] = EmptyBlob
elif rc.error == GetNotFound:
db.top.tab.del key
else:
return err(rc.error)
ok()
# ------------
proc get*(
db: KvtDbRef; # Database
key: Blob; # Key of database record
): Result[Blob,KvtError] =
## For the argument `key` return the associated value preferably from the
## top layer, or the database otherwise.
##
if key.len == 0:
return err(KeyInvalid)
let data = db.top.tab.getOrVoid key
if data.isValid:
return ok(data)
db.getBE key
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,27 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Backend DB traversal for Kvt DB
## ===============================
##
## This module provides iterators for the memory based backend or the
## backend-less database. Do import `kvt_walk/persistent` for the
## persistent backend though avoiding to unnecessarily link to the persistent
## backend library (e.g. `rocksdb`) when a memory only database is used.
##
{.push raises: [].}
import
./kvt_walk/memory_only
export
memory_only
# End

View File

@ -0,0 +1,39 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Iterators for non-persistent backend of the Kvt DB
## ==================================================
##
import
eth/common,
../kvt_init/[memory_db, memory_only],
../kvt_init
export
memory_db,
memory_only
# ------------------------------------------------------------------------------
# Public iterators (all in one)
# ------------------------------------------------------------------------------
iterator walkAny*(
be: MemBackendRef|VoidBackendRef;
): tuple[key: Blob, data: Blob] =
## Iterate over backend filters.
when be isnot VoidBackendRef:
mixin walk
for (k,v) in be.walk:
yield (k,v)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -0,0 +1,42 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Iterators for persistent backend of the Kvt DB
## ==============================================
##
## This module automatically pulls in the persistent backend library at the
## linking stage (e.g. `rocksdb`) which can be avoided for pure memory DB
## applications by importing `./kvt_walk/memory_only` (rather than
## `./kvt_walk/persistent`.)
##
import
eth/common,
../kvt_init/[rocks_db, persistent],
./memory_only
export
rocks_db,
memory_only,
persistent
# ------------------------------------------------------------------------------
# Public iterators (all in one)
# ------------------------------------------------------------------------------
iterator walkAny*(
be: RdbBackendRef;
): tuple[key: Blob, data: Blob] =
## Walk filter slots in fifo order.
for (k,v) in be.walk:
yield (k,v)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------