Aristo update rocksdb backend drivers (#2135)

* Aristo+RocksDB: Update backend drivers

why:
  RocksDB update allows use some of the newly provided methods which
  were previously implemented by using the very C backend (for the lack
  of NIM methods.)

* Aristo+RocksDB: Simplify drivers wrapper

* Kvt: Update backend drivers and wrappers similar to `Aristo`

* Aristo+Kvm: Use column families for RocksDB

* Aristo+MemoryDB: Code cosmetics

* Aristo: Provide guest column family for export

why:
  So `Kvt` can piggyback on `Aristo` so there avoiding to run a second
  DBMS system in parallel.

* Kvt: Provide import mechanism for RoksDB guest column family

why:
  So `Kvt` can piggyback on `Aristo` so there avoiding to run a second
   DBMS system in parallel.

* CoreDb+Aristo: Run persistent `Kvt` DB piggybacked on `Aristo`

why:
  Avoiding to run two DBMS systems in parallel.

* Fix copyright year

* Ditto
This commit is contained in:
Jordan Hrycaj 2024-04-16 20:39:11 +00:00 committed by GitHub
parent 160e033663
commit d6a4205324
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 637 additions and 823 deletions

View File

@ -91,6 +91,20 @@ type
# -------------
GuestDbFn* =
proc(): Result[RootRef,AristoError] {.gcsafe, raises: [].}
## Generic function that returns a compartmentalised database handle that
## can be used by another application. If non-nil, this handle allows to
## use a previously allocated database. It is separated from the `Aristo`
## columns.
##
## A successful return value might be `nil` if this feature is
## unsupported.
##
## Caveat:
## The guest database is closed automatically when closing the `Aristo`
## database.
CloseFn* =
proc(flush: bool) {.gcsafe, raises: [].}
## Generic destructor for the `Aristo DB` backend. The argument `flush`
@ -119,6 +133,8 @@ type
putFqsFn*: PutFqsFn ## Store filter ID state
putEndFn*: PutEndFn ## Commit bulk store session
guestDbFn*: GuestDbFn ## Piggyback DB for another application
closeFn*: CloseFn ## Generic destructor
proc init*(trg: var BackendObj; src: BackendObj) =
@ -135,6 +151,7 @@ proc init*(trg: var BackendObj; src: BackendObj) =
trg.putIdgFn = src.putIdgFn
trg.putFqsFn = src.putFqsFn
trg.putEndFn = src.putEndFn
trg.guestDbFn = src.guestDbFn
trg.closeFn = src.closeFn
# ------------------------------------------------------------------------------

View File

@ -242,14 +242,12 @@ type
# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
RdbBeDriverDelError
RdbBeCreateSstWriter
RdbBeOpenSstWriter
RdbBeAddSstWriter
RdbBeFinishSstWriter
RdbBeIngestSstWriter
RdbBeDriverGetError
RdbBeDriverGuestError
RdbBeDriverInitError
RdbBeDriverPutError
RdbBeDriverWriteError
RdbHashKeyExpected
# Transaction wrappers

View File

@ -35,6 +35,10 @@ type
## Access keys for admin table records. When exposed (e.g. when itereating
## over the tables), this data type is to be used.
GuestDbRef* = ref object of RootRef
## Object returned from `GuestDbFn` (if any)
beKind*: BackendType ## Backend type identifier
TypedBackendRef* = ref TypedBackendObj
TypedBackendObj* = object of BackendObj
beKind*: BackendType ## Backend type identifier
@ -48,9 +52,12 @@ type
vid*: VertexID ## Vertex ID where the error occured
of FilPfx:
qid*: QueueID ## Ditto
of AdmPfx, Oops:
of AdmPfx:
aid*: AdminTabID
of Oops:
discard
code*: AristoError ## Error code (if any)
info*: string ## Error description (if any)
TypedPutHdlRef* = ref object of PutHdlRef
error*: TypedPutHdlErrRef ## Track error while collecting transaction

View File

@ -28,7 +28,6 @@
import
std/[algorithm, options, sequtils, tables],
chronicles,
eth/common,
results,
../aristo_constants,
@ -37,6 +36,10 @@ import
../aristo_blobify,
./init_common
const
extraTraceMessages = false or true
## Enabled additional logging noise
type
MemDbRef = ref object
## Database
@ -58,13 +61,18 @@ type
vGen: Option[seq[VertexID]]
vFqs: Option[seq[(QueueID,QueueID)]]
when extraTraceMessages:
import chronicles
logScope:
topics = "aristo-backend"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"MemoryDB " & info
"MemoryDB/" & info
proc newSession(db: MemBackendRef): MemPutHdlRef =
new result
@ -88,9 +96,10 @@ proc getVtxFn(db: MemBackendRef): GetVtxFn =
# Fetch serialised data record
let data = db.mdb.sTab.getOrDefault(vid, EmptyBlob)
if 0 < data.len:
let rc = data.deblobify VertexRef
if rc.isErr:
debug logTxt "getVtxFn() failed", vid, error=rc.error, info=rc.error
let rc = data.deblobify(VertexRef)
when extraTraceMessages:
if rc.isErr:
trace logTxt "getVtxFn() failed", vid, error=rc.error
return rc
err(GetVtxNotFound)
@ -211,6 +220,7 @@ proc putFqsFn(db: MemBackendRef): PutFqsFn =
if hdl.error.isNil:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdFqs,
code: FilQuSchedDisabled)
else:
result =
@ -225,16 +235,16 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
proc(hdl: PutHdlRef): Result[void,AristoError] =
let hdl = hdl.endSession db
if not hdl.error.isNil:
case hdl.error.pfx:
of VtxPfx, KeyPfx:
debug logTxt "putEndFn: vtx/key failed",
when extraTraceMessages:
case hdl.error.pfx:
of VtxPfx, KeyPfx: trace logTxt "putEndFn: vtx/key failed",
pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code
of FilPfx:
debug logTxt "putEndFn: filter failed",
of FilPfx: trace logTxt "putEndFn: filter failed",
pfx=hdl.error.pfx, qid=hdl.error.qid, error=hdl.error.code
else:
debug logTxt "putEndFn: failed",
pfx=hdl.error.pfx, error=hdl.error.code
of AdmPfx: trace logTxt "putEndFn: admin failed",
pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code
of Oops: trace logTxt "putEndFn: failed",
pfx=hdl.error.pfx, error=hdl.error.code
return err(hdl.error.code)
for (vid,data) in hdl.sTab.pairs:
@ -273,6 +283,11 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
# -------------
proc guestDbFn(db: MemBackendRef): GuestDbFn =
result =
proc(): Result[RootRef,AristoError] =
ok(RootRef nil)
proc closeFn(db: MemBackendRef): CloseFn =
result =
proc(ignore: bool) =
@ -303,6 +318,8 @@ proc memoryBackend*(qidLayout: QidLayoutRef): BackendRef =
db.putFqsFn = putFqsFn db
db.putEndFn = putEndFn db
db.guestDbFn = guestDbFn db
db.closeFn = closeFn db
# Set up filter management table
@ -330,7 +347,8 @@ iterator walkVtx*(
if 0 < data.len:
let rc = data.deblobify VertexRef
if rc.isErr:
debug logTxt "walkVtxFn() skip", n, vid, error=rc.error
when extraTraceMessages:
debug logTxt "walkVtxFn() skip", n, vid, error=rc.error
else:
yield (vid, rc.value)
@ -353,7 +371,8 @@ iterator walkFil*(
if 0 < data.len:
let rc = data.deblobify FilterRef
if rc.isErr:
debug logTxt "walkFilFn() skip", n, qid, error=rc.error
when extraTraceMessages:
debug logTxt "walkFilFn() skip", n, qid, error=rc.error
else:
yield (qid, rc.value)

View File

@ -14,6 +14,7 @@
{.push raises: [].}
import
results,
../aristo_desc,
../aristo_desc/desc_backend,
"."/[init_common, memory_db]
@ -26,6 +27,7 @@ type
export
BackendType,
GuestDbRef,
MemBackendRef,
QidLayoutRef
@ -86,6 +88,14 @@ proc init*(
## Shortcut for `AristoDbRef.init(VoidBackendRef)`
AristoDbRef.init VoidBackendRef
proc guestDb*(db: AristoDbRef): Result[GuestDbRef,AristoError] =
## Database pigiback feature
if db.backend.isNil:
ok(GuestDbRef(nil))
else:
let gdb = db.backend.guestDbFn().valueOr:
return err(error)
ok(gdb.GuestDbRef)
proc finish*(db: AristoDbRef; flush = false) =
## Backend destructor. The argument `flush` indicates that a full database

View File

@ -20,8 +20,11 @@
import
results,
rocksdb,
../aristo_desc,
./rocks_db/rdb_desc,
"."/[rocks_db, memory_only]
export
RdbBackendRef,
memory_only
@ -79,6 +82,12 @@ proc init*[W: RdbBackendRef](
when B is RdbBackendRef:
basePath.newAristoRdbDbRef DEFAULT_QID_QUEUES.to(QidLayoutRef)
proc getRocksDbFamily*(gdb: GuestDbRef): Result[ColFamilyReadWrite,void] =
## Database pigiback feature
if not gdb.isNil and gdb.beKind == BackendRocksDB:
return ok RdbGuestDbRef(gdb).guestDb
err()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -28,7 +28,6 @@
{.warning: "*** importing rocks DB which needs a linker library".}
import
chronicles,
eth/common,
rocksdb,
results,
@ -39,23 +38,23 @@ import
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
logScope:
topics = "aristo-backend"
const
maxOpenFiles = 512 ## Rocks DB setup, open files limit
extraTraceMessages = false
## Enabled additional logging noise
type
RdbBackendRef* = ref object of TypedBackendRef
rdb: RdbInst ## Allows low level access to database
RdbPutHdlRef = ref object of TypedPutHdlRef
cache: RdbTabs ## Transaction cache
const
extraTraceMessages = false or true
## Enabled additional logging noise
when extraTraceMessages:
import chronicles
# ----------
maxOpenFiles = 512 ## Rocks DB setup, open files limit
logScope:
topics = "aristo-backend"
# ------------------------------------------------------------------------------
# Private helpers
@ -77,19 +76,6 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.TypedPutHdlRef.finishSession db
hdl.RdbPutHdlRef
proc `vtxCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) =
hdl.cache[VtxPfx][val.vid.uint64] = val.data
proc `keyCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) =
hdl.cache[KeyPfx][val.vid.uint64] = val.data
proc `filCache=`(hdl: RdbPutHdlRef; val: tuple[qid: QueueID; data: Blob]) =
hdl.cache[FilPfx][val.qid.uint64] = val.data
proc `admCache=`(hdl: RdbPutHdlRef; val: tuple[id: AdminTabID; data: Blob]) =
hdl.cache[AdmPfx][val.id.uint64] = val.data
# ------------------------------------------------------------------------------
# Private functions: interface
# ------------------------------------------------------------------------------
@ -99,15 +85,14 @@ proc getVtxFn(db: RdbBackendRef): GetVtxFn =
proc(vid: VertexID): Result[VertexRef,AristoError] =
# Fetch serialised data record
let rc = db.rdb.get vid.toOpenArray(VtxPfx)
if rc.isErr:
debug logTxt "getVtxFn() failed", vid,
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
let data = db.rdb.get(VtxPfx, vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < rc.value.len:
return rc.value.deblobify VertexRef
if 0 < data.len:
return data.deblobify VertexRef
err(GetVtxNotFound)
@ -116,15 +101,14 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn =
proc(vid: VertexID): Result[HashKey,AristoError] =
# Fetch serialised data record
let rc = db.rdb.get vid.toOpenArray(KeyPfx)
if rc.isErr:
debug logTxt "getKeyFn: failed", vid,
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
let data = db.rdb.get(KeyPfx, vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < rc.value.len:
let lid = HashKey.fromBytes(rc.value).valueOr:
if 0 < data.len:
let lid = HashKey.fromBytes(data).valueOr:
return err(RdbHashKeyExpected)
return ok lid
@ -140,15 +124,14 @@ proc getFilFn(db: RdbBackendRef): GetFilFn =
proc(qid: QueueID): Result[FilterRef,AristoError] =
# Fetch serialised data record
let rc = db.rdb.get qid.toOpenArray()
if rc.isErr:
debug logTxt "getFilFn: failed", qid,
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
let data = db.rdb.get(FilPfx, qid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFilFn: failed", qid, error=error[0], info=error[1]
return err(error[0])
# Decode data record
if 0 < rc.value.len:
return rc.value.deblobify FilterRef
if 0 < data.len:
return data.deblobify FilterRef
err(GetFilNotFound)
@ -157,17 +140,18 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn =
proc(): Result[seq[VertexID],AristoError]=
# Fetch serialised data record
let rc = db.rdb.get AdmTabIdIdg.toOpenArray()
if rc.isErr:
debug logTxt "getIdgFn: failed", error=rc.error[1]
return err(rc.error[0])
if rc.value.len == 0:
let w = EmptyVidSeq
return ok w
let data = db.rdb.get(AdmPfx, AdmTabIdIdg.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getIdgFn: failed", error=error[0], info=error[1]
return err(error[0])
# Decode data record
rc.value.deblobify seq[VertexID]
if data.len == 0:
let w = EmptyVidSeq # Must be `let`
return ok w # Compiler error with `ok(EmptyVidSeq)`
# Decode data record
data.deblobify seq[VertexID]
proc getFqsFn(db: RdbBackendRef): GetFqsFn =
if db.rdb.noFq:
@ -179,23 +163,24 @@ proc getFqsFn(db: RdbBackendRef): GetFqsFn =
proc(): Result[seq[(QueueID,QueueID)],AristoError]=
# Fetch serialised data record
let rc = db.rdb.get AdmTabIdFqs.toOpenArray()
if rc.isErr:
debug logTxt "getFqsFn: failed", error=rc.error[1]
return err(rc.error[0])
let data = db.rdb.get(AdmPfx, AdmTabIdFqs.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFqsFn: failed", error=error[0], info=error[1]
return err(error[0])
if rc.value.len == 0:
let w = EmptyQidPairSeq
return ok w
if data.len == 0:
let w = EmptyQidPairSeq # Must be `let`
return ok w # Compiler error with `ok(EmptyQidPairSeq)`
# Decode data record
rc.value.deblobify seq[(QueueID,QueueID)]
data.deblobify seq[(QueueID,QueueID)]
# -------------
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.rdb.begin()
db.newSession()
@ -204,6 +189,9 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn =
proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (vid,vtx) in vrps:
if vtx.isValid:
let rc = vtx.blobify()
@ -213,20 +201,39 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn =
vid: vid,
code: rc.error)
return
hdl.vtxCache = (vid, rc.value)
batch.add (vid.uint64, rc.value)
else:
hdl.vtxCache = (vid, EmptyBlob)
batch.add (vid.uint64, EmptyBlob)
# Stash batch session data
db.rdb.put(VtxPfx, batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: VtxPfx,
vid: VertexID(error[0]),
code: error[1],
info: error[2])
proc putKeyFn(db: RdbBackendRef): PutKeyFn =
result =
proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (vid,key) in vkps:
if key.isValid:
hdl.keyCache = (vid, @key)
batch.add (vid.uint64, @key)
else:
hdl.keyCache = (vid, EmptyBlob)
batch.add (vid.uint64, EmptyBlob)
# Stash batch session data
db.rdb.put(KeyPfx, batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: KeyPfx,
vid: VertexID(error[0]),
code: error[1],
info: error[2])
proc putFilFn(db: RdbBackendRef): PutFilFn =
if db.rdb.noFq:
@ -243,6 +250,9 @@ proc putFilFn(db: RdbBackendRef): PutFilFn =
proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
# Collect batch session arguments
var batch: seq[(uint64,Blob)]
for (qid,filter) in vrps:
if filter.isValid:
let rc = filter.blobify()
@ -252,19 +262,30 @@ proc putFilFn(db: RdbBackendRef): PutFilFn =
qid: qid,
code: rc.error)
return
hdl.filCache = (qid, rc.value)
batch.add (qid.uint64, rc.value)
else:
hdl.filCache = (qid, EmptyBlob)
batch.add (qid.uint64, EmptyBlob)
# Stash batch session data
db.rdb.put(FilPfx, batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: FilPfx,
qid: QueueID(error[0]),
code: error[1],
info: error[2])
proc putIdgFn(db: RdbBackendRef): PutIdgFn =
result =
proc(hdl: PutHdlRef; vs: openArray[VertexID]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
if 0 < vs.len:
hdl.admCache = (AdmTabIdIdg, vs.blobify)
else:
hdl.admCache = (AdmTabIdIdg, EmptyBlob)
let idg = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.put(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdIdg,
code: error[1],
info: error[2])
proc putFqsFn(db: RdbBackendRef): PutFqsFn =
if db.rdb.noFq:
@ -280,10 +301,15 @@ proc putFqsFn(db: RdbBackendRef): PutFqsFn =
proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) =
let hdl = hdl.getSession db
if hdl.error.isNil:
if 0 < vs.len:
hdl.admCache = (AdmTabIdFqs, vs.blobify)
else:
hdl.admCache = (AdmTabIdFqs, EmptyBlob)
# Stash batch session data
let fqs = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.put(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdFqs,
code: error[1],
info: error[2])
proc putEndFn(db: RdbBackendRef): PutEndFn =
@ -291,22 +317,33 @@ proc putEndFn(db: RdbBackendRef): PutEndFn =
proc(hdl: PutHdlRef): Result[void,AristoError] =
let hdl = hdl.endSession db
if not hdl.error.isNil:
case hdl.error.pfx:
of VtxPfx, KeyPfx:
debug logTxt "putEndFn: vtx/key failed",
pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code
else:
debug logTxt "putEndFn: failed",
pfx=hdl.error.pfx, error=hdl.error.code
return err(hdl.error.code)
let rc = db.rdb.put hdl.cache
if rc.isErr:
when extraTraceMessages:
debug logTxt "putEndFn: failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
case hdl.error.pfx:
of VtxPfx, KeyPfx: trace logTxt "putEndFn: vtx/key failed",
pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code
of FilPfx: trace logTxt "putEndFn: filter failed",
pfx=FilPfx, qid=hdl.error.qid, error=hdl.error.code
of AdmPfx: trace logTxt "putEndFn: admin failed",
pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code
of Oops: trace logTxt "putEndFn: oops",
error=hdl.error.code
return err(hdl.error.code)
# Commit session
db.rdb.commit().isOkOr:
when extraTraceMessages:
trace logTxt "putEndFn: failed", error=($error[0]), info=error[1]
return err(error[0])
ok()
proc guestDbFn(db: RdbBackendRef): GuestDbFn =
result =
proc(): Result[RootRef,AristoError] =
let gdb = db.rdb.guestDb().valueOr:
when extraTraceMessages:
trace logTxt "guestDbFn", error=error[0], info=error[1]
return err(error[0])
ok gdb
proc closeFn(db: RdbBackendRef): CloseFn =
result =
@ -349,6 +386,8 @@ proc rocksDbBackend*(
db.putFqsFn = putFqsFn db
db.putEndFn = putEndFn db
db.guestDbFn = guestDbFn db
db.closeFn = closeFn db
# Set up filter management table

View File

@ -14,9 +14,7 @@
{.push raises: [].}
import
std/[tables, os],
eth/common,
rocksdb/lib/librocksdb,
std/os,
rocksdb,
stew/endians2,
../../aristo_desc,
@ -24,58 +22,42 @@ import
type
RdbInst* = object
dbOpts*: DbOptionsRef
store*: RocksDbReadWriteRef ## Rocks DB database handler
store*: ColFamilyReadWrite ## Rocks DB database handler
session*: WriteBatchRef ## For batched `put()`
basePath*: string ## Database directory
noFq*: bool ## No filter queues available
# Low level Rocks DB access for bulk store
envOpt*: ptr rocksdb_envoptions_t
impOpt*: ptr rocksdb_ingestexternalfileoptions_t
RdbGuestDbRef* = ref object of GuestDbRef
guestDb*: ColFamilyReadWrite ## Pigiback feature reference
RdbKey* = array[1 + sizeof VertexID, byte]
## Sub-table key, <pfx> + VertexID
RdbTabs* = array[StorageType, Table[uint64,Blob]]
## Combined table for caching data to be stored/updated
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "aristo" # Legacy DB has "data"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB
GuestFamily* = "Guest" ## Guest family (e.g. for Kvt)
AristoFamily* = "Aristo" ## RocksDB column family
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "aristo" ## Legacy DB has "data"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
template logTxt*(info: static[string]): static[string] =
"RocksDB/" & info
func baseDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder
func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder
func sstFilePath*(rdb: RdbInst): string =
rdb.cacheDir / SstCache
func toRdbKey*(id: uint64; pfx: StorageType): RdbKey =
let idKey = id.toBytesBE
result[0] = pfx.ord.byte
copyMem(addr result[1], unsafeAddr idKey, sizeof idKey)
template toOpenArray*(vid: VertexID; pfx: StorageType): openArray[byte] =
vid.uint64.toRdbKey(pfx).toOpenArray(0, sizeof uint64)
template toOpenArray*(qid: QueueID): openArray[byte] =
qid.uint64.toRdbKey(FilPfx).toOpenArray(0, sizeof uint64)
template toOpenArray*(aid: AdminTabID): openArray[byte] =
aid.uint64.toRdbKey(AdmPfx).toOpenArray(0, sizeof uint64)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
@ -17,24 +17,41 @@ import
eth/common,
rocksdb,
results,
"../.."/[aristo_constants, aristo_desc],
../../aristo_desc,
../init_common,
./rdb_desc
const
extraTraceMessages = false
## Enable additional logging noise
when extraTraceMessages:
import
chronicles
logScope:
topics = "aristo-rocksdb"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc get*(
rdb: RdbInst;
key: openArray[byte],
pfx: StorageType;
xid: uint64,
): Result[Blob,(AristoError,string)] =
var res: Blob
let onData: DataProc = proc(data: openArray[byte]) =
res = @data
let rc = rdb.store.get(key, onData)
if rc.isErr:
return err((RdbBeDriverGetError,rc.error))
if not rc.value:
let onData = proc(data: openArray[byte]) =
res = @data
let gotData = rdb.store.get(xid.toRdbKey pfx, onData).valueOr:
const errSym = RdbBeDriverGetError
when extraTraceMessages:
trace logTxt "get", error=errSym, info=error
return err((errSym,error))
if not gotData:
res = EmptyBlob
ok res

View File

@ -15,22 +15,21 @@
import
std/os,
chronicles,
rocksdb/lib/librocksdb,
rocksdb,
results,
../../aristo_desc,
./rdb_desc
logScope:
topics = "aristo-backend"
const
extraTraceMessages = false
## Enable additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
when extraTraceMessages:
import
chronicles
template logTxt(info: static[string]): static[string] =
"RocksDB/init " & info
logScope:
topics = "aristo-rocksdb"
# ------------------------------------------------------------------------------
# Public constructor
@ -52,40 +51,47 @@ proc init*(
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateTmpDir, ""))
let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(openMax)
let
cfs = @[initColFamilyDescriptor AristoFamily,
initColFamilyDescriptor GuestFamily]
opts = defaultDbOptions()
opts.setMaxOpenFiles(openMax)
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))
# Reserve a family corner for `Aristo` on the database
let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
trace logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
rdb.dbOpts = dbOpts
rdb.store = rc.get()
# Initialise `Aristo` family
rdb.store = baseDb.withColFamily(AristoFamily).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
trace logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
# The following is a default setup (subject to change)
rdb.impOpt = rocksdb_ingestexternalfileoptions_create()
rdb.envOpt = rocksdb_envoptions_create()
ok()
proc guestDb*(rdb: RdbInst): Result[RootRef,(AristoError,string)] =
# Initialise `Guest` family
let guestDb = rdb.store.db.withColFamily(GuestFamily).valueOr:
let errSym = RdbBeDriverGuestError
when extraTraceMessages:
trace logTxt "guestDb failed", error=errSym, info=error
return err((errSym, error))
ok RdbGuestDbRef(
beKind: BackendRocksDB,
guestDb: guestDb)
proc destroy*(rdb: var RdbInst; flush: bool) =
## Destructor
rdb.envOpt.rocksdb_envoptions_destroy()
rdb.impOpt.rocksdb_ingestexternalfileoptions_destroy()
rdb.store.close()
rdb.store.db.close()
try:
rdb.cacheDir.removeDir
if flush:
if flush:
try:
rdb.dataDir.removeDir
# Remove the base folder if it is empty
@ -96,8 +102,8 @@ proc destroy*(rdb: var RdbInst; flush: bool) =
break done
rdb.baseDir.removeDir
except CatchableError:
discard
except CatchableError:
discard
# ------------------------------------------------------------------------------
# End

View File

@ -14,19 +14,14 @@
{.push raises: [].}
import
std/[algorithm, os, sequtils, strutils, sets, tables],
chronicles,
eth/common,
rocksdb/lib/librocksdb,
rocksdb,
results,
"../.."/[aristo_constants, aristo_desc],
../../aristo_desc,
../init_common,
./rdb_desc
logScope:
topics = "aristo-backend"
type
RdbPutSession = object
writer: ptr rocksdb_sstfilewriter_t
@ -34,165 +29,65 @@ type
nRecords: int
const
extraTraceMessages = false or true
extraTraceMessages = false
## Enable additional logging noise
when extraTraceMessages:
import chronicles
logScope:
topics = "aristo-rocksdb"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB/put " & info
proc getFileSize(fileName: string): int64 {.used.} =
var f: File
if f.open fileName:
defer: f.close
try:
result = f.getFileSize
except CatchableError:
discard
proc rmFileIgnExpt(fileName: string) =
try:
fileName.removeFile
except CatchableError:
discard
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc destroy(rps: RdbPutSession) =
rps.writer.rocksdb_sstfilewriter_destroy()
rps.sstPath.rmFileIgnExpt
proc begin(
rdb: var RdbInst;
): Result[RdbPutSession,(AristoError,string)] =
## Begin a new bulk load session storing data into a temporary cache file
## `fileName`. When finished, this file will bi direcly imported into the
## database.
var csError: cstring
var session = RdbPutSession(
writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr),
sstPath: rdb.sstFilePath)
if session.writer.isNil:
let info = $csError
if "no such file or directory" in info.toLowerAscii:
# Somebody might have killed the "tmp" directory?
raiseAssert info
return err((RdbBeOpenSstWriter, info))
session.sstPath.rmFileIgnExpt
session.writer.rocksdb_sstfilewriter_open(
session.sstPath.cstring, cast[cstringArray](csError.addr))
if not csError.isNil:
session.destroy()
return err((RdbBeOpenSstWriter, $csError))
ok session
proc add(
session: var RdbPutSession;
key: openArray[byte];
val: openArray[byte];
): Result[void,(AristoError,string)] =
## Append a record to the SST file. Note that consecutive records must be
## strictly increasing.
##
## This function is a wrapper around `rocksdb_sstfilewriter_add()` or
## `rocksdb_sstfilewriter_put()` (stragely enough, there are two functions
## with exactly the same impementation code.)
var csError: cstring
session.writer.rocksdb_sstfilewriter_add(
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
cast[cstring](unsafeAddr val[0]), csize_t(val.len),
cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeAddSstWriter, $csError))
session.nRecords.inc
ok()
proc commit(
rdb: var RdbInst;
session: RdbPutSession;
): Result[void,(AristoError,string)] =
## Commit collected and cached data to the database. This function implies
## `destroy()` if successful. Otherwise `destroy()` must be called
## explicitely, e.g. after error analysis.
var csError: cstring
if 0 < session.nRecords:
session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeFinishSstWriter, $csError))
var sstPath = session.sstPath.cstring
rdb.store.cPtr.rocksdb_ingest_external_file(
cast[cstringArray](sstPath.addr), 1, rdb.impOpt, cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeIngestSstWriter, $csError))
when extraTraceMessages:
trace logTxt "finished sst", fileSize=session.sstPath.getFileSize
session.destroy()
ok()
proc disposeSession(rdb: var RdbInst) =
rdb.session.close()
rdb.session = WriteBatchRef(nil)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc begin*(rdb: var RdbInst) =
if rdb.session.isNil:
rdb.session = rdb.store.openWriteBatch()
proc rollback*(rdb: var RdbInst) =
if not rdb.session.isClosed():
rdb.disposeSession()
proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] =
if not rdb.session.isClosed():
defer: rdb.disposeSession()
rdb.store.write(rdb.session).isOkOr:
const errSym = RdbBeDriverWriteError
when extraTraceMessages:
trace logTxt "commit", error=errSym, info=error
return err((errSym,error))
ok()
proc put*(
rdb: var RdbInst;
tabs: RdbTabs;
): Result[void,(AristoError,string)] =
var session = block:
let rc = rdb.begin()
if rc.isErr:
return err(rc.error)
rc.value
# Vertices with empty table values will be deleted
var delKey: HashSet[RdbKey]
for pfx in low(StorageType) .. high(StorageType):
when extraTraceMessages:
trace logTxt "sub-table", pfx, nItems=tabs[pfx].len
for id in tabs[pfx].keys.toSeq.sorted:
let
key = id.toRdbKey pfx
val = tabs[pfx].getOrDefault(id, EmptyBlob)
if val.len == 0:
delKey.incl key
else:
let rc = session.add(key, val)
if rc.isErr:
session.destroy()
return err(rc.error)
block:
let rc = rdb.commit session
if rc.isErr:
trace logTxt "commit error", error=rc.error[0], info=rc.error[1]
return err(rc.error)
# Delete vertices after successfully updating vertices with non-zero values.
for key in delKey:
let rc = rdb.store.delete key
if rc.isErr:
return err((RdbBeDriverDelError,rc.error))
rdb: RdbInst;
pfx: StorageType;
data: openArray[(uint64,Blob)];
): Result[void,(uint64,AristoError,string)] =
let dsc = rdb.session
for (xid,val) in data:
let key = xid.toRdbKey pfx
if val.len == 0:
dsc.delete(key, rdb.store.name).isOkOr:
const errSym = RdbBeDriverDelError
when extraTraceMessages:
trace logTxt "del", pfx, xid, error=errSym, info=error
return err((xid,errSym,error))
else:
dsc.put(key, val, rdb.store.name).isOkOr:
const errSym = RdbBeDriverPutError
when extraTraceMessages:
trace logTxt "put", pfx, xid, error=errSym, info=error
return err((xid,errSym,error))
ok()
# ------------------------------------------------------------------------------

