From d6a42053249c8745ed359e206c6ba37bb08d0116 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 16 Apr 2024 20:39:11 +0000 Subject: [PATCH] Aristo update rocksdb backend drivers (#2135) * Aristo+RocksDB: Update backend drivers why: RocksDB update allows use some of the newly provided methods which were previously implemented by using the very C backend (for the lack of NIM methods.) * Aristo+RocksDB: Simplify drivers wrapper * Kvt: Update backend drivers and wrappers similar to `Aristo` * Aristo+Kvm: Use column families for RocksDB * Aristo+MemoryDB: Code cosmetics * Aristo: Provide guest column family for export why: So `Kvt` can piggyback on `Aristo` so there avoiding to run a second DBMS system in parallel. * Kvt: Provide import mechanism for RoksDB guest column family why: So `Kvt` can piggyback on `Aristo` so there avoiding to run a second DBMS system in parallel. * CoreDb+Aristo: Run persistent `Kvt` DB piggybacked on `Aristo` why: Avoiding to run two DBMS systems in parallel. * Fix copyright year * Ditto --- nimbus/db/aristo/aristo_desc/desc_backend.nim | 17 ++ nimbus/db/aristo/aristo_desc/desc_error.nim | 12 +- nimbus/db/aristo/aristo_init/init_common.nim | 9 +- nimbus/db/aristo/aristo_init/memory_db.nim | 51 +++-- nimbus/db/aristo/aristo_init/memory_only.nim | 10 + nimbus/db/aristo/aristo_init/persistent.nim | 9 + nimbus/db/aristo/aristo_init/rocks_db.nim | 215 +++++++++++------- .../aristo/aristo_init/rocks_db/rdb_desc.nim | 44 ++-- .../aristo/aristo_init/rocks_db/rdb_get.nim | 35 ++- .../aristo/aristo_init/rocks_db/rdb_init.nim | 76 ++++--- .../aristo/aristo_init/rocks_db/rdb_put.nim | 201 ++++------------ .../aristo/aristo_init/rocks_db/rdb_walk.nim | 140 ++++-------- nimbus/db/core_db/backend/aristo_db.nim | 74 ++---- nimbus/db/core_db/backend/aristo_rocksdb.nim | 33 +-- nimbus/db/kvt/kvt_desc/desc_error.nim | 12 +- nimbus/db/kvt/kvt_init/init_common.nim | 1 + nimbus/db/kvt/kvt_init/persistent.nim | 17 +- nimbus/db/kvt/kvt_init/rocks_db.nim | 107 +++++---- nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim | 22 +- nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim | 25 +- nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim | 87 ++++--- nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim | 211 ++++------------- nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim | 48 ++-- nimbus/db/kvt/kvt_persistent.nim | 2 +- vendor/nim-rocksdb | 2 +- 25 files changed, 637 insertions(+), 823 deletions(-) diff --git a/nimbus/db/aristo/aristo_desc/desc_backend.nim b/nimbus/db/aristo/aristo_desc/desc_backend.nim index e9adebe8b..f037661a3 100644 --- a/nimbus/db/aristo/aristo_desc/desc_backend.nim +++ b/nimbus/db/aristo/aristo_desc/desc_backend.nim @@ -91,6 +91,20 @@ type # ------------- + GuestDbFn* = + proc(): Result[RootRef,AristoError] {.gcsafe, raises: [].} + ## Generic function that returns a compartmentalised database handle that + ## can be used by another application. If non-nil, this handle allows to + ## use a previously allocated database. It is separated from the `Aristo` + ## columns. + ## + ## A successful return value might be `nil` if this feature is + ## unsupported. + ## + ## Caveat: + ## The guest database is closed automatically when closing the `Aristo` + ## database. + CloseFn* = proc(flush: bool) {.gcsafe, raises: [].} ## Generic destructor for the `Aristo DB` backend. The argument `flush` @@ -119,6 +133,8 @@ type putFqsFn*: PutFqsFn ## Store filter ID state putEndFn*: PutEndFn ## Commit bulk store session + guestDbFn*: GuestDbFn ## Piggyback DB for another application + closeFn*: CloseFn ## Generic destructor proc init*(trg: var BackendObj; src: BackendObj) = @@ -135,6 +151,7 @@ proc init*(trg: var BackendObj; src: BackendObj) = trg.putIdgFn = src.putIdgFn trg.putFqsFn = src.putFqsFn trg.putEndFn = src.putEndFn + trg.guestDbFn = src.guestDbFn trg.closeFn = src.closeFn # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_desc/desc_error.nim b/nimbus/db/aristo/aristo_desc/desc_error.nim index 4c6eb2631..6d050c74b 100644 --- a/nimbus/db/aristo/aristo_desc/desc_error.nim +++ b/nimbus/db/aristo/aristo_desc/desc_error.nim @@ -242,14 +242,12 @@ type # RocksDB backend RdbBeCantCreateDataDir RdbBeCantCreateTmpDir - RdbBeDriverInitError - RdbBeDriverGetError RdbBeDriverDelError - RdbBeCreateSstWriter - RdbBeOpenSstWriter - RdbBeAddSstWriter - RdbBeFinishSstWriter - RdbBeIngestSstWriter + RdbBeDriverGetError + RdbBeDriverGuestError + RdbBeDriverInitError + RdbBeDriverPutError + RdbBeDriverWriteError RdbHashKeyExpected # Transaction wrappers diff --git a/nimbus/db/aristo/aristo_init/init_common.nim b/nimbus/db/aristo/aristo_init/init_common.nim index 2e49d8785..7186541b8 100644 --- a/nimbus/db/aristo/aristo_init/init_common.nim +++ b/nimbus/db/aristo/aristo_init/init_common.nim @@ -35,6 +35,10 @@ type ## Access keys for admin table records. When exposed (e.g. when itereating ## over the tables), this data type is to be used. + GuestDbRef* = ref object of RootRef + ## Object returned from `GuestDbFn` (if any) + beKind*: BackendType ## Backend type identifier + TypedBackendRef* = ref TypedBackendObj TypedBackendObj* = object of BackendObj beKind*: BackendType ## Backend type identifier @@ -48,9 +52,12 @@ type vid*: VertexID ## Vertex ID where the error occured of FilPfx: qid*: QueueID ## Ditto - of AdmPfx, Oops: + of AdmPfx: + aid*: AdminTabID + of Oops: discard code*: AristoError ## Error code (if any) + info*: string ## Error description (if any) TypedPutHdlRef* = ref object of PutHdlRef error*: TypedPutHdlErrRef ## Track error while collecting transaction diff --git a/nimbus/db/aristo/aristo_init/memory_db.nim b/nimbus/db/aristo/aristo_init/memory_db.nim index fda152ae0..714ac087e 100644 --- a/nimbus/db/aristo/aristo_init/memory_db.nim +++ b/nimbus/db/aristo/aristo_init/memory_db.nim @@ -28,7 +28,6 @@ import std/[algorithm, options, sequtils, tables], - chronicles, eth/common, results, ../aristo_constants, @@ -37,6 +36,10 @@ import ../aristo_blobify, ./init_common +const + extraTraceMessages = false or true + ## Enabled additional logging noise + type MemDbRef = ref object ## Database @@ -58,13 +61,18 @@ type vGen: Option[seq[VertexID]] vFqs: Option[seq[(QueueID,QueueID)]] +when extraTraceMessages: + import chronicles + + logScope: + topics = "aristo-backend" + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ template logTxt(info: static[string]): static[string] = - "MemoryDB " & info - + "MemoryDB/" & info proc newSession(db: MemBackendRef): MemPutHdlRef = new result @@ -88,9 +96,10 @@ proc getVtxFn(db: MemBackendRef): GetVtxFn = # Fetch serialised data record let data = db.mdb.sTab.getOrDefault(vid, EmptyBlob) if 0 < data.len: - let rc = data.deblobify VertexRef - if rc.isErr: - debug logTxt "getVtxFn() failed", vid, error=rc.error, info=rc.error + let rc = data.deblobify(VertexRef) + when extraTraceMessages: + if rc.isErr: + trace logTxt "getVtxFn() failed", vid, error=rc.error return rc err(GetVtxNotFound) @@ -211,6 +220,7 @@ proc putFqsFn(db: MemBackendRef): PutFqsFn = if hdl.error.isNil: hdl.error = TypedPutHdlErrRef( pfx: AdmPfx, + aid: AdmTabIdFqs, code: FilQuSchedDisabled) else: result = @@ -225,16 +235,16 @@ proc putEndFn(db: MemBackendRef): PutEndFn = proc(hdl: PutHdlRef): Result[void,AristoError] = let hdl = hdl.endSession db if not hdl.error.isNil: - case hdl.error.pfx: - of VtxPfx, KeyPfx: - debug logTxt "putEndFn: vtx/key failed", + when extraTraceMessages: + case hdl.error.pfx: + of VtxPfx, KeyPfx: trace logTxt "putEndFn: vtx/key failed", pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code - of FilPfx: - debug logTxt "putEndFn: filter failed", + of FilPfx: trace logTxt "putEndFn: filter failed", pfx=hdl.error.pfx, qid=hdl.error.qid, error=hdl.error.code - else: - debug logTxt "putEndFn: failed", - pfx=hdl.error.pfx, error=hdl.error.code + of AdmPfx: trace logTxt "putEndFn: admin failed", + pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code + of Oops: trace logTxt "putEndFn: failed", + pfx=hdl.error.pfx, error=hdl.error.code return err(hdl.error.code) for (vid,data) in hdl.sTab.pairs: @@ -273,6 +283,11 @@ proc putEndFn(db: MemBackendRef): PutEndFn = # ------------- +proc guestDbFn(db: MemBackendRef): GuestDbFn = + result = + proc(): Result[RootRef,AristoError] = + ok(RootRef nil) + proc closeFn(db: MemBackendRef): CloseFn = result = proc(ignore: bool) = @@ -303,6 +318,8 @@ proc memoryBackend*(qidLayout: QidLayoutRef): BackendRef = db.putFqsFn = putFqsFn db db.putEndFn = putEndFn db + db.guestDbFn = guestDbFn db + db.closeFn = closeFn db # Set up filter management table @@ -330,7 +347,8 @@ iterator walkVtx*( if 0 < data.len: let rc = data.deblobify VertexRef if rc.isErr: - debug logTxt "walkVtxFn() skip", n, vid, error=rc.error + when extraTraceMessages: + debug logTxt "walkVtxFn() skip", n, vid, error=rc.error else: yield (vid, rc.value) @@ -353,7 +371,8 @@ iterator walkFil*( if 0 < data.len: let rc = data.deblobify FilterRef if rc.isErr: - debug logTxt "walkFilFn() skip", n, qid, error=rc.error + when extraTraceMessages: + debug logTxt "walkFilFn() skip", n, qid, error=rc.error else: yield (qid, rc.value) diff --git a/nimbus/db/aristo/aristo_init/memory_only.nim b/nimbus/db/aristo/aristo_init/memory_only.nim index 6b92d892e..b0505a38e 100644 --- a/nimbus/db/aristo/aristo_init/memory_only.nim +++ b/nimbus/db/aristo/aristo_init/memory_only.nim @@ -14,6 +14,7 @@ {.push raises: [].} import + results, ../aristo_desc, ../aristo_desc/desc_backend, "."/[init_common, memory_db] @@ -26,6 +27,7 @@ type export BackendType, + GuestDbRef, MemBackendRef, QidLayoutRef @@ -86,6 +88,14 @@ proc init*( ## Shortcut for `AristoDbRef.init(VoidBackendRef)` AristoDbRef.init VoidBackendRef +proc guestDb*(db: AristoDbRef): Result[GuestDbRef,AristoError] = + ## Database pigiback feature + if db.backend.isNil: + ok(GuestDbRef(nil)) + else: + let gdb = db.backend.guestDbFn().valueOr: + return err(error) + ok(gdb.GuestDbRef) proc finish*(db: AristoDbRef; flush = false) = ## Backend destructor. The argument `flush` indicates that a full database diff --git a/nimbus/db/aristo/aristo_init/persistent.nim b/nimbus/db/aristo/aristo_init/persistent.nim index 1e4c1a658..5a16fb95a 100644 --- a/nimbus/db/aristo/aristo_init/persistent.nim +++ b/nimbus/db/aristo/aristo_init/persistent.nim @@ -20,8 +20,11 @@ import results, + rocksdb, ../aristo_desc, + ./rocks_db/rdb_desc, "."/[rocks_db, memory_only] + export RdbBackendRef, memory_only @@ -79,6 +82,12 @@ proc init*[W: RdbBackendRef]( when B is RdbBackendRef: basePath.newAristoRdbDbRef DEFAULT_QID_QUEUES.to(QidLayoutRef) +proc getRocksDbFamily*(gdb: GuestDbRef): Result[ColFamilyReadWrite,void] = + ## Database pigiback feature + if not gdb.isNil and gdb.beKind == BackendRocksDB: + return ok RdbGuestDbRef(gdb).guestDb + err() + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_init/rocks_db.nim b/nimbus/db/aristo/aristo_init/rocks_db.nim index e21495745..7a061cb47 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db.nim @@ -28,7 +28,6 @@ {.warning: "*** importing rocks DB which needs a linker library".} import - chronicles, eth/common, rocksdb, results, @@ -39,23 +38,23 @@ import ./init_common, ./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk] -logScope: - topics = "aristo-backend" +const + maxOpenFiles = 512 ## Rocks DB setup, open files limit + + extraTraceMessages = false + ## Enabled additional logging noise type RdbBackendRef* = ref object of TypedBackendRef rdb: RdbInst ## Allows low level access to database RdbPutHdlRef = ref object of TypedPutHdlRef - cache: RdbTabs ## Transaction cache -const - extraTraceMessages = false or true - ## Enabled additional logging noise +when extraTraceMessages: + import chronicles - # ---------- - - maxOpenFiles = 512 ## Rocks DB setup, open files limit + logScope: + topics = "aristo-backend" # ------------------------------------------------------------------------------ # Private helpers @@ -77,19 +76,6 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef = hdl.TypedPutHdlRef.finishSession db hdl.RdbPutHdlRef - -proc `vtxCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) = - hdl.cache[VtxPfx][val.vid.uint64] = val.data - -proc `keyCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) = - hdl.cache[KeyPfx][val.vid.uint64] = val.data - -proc `filCache=`(hdl: RdbPutHdlRef; val: tuple[qid: QueueID; data: Blob]) = - hdl.cache[FilPfx][val.qid.uint64] = val.data - -proc `admCache=`(hdl: RdbPutHdlRef; val: tuple[id: AdminTabID; data: Blob]) = - hdl.cache[AdmPfx][val.id.uint64] = val.data - # ------------------------------------------------------------------------------ # Private functions: interface # ------------------------------------------------------------------------------ @@ -99,15 +85,14 @@ proc getVtxFn(db: RdbBackendRef): GetVtxFn = proc(vid: VertexID): Result[VertexRef,AristoError] = # Fetch serialised data record - let rc = db.rdb.get vid.toOpenArray(VtxPfx) - if rc.isErr: - debug logTxt "getVtxFn() failed", vid, - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + let data = db.rdb.get(VtxPfx, vid.uint64).valueOr: + when extraTraceMessages: + trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1] + return err(error[0]) # Decode data record - if 0 < rc.value.len: - return rc.value.deblobify VertexRef + if 0 < data.len: + return data.deblobify VertexRef err(GetVtxNotFound) @@ -116,15 +101,14 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn = proc(vid: VertexID): Result[HashKey,AristoError] = # Fetch serialised data record - let rc = db.rdb.get vid.toOpenArray(KeyPfx) - if rc.isErr: - debug logTxt "getKeyFn: failed", vid, - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + let data = db.rdb.get(KeyPfx, vid.uint64).valueOr: + when extraTraceMessages: + trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1] + return err(error[0]) # Decode data record - if 0 < rc.value.len: - let lid = HashKey.fromBytes(rc.value).valueOr: + if 0 < data.len: + let lid = HashKey.fromBytes(data).valueOr: return err(RdbHashKeyExpected) return ok lid @@ -140,15 +124,14 @@ proc getFilFn(db: RdbBackendRef): GetFilFn = proc(qid: QueueID): Result[FilterRef,AristoError] = # Fetch serialised data record - let rc = db.rdb.get qid.toOpenArray() - if rc.isErr: - debug logTxt "getFilFn: failed", qid, - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + let data = db.rdb.get(FilPfx, qid.uint64).valueOr: + when extraTraceMessages: + trace logTxt "getFilFn: failed", qid, error=error[0], info=error[1] + return err(error[0]) # Decode data record - if 0 < rc.value.len: - return rc.value.deblobify FilterRef + if 0 < data.len: + return data.deblobify FilterRef err(GetFilNotFound) @@ -157,17 +140,18 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn = proc(): Result[seq[VertexID],AristoError]= # Fetch serialised data record - let rc = db.rdb.get AdmTabIdIdg.toOpenArray() - if rc.isErr: - debug logTxt "getIdgFn: failed", error=rc.error[1] - return err(rc.error[0]) - - if rc.value.len == 0: - let w = EmptyVidSeq - return ok w + let data = db.rdb.get(AdmPfx, AdmTabIdIdg.uint64).valueOr: + when extraTraceMessages: + trace logTxt "getIdgFn: failed", error=error[0], info=error[1] + return err(error[0]) # Decode data record - rc.value.deblobify seq[VertexID] + if data.len == 0: + let w = EmptyVidSeq # Must be `let` + return ok w # Compiler error with `ok(EmptyVidSeq)` + + # Decode data record + data.deblobify seq[VertexID] proc getFqsFn(db: RdbBackendRef): GetFqsFn = if db.rdb.noFq: @@ -179,23 +163,24 @@ proc getFqsFn(db: RdbBackendRef): GetFqsFn = proc(): Result[seq[(QueueID,QueueID)],AristoError]= # Fetch serialised data record - let rc = db.rdb.get AdmTabIdFqs.toOpenArray() - if rc.isErr: - debug logTxt "getFqsFn: failed", error=rc.error[1] - return err(rc.error[0]) + let data = db.rdb.get(AdmPfx, AdmTabIdFqs.uint64).valueOr: + when extraTraceMessages: + trace logTxt "getFqsFn: failed", error=error[0], info=error[1] + return err(error[0]) - if rc.value.len == 0: - let w = EmptyQidPairSeq - return ok w + if data.len == 0: + let w = EmptyQidPairSeq # Must be `let` + return ok w # Compiler error with `ok(EmptyQidPairSeq)` # Decode data record - rc.value.deblobify seq[(QueueID,QueueID)] + data.deblobify seq[(QueueID,QueueID)] # ------------- proc putBegFn(db: RdbBackendRef): PutBegFn = result = proc(): PutHdlRef = + db.rdb.begin() db.newSession() @@ -204,6 +189,9 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn = proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) = let hdl = hdl.getSession db if hdl.error.isNil: + + # Collect batch session arguments + var batch: seq[(uint64,Blob)] for (vid,vtx) in vrps: if vtx.isValid: let rc = vtx.blobify() @@ -213,20 +201,39 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn = vid: vid, code: rc.error) return - hdl.vtxCache = (vid, rc.value) + batch.add (vid.uint64, rc.value) else: - hdl.vtxCache = (vid, EmptyBlob) + batch.add (vid.uint64, EmptyBlob) + + # Stash batch session data + db.rdb.put(VtxPfx, batch).isOkOr: + hdl.error = TypedPutHdlErrRef( + pfx: VtxPfx, + vid: VertexID(error[0]), + code: error[1], + info: error[2]) proc putKeyFn(db: RdbBackendRef): PutKeyFn = result = proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) = let hdl = hdl.getSession db if hdl.error.isNil: + + # Collect batch session arguments + var batch: seq[(uint64,Blob)] for (vid,key) in vkps: if key.isValid: - hdl.keyCache = (vid, @key) + batch.add (vid.uint64, @key) else: - hdl.keyCache = (vid, EmptyBlob) + batch.add (vid.uint64, EmptyBlob) + + # Stash batch session data + db.rdb.put(KeyPfx, batch).isOkOr: + hdl.error = TypedPutHdlErrRef( + pfx: KeyPfx, + vid: VertexID(error[0]), + code: error[1], + info: error[2]) proc putFilFn(db: RdbBackendRef): PutFilFn = if db.rdb.noFq: @@ -243,6 +250,9 @@ proc putFilFn(db: RdbBackendRef): PutFilFn = proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) = let hdl = hdl.getSession db if hdl.error.isNil: + + # Collect batch session arguments + var batch: seq[(uint64,Blob)] for (qid,filter) in vrps: if filter.isValid: let rc = filter.blobify() @@ -252,19 +262,30 @@ proc putFilFn(db: RdbBackendRef): PutFilFn = qid: qid, code: rc.error) return - hdl.filCache = (qid, rc.value) + batch.add (qid.uint64, rc.value) else: - hdl.filCache = (qid, EmptyBlob) + batch.add (qid.uint64, EmptyBlob) + + # Stash batch session data + db.rdb.put(FilPfx, batch).isOkOr: + hdl.error = TypedPutHdlErrRef( + pfx: FilPfx, + qid: QueueID(error[0]), + code: error[1], + info: error[2]) proc putIdgFn(db: RdbBackendRef): PutIdgFn = result = proc(hdl: PutHdlRef; vs: openArray[VertexID]) = let hdl = hdl.getSession db if hdl.error.isNil: - if 0 < vs.len: - hdl.admCache = (AdmTabIdIdg, vs.blobify) - else: - hdl.admCache = (AdmTabIdIdg, EmptyBlob) + let idg = if 0 < vs.len: vs.blobify else: EmptyBlob + db.rdb.put(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr: + hdl.error = TypedPutHdlErrRef( + pfx: AdmPfx, + aid: AdmTabIdIdg, + code: error[1], + info: error[2]) proc putFqsFn(db: RdbBackendRef): PutFqsFn = if db.rdb.noFq: @@ -280,10 +301,15 @@ proc putFqsFn(db: RdbBackendRef): PutFqsFn = proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) = let hdl = hdl.getSession db if hdl.error.isNil: - if 0 < vs.len: - hdl.admCache = (AdmTabIdFqs, vs.blobify) - else: - hdl.admCache = (AdmTabIdFqs, EmptyBlob) + + # Stash batch session data + let fqs = if 0 < vs.len: vs.blobify else: EmptyBlob + db.rdb.put(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr: + hdl.error = TypedPutHdlErrRef( + pfx: AdmPfx, + aid: AdmTabIdFqs, + code: error[1], + info: error[2]) proc putEndFn(db: RdbBackendRef): PutEndFn = @@ -291,22 +317,33 @@ proc putEndFn(db: RdbBackendRef): PutEndFn = proc(hdl: PutHdlRef): Result[void,AristoError] = let hdl = hdl.endSession db if not hdl.error.isNil: - case hdl.error.pfx: - of VtxPfx, KeyPfx: - debug logTxt "putEndFn: vtx/key failed", - pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code - else: - debug logTxt "putEndFn: failed", - pfx=hdl.error.pfx, error=hdl.error.code - return err(hdl.error.code) - let rc = db.rdb.put hdl.cache - if rc.isErr: when extraTraceMessages: - debug logTxt "putEndFn: failed", - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + case hdl.error.pfx: + of VtxPfx, KeyPfx: trace logTxt "putEndFn: vtx/key failed", + pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code + of FilPfx: trace logTxt "putEndFn: filter failed", + pfx=FilPfx, qid=hdl.error.qid, error=hdl.error.code + of AdmPfx: trace logTxt "putEndFn: admin failed", + pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code + of Oops: trace logTxt "putEndFn: oops", + error=hdl.error.code + return err(hdl.error.code) + + # Commit session + db.rdb.commit().isOkOr: + when extraTraceMessages: + trace logTxt "putEndFn: failed", error=($error[0]), info=error[1] + return err(error[0]) ok() +proc guestDbFn(db: RdbBackendRef): GuestDbFn = + result = + proc(): Result[RootRef,AristoError] = + let gdb = db.rdb.guestDb().valueOr: + when extraTraceMessages: + trace logTxt "guestDbFn", error=error[0], info=error[1] + return err(error[0]) + ok gdb proc closeFn(db: RdbBackendRef): CloseFn = result = @@ -349,6 +386,8 @@ proc rocksDbBackend*( db.putFqsFn = putFqsFn db db.putEndFn = putEndFn db + db.guestDbFn = guestDbFn db + db.closeFn = closeFn db # Set up filter management table diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim index 63b7e6adb..3d219cea1 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim @@ -14,9 +14,7 @@ {.push raises: [].} import - std/[tables, os], - eth/common, - rocksdb/lib/librocksdb, + std/os, rocksdb, stew/endians2, ../../aristo_desc, @@ -24,58 +22,42 @@ import type RdbInst* = object - dbOpts*: DbOptionsRef - store*: RocksDbReadWriteRef ## Rocks DB database handler + store*: ColFamilyReadWrite ## Rocks DB database handler + session*: WriteBatchRef ## For batched `put()` basePath*: string ## Database directory noFq*: bool ## No filter queues available - # Low level Rocks DB access for bulk store - envOpt*: ptr rocksdb_envoptions_t - impOpt*: ptr rocksdb_ingestexternalfileoptions_t + RdbGuestDbRef* = ref object of GuestDbRef + guestDb*: ColFamilyReadWrite ## Pigiback feature reference RdbKey* = array[1 + sizeof VertexID, byte] ## Sub-table key, + VertexID - RdbTabs* = array[StorageType, Table[uint64,Blob]] - ## Combined table for caching data to be stored/updated - const - BaseFolder* = "nimbus" # Same as for Legacy DB - DataFolder* = "aristo" # Legacy DB has "data" - SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder - TempFolder* = "tmp" # No `tmp` directory used with legacy DB + GuestFamily* = "Guest" ## Guest family (e.g. for Kvt) + AristoFamily* = "Aristo" ## RocksDB column family + BaseFolder* = "nimbus" ## Same as for Legacy DB + DataFolder* = "aristo" ## Legacy DB has "data" # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +template logTxt*(info: static[string]): static[string] = + "RocksDB/" & info + + func baseDir*(rdb: RdbInst): string = rdb.basePath / BaseFolder func dataDir*(rdb: RdbInst): string = rdb.baseDir / DataFolder -func cacheDir*(rdb: RdbInst): string = - rdb.dataDir / TempFolder - -func sstFilePath*(rdb: RdbInst): string = - rdb.cacheDir / SstCache - - func toRdbKey*(id: uint64; pfx: StorageType): RdbKey = let idKey = id.toBytesBE result[0] = pfx.ord.byte copyMem(addr result[1], unsafeAddr idKey, sizeof idKey) -template toOpenArray*(vid: VertexID; pfx: StorageType): openArray[byte] = - vid.uint64.toRdbKey(pfx).toOpenArray(0, sizeof uint64) - -template toOpenArray*(qid: QueueID): openArray[byte] = - qid.uint64.toRdbKey(FilPfx).toOpenArray(0, sizeof uint64) - -template toOpenArray*(aid: AdminTabID): openArray[byte] = - aid.uint64.toRdbKey(AdmPfx).toOpenArray(0, sizeof uint64) - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim index 30193ce3d..6182d8f9f 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -17,24 +17,41 @@ import eth/common, rocksdb, results, - "../.."/[aristo_constants, aristo_desc], + ../../aristo_desc, + ../init_common, ./rdb_desc +const + extraTraceMessages = false + ## Enable additional logging noise + +when extraTraceMessages: + import + chronicles + + logScope: + topics = "aristo-rocksdb" + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ proc get*( rdb: RdbInst; - key: openArray[byte], + pfx: StorageType; + xid: uint64, ): Result[Blob,(AristoError,string)] = var res: Blob - let onData: DataProc = proc(data: openArray[byte]) = - res = @data - let rc = rdb.store.get(key, onData) - if rc.isErr: - return err((RdbBeDriverGetError,rc.error)) - if not rc.value: + let onData = proc(data: openArray[byte]) = + res = @data + + let gotData = rdb.store.get(xid.toRdbKey pfx, onData).valueOr: + const errSym = RdbBeDriverGetError + when extraTraceMessages: + trace logTxt "get", error=errSym, info=error + return err((errSym,error)) + + if not gotData: res = EmptyBlob ok res diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim index 7c9001469..c63e40cc5 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim @@ -15,22 +15,21 @@ import std/os, - chronicles, - rocksdb/lib/librocksdb, rocksdb, results, ../../aristo_desc, ./rdb_desc -logScope: - topics = "aristo-backend" +const + extraTraceMessages = false + ## Enable additional logging noise -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ +when extraTraceMessages: + import + chronicles -template logTxt(info: static[string]): static[string] = - "RocksDB/init " & info + logScope: + topics = "aristo-rocksdb" # ------------------------------------------------------------------------------ # Public constructor @@ -52,40 +51,47 @@ proc init*( dataDir.createDir except OSError, IOError: return err((RdbBeCantCreateDataDir, "")) - try: - rdb.cacheDir.createDir - except OSError, IOError: - return err((RdbBeCantCreateTmpDir, "")) - let dbOpts = defaultDbOptions() - dbOpts.setMaxOpenFiles(openMax) + let + cfs = @[initColFamilyDescriptor AristoFamily, + initColFamilyDescriptor GuestFamily] + opts = defaultDbOptions() + opts.setMaxOpenFiles(openMax) - let rc = openRocksDb(dataDir, dbOpts) - if rc.isErr: - let error = RdbBeDriverInitError - debug logTxt "driver failed", dataDir, openMax, - error, info=rc.error - return err((RdbBeDriverInitError, rc.error)) + # Reserve a family corner for `Aristo` on the database + let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr: + let errSym = RdbBeDriverInitError + when extraTraceMessages: + trace logTxt "init failed", dataDir, openMax, error=errSym, info=error + return err((errSym, error)) - rdb.dbOpts = dbOpts - rdb.store = rc.get() + # Initialise `Aristo` family + rdb.store = baseDb.withColFamily(AristoFamily).valueOr: + let errSym = RdbBeDriverInitError + when extraTraceMessages: + trace logTxt "init failed", dataDir, openMax, error=errSym, info=error + return err((errSym, error)) - # The following is a default setup (subject to change) - rdb.impOpt = rocksdb_ingestexternalfileoptions_create() - rdb.envOpt = rocksdb_envoptions_create() ok() +proc guestDb*(rdb: RdbInst): Result[RootRef,(AristoError,string)] = + # Initialise `Guest` family + let guestDb = rdb.store.db.withColFamily(GuestFamily).valueOr: + let errSym = RdbBeDriverGuestError + when extraTraceMessages: + trace logTxt "guestDb failed", error=errSym, info=error + return err((errSym, error)) + + ok RdbGuestDbRef( + beKind: BackendRocksDB, + guestDb: guestDb) proc destroy*(rdb: var RdbInst; flush: bool) = ## Destructor - rdb.envOpt.rocksdb_envoptions_destroy() - rdb.impOpt.rocksdb_ingestexternalfileoptions_destroy() - rdb.store.close() + rdb.store.db.close() - try: - rdb.cacheDir.removeDir - - if flush: + if flush: + try: rdb.dataDir.removeDir # Remove the base folder if it is empty @@ -96,8 +102,8 @@ proc destroy*(rdb: var RdbInst; flush: bool) = break done rdb.baseDir.removeDir - except CatchableError: - discard + except CatchableError: + discard # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim index 62ffe4450..2a6807764 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim @@ -14,19 +14,14 @@ {.push raises: [].} import - std/[algorithm, os, sequtils, strutils, sets, tables], - chronicles, eth/common, rocksdb/lib/librocksdb, rocksdb, results, - "../.."/[aristo_constants, aristo_desc], + ../../aristo_desc, ../init_common, ./rdb_desc -logScope: - topics = "aristo-backend" - type RdbPutSession = object writer: ptr rocksdb_sstfilewriter_t @@ -34,165 +29,65 @@ type nRecords: int const - extraTraceMessages = false or true + extraTraceMessages = false ## Enable additional logging noise +when extraTraceMessages: + import chronicles + + logScope: + topics = "aristo-rocksdb" + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -template logTxt(info: static[string]): static[string] = - "RocksDB/put " & info - -proc getFileSize(fileName: string): int64 {.used.} = - var f: File - if f.open fileName: - defer: f.close - try: - result = f.getFileSize - except CatchableError: - discard - -proc rmFileIgnExpt(fileName: string) = - try: - fileName.removeFile - except CatchableError: - discard - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc destroy(rps: RdbPutSession) = - rps.writer.rocksdb_sstfilewriter_destroy() - rps.sstPath.rmFileIgnExpt - -proc begin( - rdb: var RdbInst; - ): Result[RdbPutSession,(AristoError,string)] = - ## Begin a new bulk load session storing data into a temporary cache file - ## `fileName`. When finished, this file will bi direcly imported into the - ## database. - var csError: cstring - - var session = RdbPutSession( - writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr), - sstPath: rdb.sstFilePath) - - if session.writer.isNil: - let info = $csError - if "no such file or directory" in info.toLowerAscii: - # Somebody might have killed the "tmp" directory? - raiseAssert info - return err((RdbBeOpenSstWriter, info)) - - session.sstPath.rmFileIgnExpt - - session.writer.rocksdb_sstfilewriter_open( - session.sstPath.cstring, cast[cstringArray](csError.addr)) - if not csError.isNil: - session.destroy() - return err((RdbBeOpenSstWriter, $csError)) - - ok session - - -proc add( - session: var RdbPutSession; - key: openArray[byte]; - val: openArray[byte]; - ): Result[void,(AristoError,string)] = - ## Append a record to the SST file. Note that consecutive records must be - ## strictly increasing. - ## - ## This function is a wrapper around `rocksdb_sstfilewriter_add()` or - ## `rocksdb_sstfilewriter_put()` (stragely enough, there are two functions - ## with exactly the same impementation code.) - var csError: cstring - - session.writer.rocksdb_sstfilewriter_add( - cast[cstring](unsafeAddr key[0]), csize_t(key.len), - cast[cstring](unsafeAddr val[0]), csize_t(val.len), - cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeAddSstWriter, $csError)) - - session.nRecords.inc - ok() - - -proc commit( - rdb: var RdbInst; - session: RdbPutSession; - ): Result[void,(AristoError,string)] = - ## Commit collected and cached data to the database. This function implies - ## `destroy()` if successful. Otherwise `destroy()` must be called - ## explicitely, e.g. after error analysis. - var csError: cstring - - if 0 < session.nRecords: - session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeFinishSstWriter, $csError)) - - var sstPath = session.sstPath.cstring - rdb.store.cPtr.rocksdb_ingest_external_file( - cast[cstringArray](sstPath.addr), 1, rdb.impOpt, cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeIngestSstWriter, $csError)) - - when extraTraceMessages: - trace logTxt "finished sst", fileSize=session.sstPath.getFileSize - - session.destroy() - ok() +proc disposeSession(rdb: var RdbInst) = + rdb.session.close() + rdb.session = WriteBatchRef(nil) # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +proc begin*(rdb: var RdbInst) = + if rdb.session.isNil: + rdb.session = rdb.store.openWriteBatch() + +proc rollback*(rdb: var RdbInst) = + if not rdb.session.isClosed(): + rdb.disposeSession() + +proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] = + if not rdb.session.isClosed(): + defer: rdb.disposeSession() + rdb.store.write(rdb.session).isOkOr: + const errSym = RdbBeDriverWriteError + when extraTraceMessages: + trace logTxt "commit", error=errSym, info=error + return err((errSym,error)) + ok() + proc put*( - rdb: var RdbInst; - tabs: RdbTabs; - ): Result[void,(AristoError,string)] = - - var session = block: - let rc = rdb.begin() - if rc.isErr: - return err(rc.error) - rc.value - - # Vertices with empty table values will be deleted - var delKey: HashSet[RdbKey] - - for pfx in low(StorageType) .. high(StorageType): - when extraTraceMessages: - trace logTxt "sub-table", pfx, nItems=tabs[pfx].len - - for id in tabs[pfx].keys.toSeq.sorted: - let - key = id.toRdbKey pfx - val = tabs[pfx].getOrDefault(id, EmptyBlob) - if val.len == 0: - delKey.incl key - else: - let rc = session.add(key, val) - if rc.isErr: - session.destroy() - return err(rc.error) - - block: - let rc = rdb.commit session - if rc.isErr: - trace logTxt "commit error", error=rc.error[0], info=rc.error[1] - return err(rc.error) - - # Delete vertices after successfully updating vertices with non-zero values. - for key in delKey: - let rc = rdb.store.delete key - if rc.isErr: - return err((RdbBeDriverDelError,rc.error)) - + rdb: RdbInst; + pfx: StorageType; + data: openArray[(uint64,Blob)]; + ): Result[void,(uint64,AristoError,string)] = + let dsc = rdb.session + for (xid,val) in data: + let key = xid.toRdbKey pfx + if val.len == 0: + dsc.delete(key, rdb.store.name).isOkOr: + const errSym = RdbBeDriverDelError + when extraTraceMessages: + trace logTxt "del", pfx, xid, error=errSym, info=error + return err((xid,errSym,error)) + else: + dsc.put(key, val, rdb.store.name).isOkOr: + const errSym = RdbBeDriverPutError + when extraTraceMessages: + trace logTxt "put", pfx, xid, error=errSym, info=error + return err((xid,errSym,error)) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_walk.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_walk.nim index 7890de4dc..09f0e0854 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_walk.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_walk.nim @@ -14,31 +14,22 @@ {.push raises: [].} import - std/sequtils, eth/common, stew/endians2, - rocksdb/lib/librocksdb, rocksdb, ../init_common, ./rdb_desc -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ +const + extraTraceMessages = false + ## Enable additional logging noise -func keyPfx(kData: cstring, kLen: csize_t): int = - if not kData.isNil and kLen == 1 + sizeof(uint64): - kData.toOpenArrayByte(0,0)[0].int - else: - -1 +when extraTraceMessages: + import + chronicles -func keyXid(kData: cstring, kLen: csize_t): uint64 = - if not kData.isNil and kLen == 1 + sizeof(uint64): - return uint64.fromBytesBE kData.toOpenArrayByte(1,int(kLen)-1).toSeq - -func valBlob(vData: cstring, vLen: csize_t): Blob = - if not vData.isNil and 0 < vLen: - return vData.toOpenArrayByte(0,int(vLen)-1).toSeq + logScope: + topics = "aristo-rocksdb" # ------------------------------------------------------------------------------ # Public iterators @@ -50,37 +41,21 @@ iterator walk*( ## Walk over all key-value pairs of the database. ## ## Non-decodable entries are stepped over and ignored. + block walkBody: + let rit = rdb.store.openIterator().valueOr: + when extraTraceMessages: + trace logTxt "walk", pfx="all", error + break walkBody + defer: rit.close() - let - readOptions = rocksdb_readoptions_create() - rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions) - defer: - rit.rocksdb_iter_destroy() - readOptions.rocksdb_readoptions_destroy() - - rit.rocksdb_iter_seek_to_first() - - while rit.rocksdb_iter_valid() != 0: - var kLen: csize_t - let kData = rit.rocksdb_iter_key(addr kLen) - - let pfx = kData.keyPfx(kLen) - if 0 <= pfx: - if high(StorageType).ord < pfx: - break - - let xid = kData.keyXid(kLen) - if 0 < xid: - var vLen: csize_t - let vData = rit.rocksdb_iter_value(addr vLen) - - let val = vData.valBlob(vLen) - if 0 < val.len: - yield (pfx.StorageType, xid, val) - - # Update Iterator (might overwrite kData/vdata) - rit.rocksdb_iter_next() - # End while + for (key,val) in rit.pairs: + if key.len == 9: + if StorageType.high.ord < key[0]: + break walkBody + let + pfx = StorageType(key[0]) + id = uint64.fromBytesBE key[1..^1] + yield (pfx, id, val) iterator walk*( @@ -93,65 +68,30 @@ iterator walk*( ## Non-decodable entries are stepped over and ignored. ## block walkBody: - if pfx in {Oops, AdmPfx}: - # Unsupported + let rit = rdb.store.openIterator().valueOr: + when extraTraceMessages: + echo ">>> walk (2) oops", + " pfx=", pfx + trace logTxt "walk", pfx, error break walkBody + defer: rit.close() - let - readOptions = rocksdb_readoptions_create() - rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions) - defer: - rit.rocksdb_iter_destroy() - readOptions.rocksdb_readoptions_destroy() + # Start at first entry not less than ` & 1` + rit.seekToKey 1u64.toRdbKey pfx - var - kLen: csize_t - kData: cstring + # Fetch sub-table data as long as the current key is acceptable + while rit.isValid(): + let key = rit.key() + if key.len == 9: + if key[0] != pfx.ord.uint: + break walkBody # done - # Seek for `VertexID(1)` and subsequent entries if that fails. There should - # always be a `VertexID(1)` entry unless the sub-table is empty. There is - # no such control for the filter table in which case there is a blind guess - # (in case `rocksdb_iter_seek()` does not search `ge` for some reason.) - let keyOne = 1u64.toRdbKey pfx - - # It is not clear what happens when the `key` does not exist. The guess - # is that the interation will proceed at the next key position. - # - # Comment from GO port at - # //github.com/DanielMorsing/rocksdb/blob/master/iterator.go: - # - # Seek moves the iterator the position of the key given or, if the key - # doesn't exist, the next key that does exist in the database. If the key - # doesn't exist, and there is no next key, the Iterator becomes invalid. - # - kData = cast[cstring](unsafeAddr keyOne[0]) - kLen = sizeof(keyOne).csize_t - rit.rocksdb_iter_seek(kData, kLen) - if rit.rocksdb_iter_valid() == 0: - break walkBody - - # Fetch sub-table data - while true: - kData = rit.rocksdb_iter_key(addr kLen) - if pfx.ord != kData.keyPfx kLen: - break walkBody # done - - let xid = kData.keyXid(kLen) - if 0 < xid: - # Fetch value data - var vLen: csize_t - let vData = rit.rocksdb_iter_value(addr vLen) - - let val = vData.valBlob(vLen) - if 0 < val.len: - yield (xid, val) + let val = rit.value() + if val.len != 0: + yield (uint64.fromBytesBE key[1..^1], val) # Update Iterator - rit.rocksdb_iter_next() - if rit.rocksdb_iter_valid() == 0: - break walkBody - - # End while + rit.next() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/core_db/backend/aristo_db.nim b/nimbus/db/core_db/backend/aristo_db.nim index e1f80b4f9..a6ed8b618 100644 --- a/nimbus/db/core_db/backend/aristo_db.nim +++ b/nimbus/db/core_db/backend/aristo_db.nim @@ -14,8 +14,10 @@ import std/tables, eth/common, results, - "../.."/[aristo, aristo/aristo_walk], - "../.."/[kvt, kvt/kvt_init/memory_only, kvt/kvt_walk], + ../../aristo as use_ari, + ../../aristo/aristo_walk, + ../../kvt as use_kvt, + ../../kvt/[kvt_init/memory_only, kvt_walk], ".."/[base, base/base_desc], ./aristo_db/[common_desc, handlers_aristo, handlers_kvt, handlers_trace] @@ -183,10 +185,10 @@ proc baseMethods( ok(db.bless db.tracerSetup(flags))) # ------------------------------------------------------------------------------ -# Private constructor helpers +# Public constructor and helper # ------------------------------------------------------------------------------ -proc create( +proc create*( dbType: CoreDbType; kdb: KvtDbRef; K: typedesc; @@ -205,58 +207,20 @@ proc create( db.methods = db.baseMethods(A,K) db.bless -proc init( - dbType: CoreDbType; - K: typedesc; - A: typedesc; - qlr: QidLayoutRef; - ): CoreDbRef = - dbType.create(KvtDbRef.init(K), K, AristoDbRef.init(A, qlr), A) - -proc init( - dbType: CoreDbType; - K: typedesc; - A: typedesc; - ): CoreDbRef = - dbType.create(KvtDbRef.init(K), K, AristoDbRef.init(A), A) - -# ------------------------------------------------------------------------------ -# Public constructor helpers -# ------------------------------------------------------------------------------ - -proc init*( - dbType: CoreDbType; - K: typedesc; - A: typedesc; - path: string; - qlr: QidLayoutRef; - ): CoreDbRef = - dbType.create( - KvtDbRef.init(K, path).expect "Kvt/RocksDB init() failed", K, - AristoDbRef.init(A, path, qlr).expect "Aristo/RocksDB init() failed", A) - -proc init*( - dbType: CoreDbType; - K: typedesc; - A: typedesc; - path: string; - ): CoreDbRef = - dbType.create( - KvtDbRef.init(K, path).expect "Kvt/RocksDB init() failed", K, - AristoDbRef.init(A, path).expect "Aristo/RocksDB init() failed", A) - -# ------------------------------------------------------------------------------ -# Public constructor -# ------------------------------------------------------------------------------ - proc newAristoMemoryCoreDbRef*(qlr: QidLayoutRef): CoreDbRef = - AristoDbMemory.init(kvt.MemBackendRef, aristo.MemBackendRef, qlr) + AristoDbMemory.create( + KvtDbRef.init(use_kvt.MemBackendRef), use_ari.MemBackendRef, + AristoDbRef.init(use_ari.MemBackendRef, qlr), use_kvt.MemBackendRef) proc newAristoMemoryCoreDbRef*(): CoreDbRef = - AristoDbMemory.init(kvt.MemBackendRef, aristo.MemBackendRef) + AristoDbMemory.create( + KvtDbRef.init(use_kvt.MemBackendRef), use_ari.MemBackendRef, + AristoDbRef.init(use_ari.MemBackendRef), use_kvt.MemBackendRef) proc newAristoVoidCoreDbRef*(): CoreDbRef = - AristoDbVoid.init(kvt.VoidBackendRef, aristo.VoidBackendRef) + AristoDbVoid.create( + KvtDbRef.init(use_kvt.VoidBackendRef), use_ari.VoidBackendRef, + AristoDbRef.init(use_ari.VoidBackendRef), use_kvt.VoidBackendRef) # ------------------------------------------------------------------------------ # Public helpers, e.g. for direct backend access @@ -304,7 +268,7 @@ iterator aristoKvtPairsVoid*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} = api = dsc.toAristoApi() p = api.forkTop(dsc.to(KvtDbRef)).valueOrApiError "aristoKvtPairs()" defer: discard api.forget(p) - for (k,v) in kvt.VoidBackendRef.walkPairs p: + for (k,v) in use_kvt.VoidBackendRef.walkPairs p: yield (k,v) iterator aristoKvtPairsMem*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} = @@ -312,7 +276,7 @@ iterator aristoKvtPairsMem*(dsc: CoreDxKvtRef): (Blob,Blob) {.rlpRaise.} = api = dsc.toAristoApi() p = api.forkTop(dsc.to(KvtDbRef)).valueOrApiError "aristoKvtPairs()" defer: discard api.forget(p) - for (k,v) in kvt.MemBackendRef.walkPairs p: + for (k,v) in use_kvt.MemBackendRef.walkPairs p: yield (k,v) iterator aristoMptPairs*(dsc: CoreDxMptRef): (Blob,Blob) {.noRaise.} = @@ -324,12 +288,12 @@ iterator aristoMptPairs*(dsc: CoreDxMptRef): (Blob,Blob) {.noRaise.} = iterator aristoReplicateMem*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} = ## Instantiation for `MemBackendRef` - for k,v in aristoReplicate[aristo.MemBackendRef](dsc): + for k,v in aristoReplicate[use_ari.MemBackendRef](dsc): yield (k,v) iterator aristoReplicateVoid*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} = ## Instantiation for `VoidBackendRef` - for k,v in aristoReplicate[aristo.VoidBackendRef](dsc): + for k,v in aristoReplicate[use_ari.VoidBackendRef](dsc): yield (k,v) # ------------------------------------------------------------------------------ diff --git a/nimbus/db/core_db/backend/aristo_rocksdb.nim b/nimbus/db/core_db/backend/aristo_rocksdb.nim index 9e7b4dc60..26cf4d6f8 100644 --- a/nimbus/db/core_db/backend/aristo_rocksdb.nim +++ b/nimbus/db/core_db/backend/aristo_rocksdb.nim @@ -14,17 +14,22 @@ import eth/common, results, ../../aristo, - ../../aristo/[ - aristo_desc, aristo_persistent, aristo_walk/persistent, aristo_tx], + ../../aristo/aristo_persistent as use_ari, + ../../aristo/[aristo_desc, aristo_walk/persistent, aristo_tx], ../../kvt, - ../../kvt/kvt_persistent, + ../../kvt/kvt_persistent as use_kvt, ../base, ./aristo_db, - ./aristo_db/handlers_aristo + ./aristo_db/[common_desc, handlers_aristo] include ./aristo_db/aristo_replicate +const + # Expectation messages + aristoFail = "Aristo/RocksDB init() failed" + kvtFail = "Kvt/RocksDB init() failed" + # Annotation helper(s) {.pragma: rlpRaise, gcsafe, raises: [AristoApiRlpError].} @@ -33,16 +38,18 @@ include # ------------------------------------------------------------------------------ proc newAristoRocksDbCoreDbRef*(path: string; qlr: QidLayoutRef): CoreDbRef = - AristoDbRocks.init( - kvt_persistent.RdbBackendRef, - aristo_persistent.RdbBackendRef, - path, qlr) + let + adb = AristoDbRef.init(use_ari.RdbBackendRef, path, qlr).expect aristoFail + gdb = adb.guestDb().valueOr: GuestDbRef(nil) + kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail + AristoDbRocks.create(kdb, use_kvt.RdbBackendRef, adb, use_ari.RdbBackendRef) proc newAristoRocksDbCoreDbRef*(path: string): CoreDbRef = - AristoDbRocks.init( - kvt_persistent.RdbBackendRef, - aristo_persistent.RdbBackendRef, - path) + let + adb = AristoDbRef.init(use_ari.RdbBackendRef, path).expect aristoFail + gdb = adb.guestDb().valueOr: GuestDbRef(nil) + kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail + AristoDbRocks.create(kdb, use_kvt.RdbBackendRef, adb, use_ari.RdbBackendRef) # ------------------------------------------------------------------------------ # Public aristo iterators @@ -50,7 +57,7 @@ proc newAristoRocksDbCoreDbRef*(path: string): CoreDbRef = iterator aristoReplicateRdb*(dsc: CoreDxMptRef): (Blob,Blob) {.rlpRaise.} = ## Instantiation for `VoidBackendRef` - for k,v in aristoReplicate[aristo_persistent.RdbBackendRef](dsc): + for k,v in aristoReplicate[use_ari.RdbBackendRef](dsc): yield (k,v) # ------------------------------------------------------------------------------ diff --git a/nimbus/db/kvt/kvt_desc/desc_error.nim b/nimbus/db/kvt/kvt_desc/desc_error.nim index 0a9caecdc..de7e093a1 100644 --- a/nimbus/db/kvt/kvt_desc/desc_error.nim +++ b/nimbus/db/kvt/kvt_desc/desc_error.nim @@ -19,15 +19,11 @@ type # RocksDB backend RdbBeCantCreateDataDir - RdbBeCantCreateTmpDir - RdbBeDriverInitError - RdbBeDriverGetError RdbBeDriverDelError - RdbBeCreateSstWriter - RdbBeOpenSstWriter - RdbBeAddSstWriter - RdbBeFinishSstWriter - RdbBeIngestSstWriter + RdbBeDriverGetError + RdbBeDriverInitError + RdbBeDriverPutError + RdbBeDriverWriteError # Transaction wrappers TxArgStaleTx diff --git a/nimbus/db/kvt/kvt_init/init_common.nim b/nimbus/db/kvt/kvt_init/init_common.nim index 4350b56b9..3ba5386e0 100644 --- a/nimbus/db/kvt/kvt_init/init_common.nim +++ b/nimbus/db/kvt/kvt_init/init_common.nim @@ -32,6 +32,7 @@ type TypedPutHdlRef* = ref object of PutHdlRef error*: KvtError ## Track error while collecting transaction + info*: string ## Error description (if any) when verifyIxId: txId: uint ## Transaction ID (for debugging) diff --git a/nimbus/db/kvt/kvt_init/persistent.nim b/nimbus/db/kvt/kvt_init/persistent.nim index 3b8ce83e9..b0afbcd22 100644 --- a/nimbus/db/kvt/kvt_init/persistent.nim +++ b/nimbus/db/kvt/kvt_init/persistent.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -22,6 +22,10 @@ import results, ../kvt_desc, "."/[rocks_db, memory_only] + +from ../../aristo/aristo_persistent + import GuestDbRef, getRocksDbFamily + export RdbBackendRef, memory_only @@ -34,13 +38,22 @@ proc init*[W: MemOnlyBackend|RdbBackendRef]( T: type KvtDbRef; B: type W; basePath: string; + guestDb = GuestDbRef(nil); ): Result[KvtDbRef,KvtError] = ## Generic constructor, `basePath` argument is ignored for `BackendNone` and ## `BackendMemory` type backend database. Also, both of these backends ## aways succeed initialising. ## + ## If the argument `guestDb` is set and is a RocksDB column familly, the + ## `Kvt`batabase is built upon this column familly. Othewise it is newly + ## created with `basePath` as storage location. + ## when B is RdbBackendRef: - ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend basePath) + let rc = guestDb.getRocksDbFamily() + if rc.isOk: + ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend rc.value) + else: + ok KvtDbRef(top: LayerRef(), backend: ? rocksDbBackend basePath) else: ok KvtDbRef.init B diff --git a/nimbus/db/kvt/kvt_init/rocks_db.nim b/nimbus/db/kvt/kvt_init/rocks_db.nim index 874042b82..7151af28f 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db.nim @@ -28,7 +28,6 @@ {.warning: "*** importing rocks DB which needs a linker library".} import - chronicles, eth/common, rocksdb, results, @@ -37,8 +36,12 @@ import ./init_common, ./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk] -logScope: - topics = "kvt-backend" + +const + maxOpenFiles = 512 ## Rocks DB setup, open files limit + + extraTraceMessages = false or true + ## Enabled additional logging noise type RdbBackendRef* = ref object of TypedBackendRef @@ -47,13 +50,11 @@ type RdbPutHdlRef = ref object of TypedPutHdlRef tab: Table[Blob,Blob] ## Transaction cache -const - extraTraceMessages = false or true - ## Enabled additional logging noise +when extraTraceMessages: + import chronicles - # ---------- - - maxOpenFiles = 512 ## Rocks DB setup, open files limit + logScope: + topics = "aristo-backend" # ------------------------------------------------------------------------------ # Private helpers @@ -82,17 +83,16 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef = proc getKvpFn(db: RdbBackendRef): GetKvpFn = result = proc(key: openArray[byte]): Result[Blob,KvtError] = - if key.len == 0: - return err(KeyInvalid) - let rc = db.rdb.get key - if rc.isErr: - debug logTxt "getKvpFn() failed", key, - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) - # Decode data record - if 0 < rc.value.len: - return ok(rc.value) + # Get data record + let data = db.rdb.get(key).valueOr: + when extraTraceMessages: + debug logTxt "getKvpFn() failed", key, error=error[0], info=error[1] + return err(error[0]) + + # Return if non-empty + if 0 < data.len: + return ok(data) err(GetNotFound) @@ -101,6 +101,7 @@ proc getKvpFn(db: RdbBackendRef): GetKvpFn = proc putBegFn(db: RdbBackendRef): PutBegFn = result = proc(): PutHdlRef = + db.rdb.begin() db.newSession() proc putKvpFn(db: RdbBackendRef): PutKvpFn = @@ -108,25 +109,27 @@ proc putKvpFn(db: RdbBackendRef): PutKvpFn = proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) = let hdl = hdl.getSession db if hdl.error == KvtError(0): - for (k,v) in kvps: - if k.isValid: - hdl.tab[k] = v - else: - hdl.error = KeyInvalid + + # Collect batch session arguments + db.rdb.put(kvps).isOkOr: + hdl.error = error[1] + hdl.info = error[2] + return proc putEndFn(db: RdbBackendRef): PutEndFn = result = proc(hdl: PutHdlRef): Result[void,KvtError] = let hdl = hdl.endSession db if hdl.error != KvtError(0): - debug logTxt "putEndFn: key/value failed", error=hdl.error - return err(hdl.error) - let rc = db.rdb.put hdl.tab - if rc.isErr: when extraTraceMessages: - debug logTxt "putEndFn: failed", - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + debug logTxt "putEndFn: failed", error=hdl.error, info=hdl.info + return err(hdl.error) + + # Commit session + db.rdb.commit().isOkOr: + when extraTraceMessages: + trace logTxt "putEndFn: failed", error=($error[0]), info=error[1] + return err(error[0]) ok() @@ -135,25 +138,9 @@ proc closeFn(db: RdbBackendRef): CloseFn = proc(flush: bool) = db.rdb.destroy(flush) -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc rocksDbBackend*( - path: string; - ): Result[BackendRef,KvtError] = - let db = RdbBackendRef( - beKind: BackendRocksDB) - - # Initialise RocksDB - block: - let rc = db.rdb.init(path, maxOpenFiles) - if rc.isErr: - when extraTraceMessages: - trace logTxt "constructor failed", - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) +# -------------- +proc setup(db: RdbBackendRef) = db.getKvpFn = getKvpFn db db.putBegFn = putBegFn db @@ -162,6 +149,28 @@ proc rocksDbBackend*( db.closeFn = closeFn db +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc rocksDbBackend*(path: string): Result[BackendRef,KvtError] = + let db = RdbBackendRef( + beKind: BackendRocksDB) + + # Initialise RocksDB + db.rdb.init(path, maxOpenFiles).isOkOr: + when extraTraceMessages: + trace logTxt "constructor failed", error=error[0], info=error[1] + return err(error[0]) + + db.setup() + ok db + +proc rocksDbBackend*(store: ColFamilyReadWrite): Result[BackendRef,KvtError] = + let db = RdbBackendRef( + beKind: BackendRocksDB) + db.rdb.init(store) + db.setup() ok db proc dup*(db: RdbBackendRef): RdbBackendRef = diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim index bd2e811f4..2b66b0f13 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim @@ -15,24 +15,18 @@ import std/os, - rocksdb/lib/librocksdb, rocksdb type RdbInst* = object - dbOpts*: DbOptionsRef - store*: RocksDbReadWriteRef ## Rocks DB database handler + store*: ColFamilyReadWrite ## Rocks DB database handler + session*: WriteBatchRef ## For batched `put()` basePath*: string ## Database directory - # Low level Rocks DB access for bulk store - envOpt*: ptr rocksdb_envoptions_t - impOpt*: ptr rocksdb_ingestexternalfileoptions_t - const - BaseFolder* = "nimbus" # Same as for Legacy DB - DataFolder* = "kvt" # Legacy DB has "data" - SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder - TempFolder* = "tmp" # No `tmp` directory used with legacy DB + KvtFamily* = "Kvt" ## RocksDB column family + BaseFolder* = "nimbus" ## Same as for Legacy DB + DataFolder* = "kvt" ## Legacy DB has "data" # ------------------------------------------------------------------------------ # Public functions @@ -44,11 +38,9 @@ func baseDir*(rdb: RdbInst): string = func dataDir*(rdb: RdbInst): string = rdb.baseDir / DataFolder -func cacheDir*(rdb: RdbInst): string = - rdb.dataDir / TempFolder -func sstFilePath*(rdb: RdbInst): string = - rdb.cacheDir / SstCache +template logTxt(info: static[string]): static[string] = + "RocksDB/" & info # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim index 67de2d6b9..e4d378c1b 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -20,6 +20,17 @@ import "../.."/[kvt_constants, kvt_desc], ./rdb_desc +const + extraTraceMessages = false + ## Enable additional logging noise + +when extraTraceMessages: + import + chronicles + + logScope: + topics = "kvt-rocksdb" + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -31,10 +42,14 @@ proc get*( var res: Blob let onData: DataProc = proc(data: openArray[byte]) = res = @data - let rc = rdb.store.get(key, onData) - if rc.isErr: - return err((RdbBeDriverGetError,rc.error)) - if not rc.value: + + let gotData = rdb.store.get(key, onData).valueOr: + const errSym = RdbBeDriverGetError + when extraTraceMessages: + trace logTxt "get", error=errSym, info=error + return err((errSym,error)) + + if not gotData: res = EmptyBlob ok res diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim index 6ef9ba807..610a62570 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim @@ -15,22 +15,20 @@ import std/os, - chronicles, - rocksdb/lib/librocksdb, rocksdb, results, ../../kvt_desc, ./rdb_desc -logScope: - topics = "kvt-backend" +const + extraTraceMessages = false + ## Enabled additional logging noise -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ +when extraTraceMessages: + import chronicles -template logTxt(info: static[string]): static[string] = - "RocksDB/init " & info + logScope: + topics = "kvt-backend" # ------------------------------------------------------------------------------ # Public constructor @@ -52,52 +50,53 @@ proc init*( dataDir.createDir except OSError, IOError: return err((RdbBeCantCreateDataDir, "")) - try: - rdb.cacheDir.createDir - except OSError, IOError: - return err((RdbBeCantCreateTmpDir, "")) - let dbOpts = defaultDbOptions() - dbOpts.setMaxOpenFiles(openMax) + let + cfs = @[initColFamilyDescriptor KvtFamily] + opts = defaultDbOptions() + opts.setMaxOpenFiles(openMax) - let rc = openRocksDb(dataDir, dbOpts) - if rc.isErr: - let error = RdbBeDriverInitError - debug logTxt "driver failed", dataDir, openMax, - error, info=rc.error - return err((RdbBeDriverInitError, rc.error)) + # Reserve a family corner for `Kvt` on the database + let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr: + let errSym = RdbBeDriverInitError + when extraTraceMessages: + debug logTxt "init failed", dataDir, openMax, error=errSym, info=error + return err((errSym, error)) - rdb.dbOpts = dbOpts - rdb.store = rc.get() - - # The following is a default setup (subject to change) - rdb.impOpt = rocksdb_ingestexternalfileoptions_create() - rdb.envOpt = rocksdb_envoptions_create() + # Initialise `Kvt` family + rdb.store = baseDb.withColFamily(KvtFamily).valueOr: + let errSym = RdbBeDriverInitError + when extraTraceMessages: + debug logTxt "init failed", dataDir, openMax, error=errSym, info=error + return err((errSym, error)) ok() +proc init*( + rdb: var RdbInst; + store: ColFamilyReadWrite; + ) = + ## Piggyback on other database + rdb.store = store # that's it proc destroy*(rdb: var RdbInst; flush: bool) = - ## Destructor - rdb.envOpt.rocksdb_envoptions_destroy() - rdb.impOpt.rocksdb_ingestexternalfileoptions_destroy() - rdb.store.close() - - try: - rdb.cacheDir.removeDir + ## Destructor (no need to do anything if piggybacked) + if 0 < rdb.basePath.len: + rdb.store.db.close() if flush: - rdb.dataDir.removeDir + try: + rdb.dataDir.removeDir - # Remove the base folder if it is empty - block done: - for w in rdb.baseDir.walkDirRec: - # Ignore backup files - if 0 < w.len and w[^1] != '~': - break done - rdb.baseDir.removeDir + # Remove the base folder if it is empty + block done: + for w in rdb.baseDir.walkDirRec: + # Ignore backup files + if 0 < w.len and w[^1] != '~': + break done + rdb.baseDir.removeDir - except CatchableError: - discard + except CatchableError: + discard # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim index 59b25a867..f564a7091 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim @@ -14,189 +14,74 @@ {.push raises: [].} import - std/[algorithm, os, sequtils, strutils, sets, tables], - chronicles, eth/common, - rocksdb/lib/librocksdb, + stew/byteutils, rocksdb, results, ../../kvt_desc, ./rdb_desc -logScope: - topics = "kvt-backend" - -type - RdbPutSession = object - writer: ptr rocksdb_sstfilewriter_t - sstPath: string - nRecords: int - const - extraTraceMessages = false or true + extraTraceMessages = false ## Enable additional logging noise +when extraTraceMessages: + import chronicles + + logScope: + topics = "kvt-rocksdb" + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -template logTxt(info: static[string]): static[string] = - "RocksDB/put " & info - -proc getFileSize(fileName: string): int64 {.used.} = - var f: File - if f.open fileName: - defer: f.close - try: - result = f.getFileSize - except CatchableError: - discard - -proc rmFileIgnExpt(fileName: string) = - try: - fileName.removeFile - except CatchableError: - discard - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc destroy(rps: RdbPutSession) = - rps.writer.rocksdb_sstfilewriter_destroy() - rps.sstPath.rmFileIgnExpt - -proc begin( - rdb: var RdbInst; - ): Result[RdbPutSession,(KvtError,string)] = - ## Begin a new bulk load session storing data into a temporary cache file - ## `fileName`. When finished, this file will bi direcly imported into the - ## database. - var csError: cstring - - var session = RdbPutSession( - writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr), - sstPath: rdb.sstFilePath) - - if session.writer.isNil: - return err((RdbBeCreateSstWriter, "Cannot create sst writer session")) - session.sstPath.rmFileIgnExpt - - session.writer.rocksdb_sstfilewriter_open( - session.sstPath.cstring, cast[cstringArray](csError.addr)) - if not csError.isNil: - session.destroy() - let info = $csError - if "no such file or directory" in info.toLowerAscii: - # Somebody might have killed the "tmp" directory? - raiseAssert info - return err((RdbBeOpenSstWriter, info)) - - ok session - - -proc add( - session: var RdbPutSession; - key: openArray[byte]; - val: openArray[byte]; - ): Result[void,(KvtError,string)] = - ## Append a record to the SST file. Note that consecutive records must be - ## strictly increasing. - ## - ## This function is a wrapper around `rocksdb_sstfilewriter_add()` or - ## `rocksdb_sstfilewriter_put()` (stragely enough, there are two functions - ## with exactly the same impementation code.) - var csError: cstring - - session.writer.rocksdb_sstfilewriter_add( - cast[cstring](unsafeAddr key[0]), csize_t(key.len), - cast[cstring](unsafeAddr val[0]), csize_t(val.len), - cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeAddSstWriter, $csError)) - - session.nRecords.inc - ok() - - -proc commit( - rdb: var RdbInst; - session: RdbPutSession; - ): Result[void,(KvtError,string)] = - ## Commit collected and cached data to the database. This function implies - ## `destroy()` if successful. Otherwise `destroy()` must be called - ## explicitely, e.g. after error analysis. - var csError: cstring - - if 0 < session.nRecords: - session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeFinishSstWriter, $csError)) - - var sstPath = session.sstPath.cstring - rdb.store.cPtr.rocksdb_ingest_external_file( - cast[cstringArray](sstPath.addr), - 1, rdb.impOpt, cast[cstringArray](csError.addr)) - if not csError.isNil: - return err((RdbBeIngestSstWriter, $csError)) - - when extraTraceMessages: - trace logTxt "finished sst", fileSize=session.sstPath.getFileSize - - session.destroy() - ok() +proc disposeSession(rdb: var RdbInst) = + rdb.session.close() + rdb.session = WriteBatchRef(nil) +proc `$`(a: Blob): string = + a.toHex + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +proc begin*(rdb: var RdbInst) = + if rdb.session.isNil: + rdb.session = rdb.store.openWriteBatch() + +proc rollback*(rdb: var RdbInst) = + if not rdb.session.isClosed(): + rdb.disposeSession() + +proc commit*(rdb: var RdbInst): Result[void,(KvtError,string)] = + if not rdb.session.isClosed(): + defer: rdb.disposeSession() + rdb.store.write(rdb.session).isOkOr: + const errSym = RdbBeDriverWriteError + when extraTraceMessages: + trace logTxt "commit", error=errSym, info=error + return err((errSym,error)) + ok() + proc put*( - rdb: var RdbInst; - tab: Table[Blob,Blob]; - ): Result[void,(KvtError,string)] = - - var session = block: - let rc = rdb.begin() - if rc.isErr: - return err(rc.error) - rc.value - - # Vertices with empty table values will be deleted - var delKey: HashSet[Blob] - - # Compare `Blob`s as left aligned big endian numbers, right padded with zeros - proc cmpBlobs(a, b: Blob): int = - let minLen = min(a.len, b.len) - for n in 0 ..< minLen: - if a[n] != b[n]: - return a[n].cmp b[n] - if a.len < b.len: - return -1 - if b.len < a.len: - return 1 - - for key in tab.keys.toSeq.sorted cmpBlobs: - let val = tab.getOrVoid key - if val.isValid: - let rc = session.add(key, val) - if rc.isErr: - session.destroy() - return err(rc.error) + rdb: RdbInst; + data: openArray[(Blob,Blob)]; + ): Result[void,(Blob,KvtError,string)] = + let dsc = rdb.session + for (key,val) in data: + if val.len == 0: + dsc.delete(key, rdb.store.name).isOkOr: + const errSym = RdbBeDriverDelError + when extraTraceMessages: + trace logTxt "del", key, error=errSym, info=error + return err((key,errSym,error)) else: - delKey.incl key - - block: - let rc = rdb.commit session - if rc.isErr: - trace logTxt "commit error", error=rc.error[0], info=rc.error[1] - return err(rc.error) - - # Delete vertices after successfully updating vertices with non-zero values. - for key in delKey: - let rc = rdb.store.delete key - if rc.isErr: - return err((RdbBeDriverDelError,rc.error)) - + dsc.put(key, val, rdb.store.name).isOkOr: + const errSym = RdbBeDriverPutError + when extraTraceMessages: + trace logTxt "put", key, error=errSym, info=error + return err((key,errSym,error)) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim index ec18abf52..6d5bb6843 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim @@ -14,12 +14,21 @@ {.push raises: [].} import - std/sequtils, eth/common, - rocksdb/lib/librocksdb, rocksdb, ./rdb_desc +const + extraTraceMessages = false + ## Enable additional logging noise + +when extraTraceMessages: + import + chronicles + + logScope: + topics = "aristo-rocksdb" + # ------------------------------------------------------------------------------ # Public iterators # ------------------------------------------------------------------------------ @@ -27,32 +36,17 @@ import iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] = ## Walk over all key-value pairs of the database. ## - let - readOptions = rocksdb_readoptions_create() - rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions) - defer: - rit.rocksdb_iter_destroy() - readOptions.rocksdb_readoptions_destroy() + ## Non-decodable entries are stepped over and ignored. + block walkBody: + let rit = rdb.store.openIterator().valueOr: + when extraTraceMessages: + trace logTxt "walk", pfx="all", error + break walkBody + defer: rit.close() - rit.rocksdb_iter_seek_to_first() - while rit.rocksdb_iter_valid() != 0: - var kLen: csize_t - let kData = rit.rocksdb_iter_key(addr kLen) - - if not kData.isNil and 0 < kLen: - var vLen: csize_t - let vData = rit.rocksdb_iter_value(addr vLen) - - if not vData.isNil and 0 < vLen: - let - key = kData.toOpenArrayByte(0,int(kLen)-1).toSeq - data = vData.toOpenArrayByte(0,int(vLen)-1).toSeq - yield (key,data) - - # Update Iterator (might overwrite kData/vdata) - rit.rocksdb_iter_next() - - # End while + for (key,val) in rit.pairs: + if 0 < key.len: + yield (key, val) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_persistent.nim b/nimbus/db/kvt/kvt_persistent.nim index 40544e0fe..e526ccd03 100644 --- a/nimbus/db/kvt/kvt_persistent.nim +++ b/nimbus/db/kvt/kvt_persistent.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) diff --git a/vendor/nim-rocksdb b/vendor/nim-rocksdb index f37d7d486..eb594e33b 160000 --- a/vendor/nim-rocksdb +++ b/vendor/nim-rocksdb @@ -1 +1 @@ -Subproject commit f37d7d486caa7a21490d96caca7f0103295c5e46 +Subproject commit eb594e33b29d1475d8f1ba7819bfb7ac5fa48ea3