View File

@ -14,31 +14,22 @@
{.push raises: [].}
import
std/sequtils,
eth/common,
stew/endians2,
rocksdb/lib/librocksdb,
rocksdb,
../init_common,
./rdb_desc
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
const
extraTraceMessages = false
## Enable additional logging noise
func keyPfx(kData: cstring, kLen: csize_t): int =
if not kData.isNil and kLen == 1 + sizeof(uint64):
kData.toOpenArrayByte(0,0)[0].int
else:
-1
when extraTraceMessages:
import
chronicles
func keyXid(kData: cstring, kLen: csize_t): uint64 =
if not kData.isNil and kLen == 1 + sizeof(uint64):
return uint64.fromBytesBE kData.toOpenArrayByte(1,int(kLen)-1).toSeq
func valBlob(vData: cstring, vLen: csize_t): Blob =
if not vData.isNil and 0 < vLen:
return vData.toOpenArrayByte(0,int(vLen)-1).toSeq
logScope:
topics = "aristo-rocksdb"
# ------------------------------------------------------------------------------
# Public iterators
@ -50,37 +41,21 @@ iterator walk*(
## Walk over all key-value pairs of the database.
##
## Non-decodable entries are stepped over and ignored.
block walkBody:
let rit = rdb.store.openIterator().valueOr:
when extraTraceMessages:
trace logTxt "walk", pfx="all", error
break walkBody
defer: rit.close()
let
readOptions = rocksdb_readoptions_create()
rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
defer:
rit.rocksdb_iter_destroy()
readOptions.rocksdb_readoptions_destroy()
rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
var kLen: csize_t
let kData = rit.rocksdb_iter_key(addr kLen)
let pfx = kData.keyPfx(kLen)
if 0 <= pfx:
if high(StorageType).ord < pfx:
break
let xid = kData.keyXid(kLen)
if 0 < xid:
var vLen: csize_t
let vData = rit.rocksdb_iter_value(addr vLen)
let val = vData.valBlob(vLen)
if 0 < val.len:
yield (pfx.StorageType, xid, val)
# Update Iterator (might overwrite kData/vdata)
rit.rocksdb_iter_next()
# End while
for (key,val) in rit.pairs:
if key.len == 9:
if StorageType.high.ord < key[0]:
break walkBody
let
pfx = StorageType(key[0])
id = uint64.fromBytesBE key[1..^1]
yield (pfx, id, val)
iterator walk*(
@ -93,65 +68,30 @@ iterator walk*(
## Non-decodable entries are stepped over and ignored.
##
block walkBody:
if pfx in {Oops, AdmPfx}:
# Unsupported
let rit = rdb.store.openIterator().valueOr:
when extraTraceMessages:
echo ">>> walk (2) oops",
" pfx=", pfx
trace logTxt "walk", pfx, error
break walkBody
defer: rit.close()
let
readOptions = rocksdb_readoptions_create()
rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
defer:
rit.rocksdb_iter_destroy()
readOptions.rocksdb_readoptions_destroy()
# Start at first entry not less than `<pfx> & 1`
rit.seekToKey 1u64.toRdbKey pfx
var
kLen: csize_t
kData: cstring
# Fetch sub-table data as long as the current key is acceptable
while rit.isValid():
let key = rit.key()
if key.len == 9:
if key[0] != pfx.ord.uint:
break walkBody # done
# Seek for `VertexID(1)` and subsequent entries if that fails. There should
# always be a `VertexID(1)` entry unless the sub-table is empty. There is
# no such control for the filter table in which case there is a blind guess
# (in case `rocksdb_iter_seek()` does not search `ge` for some reason.)
let keyOne = 1u64.toRdbKey pfx
# It is not clear what happens when the `key` does not exist. The guess
# is that the interation will proceed at the next key position.
#
# Comment from GO port at
# //github.com/DanielMorsing/rocksdb/blob/master/iterator.go:
#
# Seek moves the iterator the position of the key given or, if the key
# doesn't exist, the next key that does exist in the database. If the key
# doesn't exist, and there is no next key, the Iterator becomes invalid.
#
kData = cast[cstring](unsafeAddr keyOne[0])
kLen = sizeof(keyOne).csize_t
rit.rocksdb_iter_seek(kData, kLen)
if rit.rocksdb_iter_valid() == 0:
break walkBody
# Fetch sub-table data
while true:
kData = rit.rocksdb_iter_key(addr kLen)
if pfx.ord != kData.keyPfx kLen:
break walkBody # done
let xid = kData.keyXid(kLen)
if 0 < xid:
# Fetch value data
var vLen: csize_t
let vData = rit.rocksdb_iter_value(addr vLen)
let val = vData.valBlob(vLen)
if 0 < val.len:
yield (xid, val)
let val = rit.value()
if val.len != 0:
yield (uint64.fromBytesBE key[1..^1], val)
# Update Iterator
rit.rocksdb_iter_next()
if rit.rocksdb_iter_valid() == 0:
break walkBody
# End while
rit.next()
# ------------------------------------------------------------------------------
# End

View File

@ -14,8 +14,10 @@ import
std/tables,
eth/common,
results,
"../.."/[aristo, aristo/aristo_walk],
"../.."/[kvt, kvt/kvt_init/memory_only, kvt/kvt_walk],
../../aristo as use_ari,
../../aristo/aristo_walk,
../../kvt as use_kvt,
../../kvt/[kvt_init/memory_only, kvt_walk],
".."/[base, base/base_desc],
./aristo_db/[common_desc, handlers_aristo, handlers_kvt, handlers_trace]
@ -183,10 +185,10 @@ proc baseMethods(
ok(db.bless db.tracerSetup(flags)))
# ------------------------------------------------------------------------------
# Private constructor helpers
# Public constructor and helper
# ------------------------------------------------------------------------------
proc create(
proc create*(
dbType: CoreDbType;
kdb: KvtDbRef;
K: typedesc;
@ -205,58 +207,20 @@ proc create(
db.methods = db.baseMethods(A,K)
db.bless
proc init(
dbType: CoreDbType;
K: typedesc;
A: typedesc;
qlr: QidLayoutRef;
): CoreDbRef =
dbType.create(KvtDbRef.init(K), K, AristoDbRef.init(A, qlr), A)
proc init(
dbType: CoreDbType;
K: typedesc;
A: typedesc;
): CoreDbRef =
dbType.create(KvtDbRef.init(K), K, AristoDbRef.init(A), A)
# ------------------------------------------------------------------------------
# Public constructor helpers
# ------------------------------------------------------------------------------
proc init*(
dbType: CoreDbType;
K: typedesc;
A: typedesc;
path: string;
qlr: QidLayoutRef;
): CoreDbRef =
dbType.create(
KvtDbRef.init(K, path).expect "Kvt/RocksDB init() failed", K,
AristoDbRef.init(A, path, qlr).expect "Aristo/RocksDB init() failed", A)
proc init*(
dbType: CoreDbType;
K: typedesc;
A: typedesc;
path: string;
): CoreDbRef =
dbType.create(
KvtDbRef.init(K, path).expect "Kvt/RocksDB init() failed", K,
AristoDbRef.init(A, path).expect "Aristo/RocksDB init() failed", A)
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
proc newAristoMemoryCoreDbRef*(qlr: QidLayoutRef): CoreDbRef =
AristoDbMemory.init(kvt.MemBackendRef, aristo.MemBackendRef, qlr)
AristoDbMemory.create(
KvtDbRef.init(use_kvt.MemBackendRef), use_ari.MemBackendRef,
AristoDbRef.init(use_ari.MemBackendRef, qlr), use_kvt.MemBackendRef)
proc newAristoMemoryCoreDbRef*(): CoreDbRef =
AristoDbMemory.init(kvt.MemBackendRef, aristo.MemBackendRef)
AristoDbMemory.create(
KvtDbRef.init(use_kvt.MemBackendRef), use_ari.MemBackendRef,
AristoDbRef.init(use_ari.MemBackendRef), use_kvt.MemBackendRef)
proc newAristoVoidCoreDbRef*(): CoreDbRef =
AristoDbVoid.init(kvt.VoidBackendRef, aristo.VoidBackendRef)
AristoDbVoid.create(
KvtDbRef.init(use_kvt.VoidBackendRef), use_ari.VoidBackendRef,
AristoDbRef.init(use_ari.VoidBackendRef), use_kvt.VoidBackendRef)
# ------------------------------------------------------------------------------
# Public helpers, e.g. for direct backend access
@ -304,7 +268,7 @@ iterator aristoKvtPairsVoid*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} =
api = dsc.toAristoApi()
p = api.forkTop(dsc.to(KvtDbRef)).valueOrApiError "aristoKvtPairs()"
defer: discard api.forget(p)
for (k,v) in kvt.VoidBackendRef.walkPairs p:
for (k,v) in use_kvt.VoidBackendRef.walkPairs p:
yield (k,v)
iterator aristoKvtPairsMem*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} =
@ -312,7 +276,7 @@ iterator aristoKvtPairsMem*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} =
api = dsc.toAristoApi()
p = api.forkTop(dsc.to(KvtDbRef)).valueOrApiError "aristoKvtPairs()"
defer: discard api.forget(p)
for (k,v) in kvt.MemBackendRef.walkPairs p:
for (k,v) in use_kvt.MemBackendRef.walkPairs p:
yield (k,v)
iterator aristoMptPairs*(dsc: CoreDxMptRef): (Blob,Blob) {.noRaise.} =
@ -324,12 +288,12 @@ iterator aristoMptPairs*(dsc: CoreDxMptRef): (Blob,Blob) {.noRaise.} =
iterator aristoReplicateMem*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} =
## Instantiation for `MemBackendRef`
for k,v in aristoReplicate[aristo.MemBackendRef](dsc):
for k,v in aristoReplicate[use_ari.MemBackendRef](dsc):
yield (k,v)
iterator aristoReplicateVoid*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} =
## Instantiation for `VoidBackendRef`
for k,v in aristoReplicate[aristo.VoidBackendRef](dsc):
for k,v in aristoReplicate[use_ari.VoidBackendRef](dsc):
yield (k,v)
# ------------------------------------------------------------------------------

View File

@ -14,17 +14,22 @@ import
eth/common,
results,
../../aristo,
../../aristo/[
aristo_desc, aristo_persistent, aristo_walk/persistent, aristo_tx],
../../aristo/aristo_persistent as use_ari,
../../aristo/[aristo_desc, aristo_walk/persistent, aristo_tx],
../../kvt,
../../kvt/kvt_persistent,
../../kvt/kvt_persistent as use_kvt,
../base,
./aristo_db,
./aristo_db/handlers_aristo
./aristo_db/[common_desc, handlers_aristo]
include
./aristo_db/aristo_replicate
const
# Expectation messages
aristoFail = "Aristo/RocksDB init() failed"
kvtFail = "Kvt/RocksDB init() failed"
# Annotation helper(s)
{.pragma: rlpRaise, gcsafe, raises: [AristoApiRlpError].}
@ -33,16 +38,18 @@ include
# ------------------------------------------------------------------------------
proc newAristoRocksDbCoreDbRef*(path: string; qlr: QidLayoutRef): CoreDbRef =
AristoDbRocks.init(
kvt_persistent.RdbBackendRef,
aristo_persistent.RdbBackendRef,
path, qlr)
let
adb = AristoDbRef.init(use_ari.RdbBackendRef, path, qlr).expect aristoFail
gdb = adb.guestDb().valueOr: GuestDbRef(nil)
kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail
AristoDbRocks.create(kdb, use_kvt.RdbBackendRef, adb, use_ari.RdbBackendRef)
proc newAristoRocksDbCoreDbRef*(path: string): CoreDbRef =
AristoDbRocks.init(
kvt_persistent.RdbBackendRef,
aristo_persistent.RdbBackendRef,
path)
let
adb = AristoDbRef.init(use_ari.RdbBackendRef, path).expect aristoFail
gdb = adb.guestDb().valueOr: GuestDbRef(nil)
kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail
AristoDbRocks.create(kdb, use_kvt.RdbBackendRef, adb, use_ari.RdbBackendRef)
# ------------------------------------------------------------------------------
# Public aristo iterators
@ -50,7 +57,7 @@ proc newAristoRocksDbCoreDbRef*(path: string): CoreDbRef =
iterator aristoReplicateRdb*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} =
## Instantiation for `VoidBackendRef`
for k,v in aristoReplicate[aristo_persistent.RdbBackendRef](dsc):
for k,v in aristoReplicate[use_ari.RdbBackendRef](dsc):
yield (k,v)
# ------------------------------------------------------------------------------

View File

@ -19,15 +19,11 @@ type
# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
RdbBeDriverDelError
RdbBeCreateSstWriter
RdbBeOpenSstWriter
RdbBeAddSstWriter
RdbBeFinishSstWriter
RdbBeIngestSstWriter
RdbBeDriverGetError
RdbBeDriverInitError
RdbBeDriverPutError
RdbBeDriverWriteError
# Transaction wrappers
TxArgStaleTx

View File

@ -32,6 +32,7 @@ type
TypedPutHdlRef* = ref object of PutHdlRef
error*: KvtError ## Track error while collecting transaction
info*: string ## Error description (if any)
when verifyIxId:
txId: uint ## Transaction ID (for debugging)

View File

@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
@ -22,6 +22,10 @@ import
results,
../kvt_desc,
"."/[rocks_db, memory_only]
from ../../aristo/aristo_persistent
import GuestDbRef, getRocksDbFamily
export
RdbBackendRef,
memory_only
@ -34,13 +38,22 @@ proc init*[W: MemOnlyBackend|RdbBackendRef](
T: type KvtDbRef;
B: type W;
basePath: string;
guestDb = GuestDbRef(nil);
): Result[KvtDbRef,KvtError] =
## Generic constructor, `basePath` argument is ignored for `BackendNone` and
## `BackendMemory` type backend database. Also, both of these backends
## aways succeed initialising.
##
## If the argument `guestDb` is set and is a RocksDB column familly, the
## `Kvt`batabase is built upon this column familly. Othewise it is newly
## created with `basePath` as storage location.
##
when B is RdbBackendRef:
ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend basePath)
let rc = guestDb.getRocksDbFamily()
if rc.isOk:
ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend rc.value)
else:
ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend basePath)
else:
ok KvtDbRef.init B

View File

@ -28,7 +28,6 @@
{.warning: "*** importing rocks DB which needs a linker library".}
import
chronicles,
eth/common,
rocksdb,
results,
@ -37,8 +36,12 @@ import
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
logScope:
topics = "kvt-backend"
const
maxOpenFiles = 512 ## Rocks DB setup, open files limit
extraTraceMessages = false or true
## Enabled additional logging noise
type
RdbBackendRef* = ref object of TypedBackendRef
@ -47,13 +50,11 @@ type
RdbPutHdlRef = ref object of TypedPutHdlRef
tab: Table[Blob,Blob] ## Transaction cache
const
extraTraceMessages = false or true
## Enabled additional logging noise
when extraTraceMessages:
import chronicles
# ----------
maxOpenFiles = 512 ## Rocks DB setup, open files limit
logScope:
topics = "aristo-backend"
# ------------------------------------------------------------------------------
# Private helpers
@ -82,17 +83,16 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
proc getKvpFn(db: RdbBackendRef): GetKvpFn =
result =
proc(key: openArray[byte]): Result[Blob,KvtError] =
if key.len == 0:
return err(KeyInvalid)
let rc = db.rdb.get key
if rc.isErr:
debug logTxt "getKvpFn() failed", key,
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
# Decode data record
if 0 < rc.value.len:
return ok(rc.value)
# Get data record
let data = db.rdb.get(key).valueOr:
when extraTraceMessages:
debug logTxt "getKvpFn() failed", key, error=error[0], info=error[1]
return err(error[0])
# Return if non-empty
if 0 < data.len:
return ok(data)
err(GetNotFound)
@ -101,6 +101,7 @@ proc getKvpFn(db: RdbBackendRef): GetKvpFn =
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.rdb.begin()
db.newSession()
proc putKvpFn(db: RdbBackendRef): PutKvpFn =
@ -108,25 +109,27 @@ proc putKvpFn(db: RdbBackendRef): PutKvpFn =
proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
let hdl = hdl.getSession db
if hdl.error == KvtError(0):
for (k,v) in kvps:
if k.isValid:
hdl.tab[k] = v
else:
hdl.error = KeyInvalid
# Collect batch session arguments
db.rdb.put(kvps).isOkOr:
hdl.error = error[1]
hdl.info = error[2]
return
proc putEndFn(db: RdbBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
debug logTxt "putEndFn: key/value failed", error=hdl.error
return err(hdl.error)
let rc = db.rdb.put hdl.tab
if rc.isErr:
when extraTraceMessages:
debug logTxt "putEndFn: failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
debug logTxt "putEndFn: failed", error=hdl.error, info=hdl.info
return err(hdl.error)
# Commit session
db.rdb.commit().isOkOr:
when extraTraceMessages:
trace logTxt "putEndFn: failed", error=($error[0]), info=error[1]
return err(error[0])
ok()
@ -135,25 +138,9 @@ proc closeFn(db: RdbBackendRef): CloseFn =
proc(flush: bool) =
db.rdb.destroy(flush)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rocksDbBackend*(
path: string;
): Result[BackendRef,KvtError] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
block:
let rc = db.rdb.init(path, maxOpenFiles)
if rc.isErr:
when extraTraceMessages:
trace logTxt "constructor failed",
error=rc.error[0], info=rc.error[1]
return err(rc.error[0])
# --------------
proc setup(db: RdbBackendRef) =
db.getKvpFn = getKvpFn db
db.putBegFn = putBegFn db
@ -162,6 +149,28 @@ proc rocksDbBackend*(
db.closeFn = closeFn db
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rocksDbBackend*(path: string): Result[BackendRef,KvtError] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
db.rdb.init(path, maxOpenFiles).isOkOr:
when extraTraceMessages:
trace logTxt "constructor failed", error=error[0], info=error[1]
return err(error[0])
db.setup()
ok db
proc rocksDbBackend*(store: ColFamilyReadWrite): Result[BackendRef,KvtError] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
db.rdb.init(store)
db.setup()
ok db
proc dup*(db: RdbBackendRef): RdbBackendRef =

View File

@ -15,24 +15,18 @@
import
std/os,
rocksdb/lib/librocksdb,
rocksdb
type
RdbInst* = object
dbOpts*: DbOptionsRef
store*: RocksDbReadWriteRef ## Rocks DB database handler
store*: ColFamilyReadWrite ## Rocks DB database handler
session*: WriteBatchRef ## For batched `put()`
basePath*: string ## Database directory
# Low level Rocks DB access for bulk store
envOpt*: ptr rocksdb_envoptions_t
impOpt*: ptr rocksdb_ingestexternalfileoptions_t
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "kvt" # Legacy DB has "data"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB
KvtFamily* = "Kvt" ## RocksDB column family
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "kvt" ## Legacy DB has "data"
# ------------------------------------------------------------------------------
# Public functions
@ -44,11 +38,9 @@ func baseDir*(rdb: RdbInst): string =
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder
func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder
func sstFilePath*(rdb: RdbInst): string =
rdb.cacheDir / SstCache
template logTxt(info: static[string]): static[string] =
"RocksDB/" & info
# ------------------------------------------------------------------------------
# End

View File

@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
@ -20,6 +20,17 @@ import
"../.."/[kvt_constants, kvt_desc],
./rdb_desc
const
extraTraceMessages = false
## Enable additional logging noise
when extraTraceMessages:
import
chronicles
logScope:
topics = "kvt-rocksdb"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -31,10 +42,14 @@ proc get*(
var res: Blob
let onData: DataProc = proc(data: openArray[byte]) =
res = @data
let rc = rdb.store.get(key, onData)
if rc.isErr:
return err((RdbBeDriverGetError,rc.error))
if not rc.value:
let gotData = rdb.store.get(key, onData).valueOr:
const errSym = RdbBeDriverGetError
when extraTraceMessages:
trace logTxt "get", error=errSym, info=error
return err((errSym,error))
if not gotData:
res = EmptyBlob
ok res

View File

@ -15,22 +15,20 @@
import
std/os,
chronicles,
rocksdb/lib/librocksdb,
rocksdb,
results,
../../kvt_desc,
./rdb_desc
logScope:
topics = "kvt-backend"
const
extraTraceMessages = false
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
when extraTraceMessages:
import chronicles
template logTxt(info: static[string]): static[string] =
"RocksDB/init " & info
logScope:
topics = "kvt-backend"
# ------------------------------------------------------------------------------
# Public constructor
@ -52,52 +50,53 @@ proc init*(
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateTmpDir, ""))
let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(openMax)
let
cfs = @[initColFamilyDescriptor KvtFamily]
opts = defaultDbOptions()
opts.setMaxOpenFiles(openMax)
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))
# Reserve a family corner for `Kvt` on the database
let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
rdb.dbOpts = dbOpts
rdb.store = rc.get()
# The following is a default setup (subject to change)
rdb.impOpt = rocksdb_ingestexternalfileoptions_create()
rdb.envOpt = rocksdb_envoptions_create()
# Initialise `Kvt` family
rdb.store = baseDb.withColFamily(KvtFamily).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
ok()
proc init*(
rdb: var RdbInst;
store: ColFamilyReadWrite;
) =
## Piggyback on other database
rdb.store = store # that's it
proc destroy*(rdb: var RdbInst; flush: bool) =
## Destructor
rdb.envOpt.rocksdb_envoptions_destroy()
rdb.impOpt.rocksdb_ingestexternalfileoptions_destroy()
rdb.store.close()
try:
rdb.cacheDir.removeDir
## Destructor (no need to do anything if piggybacked)
if 0 < rdb.basePath.len:
rdb.store.db.close()
if flush:
rdb.dataDir.removeDir
try:
rdb.dataDir.removeDir
# Remove the base folder if it is empty
block done:
for w in rdb.baseDir.walkDirRec:
# Ignore backup files
if 0 < w.len and w[^1] != '~':
break done
rdb.baseDir.removeDir
# Remove the base folder if it is empty
block done:
for w in rdb.baseDir.walkDirRec:
# Ignore backup files
if 0 < w.len and w[^1] != '~':
break done
rdb.baseDir.removeDir
except CatchableError:
discard
except CatchableError:
discard
# ------------------------------------------------------------------------------
# End

View File

@ -14,189 +14,74 @@
{.push raises: [].}
import
std/[algorithm, os, sequtils, strutils, sets, tables],
chronicles,
eth/common,
rocksdb/lib/librocksdb,
stew/byteutils,
rocksdb,
results,
../../kvt_desc,
./rdb_desc
logScope:
topics = "kvt-backend"
type
RdbPutSession = object
writer: ptr rocksdb_sstfilewriter_t
sstPath: string
nRecords: int
const
extraTraceMessages = false or true
extraTraceMessages = false
## Enable additional logging noise
when extraTraceMessages:
import chronicles
logScope:
topics = "kvt-rocksdb"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"RocksDB/put " & info
proc getFileSize(fileName: string): int64 {.used.} =
var f: File
if f.open fileName:
defer: f.close
try:
result = f.getFileSize
except CatchableError:
discard
proc rmFileIgnExpt(fileName: string) =
try:
fileName.removeFile
except CatchableError:
discard
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc destroy(rps: RdbPutSession) =
rps.writer.rocksdb_sstfilewriter_destroy()
rps.sstPath.rmFileIgnExpt
proc begin(
rdb: var RdbInst;
): Result[RdbPutSession,(KvtError,string)] =
## Begin a new bulk load session storing data into a temporary cache file
## `fileName`. When finished, this file will bi direcly imported into the
## database.
var csError: cstring
var session = RdbPutSession(
writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr),
sstPath: rdb.sstFilePath)
if session.writer.isNil:
return err((RdbBeCreateSstWriter, "Cannot create sst writer session"))
session.sstPath.rmFileIgnExpt
session.writer.rocksdb_sstfilewriter_open(
session.sstPath.cstring, cast[cstringArray](csError.addr))
if not csError.isNil:
session.destroy()
let info = $csError
if "no such file or directory" in info.toLowerAscii:
# Somebody might have killed the "tmp" directory?
raiseAssert info
return err((RdbBeOpenSstWriter, info))
ok session
proc add(
session: var RdbPutSession;
key: openArray[byte];
val: openArray[byte];
): Result[void,(KvtError,string)] =
## Append a record to the SST file. Note that consecutive records must be
## strictly increasing.
##
## This function is a wrapper around `rocksdb_sstfilewriter_add()` or
## `rocksdb_sstfilewriter_put()` (stragely enough, there are two functions
## with exactly the same impementation code.)
var csError: cstring
session.writer.rocksdb_sstfilewriter_add(
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
cast[cstring](unsafeAddr val[0]), csize_t(val.len),
cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeAddSstWriter, $csError))
session.nRecords.inc
ok()
proc commit(
rdb: var RdbInst;
session: RdbPutSession;
): Result[void,(KvtError,string)] =
## Commit collected and cached data to the database. This function implies
## `destroy()` if successful. Otherwise `destroy()` must be called
## explicitely, e.g. after error analysis.
var csError: cstring
if 0 < session.nRecords:
session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeFinishSstWriter, $csError))
var sstPath = session.sstPath.cstring
rdb.store.cPtr.rocksdb_ingest_external_file(
cast[cstringArray](sstPath.addr),
1, rdb.impOpt, cast[cstringArray](csError.addr))
if not csError.isNil:
return err((RdbBeIngestSstWriter, $csError))
when extraTraceMessages:
trace logTxt "finished sst", fileSize=session.sstPath.getFileSize
session.destroy()
ok()
proc disposeSession(rdb: var RdbInst) =
rdb.session.close()
rdb.session = WriteBatchRef(nil)
proc `$`(a: Blob): string =
a.toHex
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc begin*(rdb: var RdbInst) =
if rdb.session.isNil:
rdb.session = rdb.store.openWriteBatch()
proc rollback*(rdb: var RdbInst) =
if not rdb.session.isClosed():
rdb.disposeSession()
proc commit*(rdb: var RdbInst): Result[void,(KvtError,string)] =
if not rdb.session.isClosed():
defer: rdb.disposeSession()
rdb.store.write(rdb.session).isOkOr:
const errSym = RdbBeDriverWriteError
when extraTraceMessages:
trace logTxt "commit", error=errSym, info=error
return err((errSym,error))
ok()
proc put*(
rdb: var RdbInst;
tab: Table[Blob,Blob];
): Result[void,(KvtError,string)] =
var session = block:
let rc = rdb.begin()
if rc.isErr:
return err(rc.error)
rc.value
# Vertices with empty table values will be deleted
var delKey: HashSet[Blob]
# Compare `Blob`s as left aligned big endian numbers, right padded with zeros
proc cmpBlobs(a, b: Blob): int =
let minLen = min(a.len, b.len)
for n in 0 ..< minLen:
if a[n] != b[n]:
return a[n].cmp b[n]
if a.len < b.len:
return -1
if b.len < a.len:
return 1
for key in tab.keys.toSeq.sorted cmpBlobs:
let val = tab.getOrVoid key
if val.isValid:
let rc = session.add(key, val)
if rc.isErr:
session.destroy()
return err(rc.error)
rdb: RdbInst;
data: openArray[(Blob,Blob)];
): Result[void,(Blob,KvtError,string)] =
let dsc = rdb.session
for (key,val) in data:
if val.len == 0:
dsc.delete(key, rdb.store.name).isOkOr:
const errSym = RdbBeDriverDelError
when extraTraceMessages:
trace logTxt "del", key, error=errSym, info=error
return err((key,errSym,error))
else:
delKey.incl key
block:
let rc = rdb.commit session
if rc.isErr:
trace logTxt "commit error", error=rc.error[0], info=rc.error[1]
return err(rc.error)
# Delete vertices after successfully updating vertices with non-zero values.
for key in delKey:
let rc = rdb.store.delete key
if rc.isErr:
return err((RdbBeDriverDelError,rc.error))
dsc.put(key, val, rdb.store.name).isOkOr:
const errSym = RdbBeDriverPutError
when extraTraceMessages:
trace logTxt "put", key, error=errSym, info=error
return err((key,errSym,error))
ok()
# ------------------------------------------------------------------------------

View File

@ -14,12 +14,21 @@
{.push raises: [].}
import
std/sequtils,
eth/common,
rocksdb/lib/librocksdb,
rocksdb,
./rdb_desc
const
extraTraceMessages = false
## Enable additional logging noise
when extraTraceMessages:
import
chronicles
logScope:
topics = "aristo-rocksdb"
# ------------------------------------------------------------------------------
# Public iterators
# ------------------------------------------------------------------------------
@ -27,32 +36,17 @@ import
iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
##
let
readOptions = rocksdb_readoptions_create()
rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
defer:
rit.rocksdb_iter_destroy()
readOptions.rocksdb_readoptions_destroy()
## Non-decodable entries are stepped over and ignored.
block walkBody:
let rit = rdb.store.openIterator().valueOr:
when extraTraceMessages:
trace logTxt "walk", pfx="all", error
break walkBody
defer: rit.close()
rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
var kLen: csize_t
let kData = rit.rocksdb_iter_key(addr kLen)
if not kData.isNil and 0 < kLen:
var vLen: csize_t
let vData = rit.rocksdb_iter_value(addr vLen)
if not vData.isNil and 0 < vLen:
let
key = kData.toOpenArrayByte(0,int(kLen)-1).toSeq
data = vData.toOpenArrayByte(0,int(vLen)-1).toSeq
yield (key,data)
# Update Iterator (might overwrite kData/vdata)
rit.rocksdb_iter_next()
# End while
for (key,val) in rit.pairs:
if 0 < key.len:
yield (key, val)
# ------------------------------------------------------------------------------
# End

View File

@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)

2
vendor/nim-rocksdb vendored

@ -1 +1 @@
Subproject commit f37d7d486caa7a21490d96caca7f0103295c5e46
Subproject commit eb594e33b29d1475d8f1ba7819bfb7ac5fa48ea3