From 5a5cc6295ebbd9b5e829748dea954a9d86a535a5 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Thu, 13 Jun 2024 18:15:11 +0000 Subject: [PATCH] Triggered write event for kvt (#2351) * bump rocksdb * Rename `KVT` objects related to filters according to `Aristo` naming details: filter* => delta* roFilter => balancer * Compulsory error handling if `persistent()` fails * Add return code to `reCentre()` why: Might eventually fail if re-centring is blocked. Some logic will be added in subsequent patch sets. * Add column families from earlier session to rocksdb in opening procedure why: All previously used CFs must be declared when re-opening an existing database. * Update `init()` and add rocksdb `reinit()` methods for changing parameters why: Opening a set of column families (with different open options) must span at least the ones that are already on disk. * Provide write-trigger-event interface into `Aristo` backend why: This allows data from a guest application (think `KVT`) to get synced with the write cycle so that the guest and `Aristo` save atomically. * Use `KVT` with new column family interface from `Aristo` * Remove obsolete guest interface * Implement `KVT` piggyback on `Aristo` backend * CoreDb: Add separate `KVT`/`Aristo` backend mode for debugging * Remove `rocks_db` import from `persist()` function why: Some systems (in particular `fluffy` and friends) use the `Aristo` memory backend emulation and do not link against rocksdb when building the application. So this should fix that problem. --- nimbus/core/chain/persist_blocks.nim | 3 +- nimbus/db/aristo/aristo_api.nim | 9 +- nimbus/db/aristo/aristo_check.nim | 2 +- nimbus/db/aristo/aristo_debug.nim | 2 +- nimbus/db/aristo/aristo_delta.nim | 6 +- nimbus/db/aristo/aristo_desc.nim | 6 +- nimbus/db/aristo/aristo_desc/desc_backend.nim | 18 +- nimbus/db/aristo/aristo_desc/desc_error.nim | 7 +- nimbus/db/aristo/aristo_init/init_common.nim | 1 + nimbus/db/aristo/aristo_init/memory_db.nim | 10 +- nimbus/db/aristo/aristo_init/memory_only.nim | 9 - nimbus/db/aristo/aristo_init/persistent.nim | 72 +++++-- nimbus/db/aristo/aristo_init/rocks_db.nim | 60 ++++-- .../aristo/aristo_init/rocks_db/rdb_desc.nim | 36 ++-- .../aristo/aristo_init/rocks_db/rdb_init.nim | 153 +++++++++----- .../aristo/aristo_init/rocks_db/rdb_put.nim | 12 +- .../backend/aristo_db/handlers_aristo.nim | 3 +- .../backend/aristo_db/handlers_kvt.nim | 9 +- nimbus/db/core_db/backend/aristo_rocksdb.nim | 18 +- nimbus/db/core_db/base.nim | 2 +- nimbus/db/core_db/base/base_desc.nim | 1 + nimbus/db/core_db/persistent.nim | 13 +- nimbus/db/kvt/kvt_api.nim | 8 +- .../db/kvt/{kvt_filter.nim => kvt_delta.nim} | 34 ++-- .../delta_merge.nim} | 0 .../delta_reverse.nim} | 16 +- nimbus/db/kvt/kvt_desc.nim | 27 ++- nimbus/db/kvt/kvt_desc/desc_backend.nim | 21 +- nimbus/db/kvt/kvt_desc/desc_error.nim | 6 + nimbus/db/kvt/kvt_init/init_common.nim | 10 +- nimbus/db/kvt/kvt_init/memory_db.nim | 17 +- nimbus/db/kvt/kvt_init/memory_only.nim | 2 +- nimbus/db/kvt/kvt_init/persistent.nim | 69 +++++-- nimbus/db/kvt/kvt_init/rocks_db.nim | 188 +++++++++++++++--- nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim | 21 +- nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim | 2 +- nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim | 97 +++++---- nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim | 9 +- nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim | 2 +- nimbus/db/kvt/kvt_tx.nim | 7 + nimbus/db/kvt/kvt_tx/tx_stow.nim | 31 ++- nimbus/db/kvt/kvt_utils.nim | 4 +- tests/replay/undump_blocks_era1.nim | 4 +- tests/test_aristo/test_filter.nim | 4
+- tests/test_coredb.nim | 6 +- vendor/nim-rocksdb | 2 +- 46 files changed, 725 insertions(+), 314 deletions(-) rename nimbus/db/kvt/{kvt_filter.nim => kvt_delta.nim} (75%) rename nimbus/db/kvt/{kvt_filter/filter_merge.nim => kvt_delta/delta_merge.nim} (100%) rename nimbus/db/kvt/{kvt_filter/filter_reverse.nim => kvt_delta/delta_reverse.nim} (75%) diff --git a/nimbus/core/chain/persist_blocks.nim b/nimbus/core/chain/persist_blocks.nim index 8267eb022..cd8331535 100644 --- a/nimbus/core/chain/persist_blocks.nim +++ b/nimbus/core/chain/persist_blocks.nim @@ -140,7 +140,8 @@ proc persistBlocksImpl( dbTx.commit() # Save and record the block number before the last saved block state. - c.db.persistent(toBlock) + c.db.persistent(toBlock).isOkOr: + return err("Failed to save state: " & $$error) if c.com.pruneHistory: # There is a feature for test systems to regularly clean up older blocks diff --git a/nimbus/db/aristo/aristo_api.nim b/nimbus/db/aristo/aristo_api.nim index 44c5694f7..6abbdb0b7 100644 --- a/nimbus/db/aristo/aristo_api.nim +++ b/nimbus/db/aristo/aristo_api.nim @@ -300,7 +300,8 @@ type AristoApiReCentreFn* = proc(db: AristoDbRef; - ) {.noRaise.} + ): Result[void,AristoError] + {.noRaise.} ## Re-focus the `db` argument descriptor so that it becomes the centre. ## Nothing is done if the `db` descriptor is the centre, already. ## @@ -494,7 +495,7 @@ proc dup(be: BackendRef): BackendRef = of BackendMemory: return MemBackendRef(be).dup - of BackendRocksDB: + of BackendRocksDB, BackendRdbHosting: when AristoPersistentBackendOk: return RdbBackendRef(be).dup @@ -705,9 +706,9 @@ func init*( result = api.persist(a, b, c) profApi.reCentre = - proc(a: AristoDbRef) = + proc(a: AristoDbRef): auto = AristoApiProfReCentreFn.profileRunner: - api.reCentre(a) + result = api.reCentre(a) profApi.rollback = proc(a: AristoTxRef): auto = diff --git a/nimbus/db/aristo/aristo_check.nim b/nimbus/db/aristo/aristo_check.nim index f44a94c77..e8909f21a 100644 --- a/nimbus/db/aristo/aristo_check.nim +++ b/nimbus/db/aristo/aristo_check.nim @@ -74,7 +74,7 @@ proc checkBE*( case db.backend.kind: of BackendMemory: return MemBackendRef.checkBE(db, cache=cache, relax=relax) - of BackendRocksDB: + of BackendRocksDB, BackendRdbHosting: return RdbBackendRef.checkBE(db, cache=cache, relax=relax) of BackendVoid: return VoidBackendRef.checkBE(db, cache=cache, relax=relax) diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 997589ac3..fd5e6a225 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -733,7 +733,7 @@ proc pp*( case be.kind: of BackendMemory: result &= be.MemBackendRef.ppBe(db, limit, indent+1) - of BackendRocksDB: + of BackendRocksDB, BackendRdbHosting: result &= be.RdbBackendRef.ppBe(db, limit, indent+1) of BackendVoid: result &= "" diff --git a/nimbus/db/aristo/aristo_delta.nim b/nimbus/db/aristo/aristo_delta.nim index ed5e3a24f..d8b472c58 100644 --- a/nimbus/db/aristo/aristo_delta.nim +++ b/nimbus/db/aristo/aristo_delta.nim @@ -60,9 +60,9 @@ proc deltaPersistent*( if db != parent: if not reCentreOk: return err(FilBackendRoMode) - db.reCentre + ? db.reCentre() # Always re-centre to `parent` (in case `reCentreOk` was set) - defer: parent.reCentre + defer: discard parent.reCentre() # Initialise peer filter balancer. let updateSiblings = ? UpdateSiblingsRef.init db @@ -74,7 +74,7 @@ proc deltaPersistent*( serial: nxtFid) # Store structural single trie entries - let writeBatch = be.putBegFn() + let writeBatch = ? 
be.putBegFn() be.putVtxFn(writeBatch, db.balancer.sTab.pairs.toSeq) be.putKeyFn(writeBatch, db.balancer.kMap.pairs.toSeq) be.putTuvFn(writeBatch, db.balancer.vTop) diff --git a/nimbus/db/aristo/aristo_desc.nim b/nimbus/db/aristo/aristo_desc.nim index 1a46de3d1..1891cd772 100644 --- a/nimbus/db/aristo/aristo_desc.nim +++ b/nimbus/db/aristo/aristo_desc.nim @@ -58,8 +58,7 @@ type centre: AristoDbRef ## Link to peer with write permission peers: HashSet[AristoDbRef] ## List of all peers - AristoDbRef* = ref AristoDbObj - AristoDbObj* = object + AristoDbRef* = ref object ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers @@ -150,7 +149,7 @@ func getCentre*(db: AristoDbRef): AristoDbRef = ## if db.dudes.isNil: db else: db.dudes.centre -proc reCentre*(db: AristoDbRef) = +proc reCentre*(db: AristoDbRef): Result[void,AristoError] = ## Re-focus the `db` argument descriptor so that it becomes the centre. ## Nothing is done if the `db` descriptor is the centre, already. ## @@ -166,6 +165,7 @@ proc reCentre*(db: AristoDbRef) = ## if not db.dudes.isNil: db.dudes.centre = db + ok() proc fork*( db: AristoDbRef; diff --git a/nimbus/db/aristo/aristo_desc/desc_backend.nim b/nimbus/db/aristo/aristo_desc/desc_backend.nim index 1613f4de3..44695c388 100644 --- a/nimbus/db/aristo/aristo_desc/desc_backend.nim +++ b/nimbus/db/aristo/aristo_desc/desc_backend.nim @@ -48,7 +48,7 @@ type ## by any library function using the backend. PutBegFn* = - proc(): PutHdlRef {.gcsafe, raises: [].} + proc(): Result[PutHdlRef,AristoError] {.gcsafe, raises: [].} ## Generic transaction initialisation function PutVtxFn* = @@ -81,20 +81,6 @@ type # ------------- - GuestDbFn* = - proc(instance: int): Result[RootRef,AristoError] {.gcsafe, raises: [].} - ## Generic function that returns a compartmentalised database handle that - ## can be used by another application. If non-nil, this handle allows to - ## use a previously allocated database. It is separated from the `Aristo` - ## columns. - ## - ## A successful return value might be `nil` if this feature is - ## unsupported. - ## - ## Caveat: - ## The guest database is closed automatically when closing the `Aristo` - ## database. - CloseFn* = proc(flush: bool) {.gcsafe, raises: [].} ## Generic destructor for the `Aristo DB` backend. 
The argument `flush` @@ -119,7 +105,6 @@ type putLstFn*: PutLstFn ## Store saved state putEndFn*: PutEndFn ## Commit bulk store session - guestDbFn*: GuestDbFn ## Piggyback DB for another application closeFn*: CloseFn ## Generic destructor proc init*(trg: var BackendObj; src: BackendObj) = @@ -135,7 +120,6 @@ proc init*(trg: var BackendObj; src: BackendObj) = trg.putLstFn = src.putLstFn trg.putEndFn = src.putEndFn - trg.guestDbFn = src.guestDbFn trg.closeFn = src.closeFn # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_desc/desc_error.nim b/nimbus/db/aristo/aristo_desc/desc_error.nim index 1694aa665..5e9946902 100644 --- a/nimbus/db/aristo/aristo_desc/desc_error.nim +++ b/nimbus/db/aristo/aristo_desc/desc_error.nim @@ -249,9 +249,14 @@ type RdbBeDriverGetVtxError RdbBeDriverGuestError RdbBeDriverPutAdmError - RdbBeDriverPutVtxError RdbBeDriverPutKeyError + RdbBeDriverPutVtxError RdbBeDriverWriteError + RdbBeTypeUnsupported + RdbBeWrSessionUnfinished + RdbBeWrTriggerActiveAlready + RdbBeWrTriggerNilFn + RdbGuestInstanceAborted RdbGuestInstanceUnsupported RdbHashKeyExpected diff --git a/nimbus/db/aristo/aristo_init/init_common.nim b/nimbus/db/aristo/aristo_init/init_common.nim index d58df711c..49174e608 100644 --- a/nimbus/db/aristo/aristo_init/init_common.nim +++ b/nimbus/db/aristo/aristo_init/init_common.nim @@ -22,6 +22,7 @@ type BackendVoid = 0 ## For providing backend-less constructor BackendMemory BackendRocksDB + BackendRdbHosting ## Allowed piggybacking write session StorageType* = enum ## Storage types, key prefix diff --git a/nimbus/db/aristo/aristo_init/memory_db.nim b/nimbus/db/aristo/aristo_init/memory_db.nim index 7d34fe189..eda8bf458 100644 --- a/nimbus/db/aristo/aristo_init/memory_db.nim +++ b/nimbus/db/aristo/aristo_init/memory_db.nim @@ -126,8 +126,8 @@ proc getLstFn(db: MemBackendRef): GetLstFn = proc putBegFn(db: MemBackendRef): PutBegFn = result = - proc(): PutHdlRef = - db.newSession() + proc(): Result[PutHdlRef,AristoError] = + ok db.newSession() proc putVtxFn(db: MemBackendRef): PutVtxFn = @@ -215,11 +215,6 @@ proc putEndFn(db: MemBackendRef): PutEndFn = # ------------- -proc guestDbFn(db: MemBackendRef): GuestDbFn = - result = - proc(instance: int): Result[RootRef,AristoError] = - ok(RootRef nil) - proc closeFn(db: MemBackendRef): CloseFn = result = proc(ignore: bool) = @@ -246,7 +241,6 @@ proc memoryBackend*(): BackendRef = db.putLstFn = putLstFn db db.putEndFn = putEndFn db - db.guestDbFn = guestDbFn db db.closeFn = closeFn db db diff --git a/nimbus/db/aristo/aristo_init/memory_only.nim b/nimbus/db/aristo/aristo_init/memory_only.nim index d87165479..fda2e17f7 100644 --- a/nimbus/db/aristo/aristo_init/memory_only.nim +++ b/nimbus/db/aristo/aristo_init/memory_only.nim @@ -14,7 +14,6 @@ {.push raises: [].} import - results, ../aristo_desc, ../aristo_desc/desc_backend, "."/[init_common, memory_db] @@ -85,14 +84,6 @@ proc init*( ## Shortcut for `AristoDbRef.init(VoidBackendRef)` AristoDbRef.init VoidBackendRef -proc guestDb*(db: AristoDbRef; instance = 0): Result[GuestDbRef,AristoError] = - ## Database pigiback feature - if db.backend.isNil: - ok(GuestDbRef(nil)) - else: - let gdb = db.backend.guestDbFn(instance).valueOr: - return err(error) - ok(gdb.GuestDbRef) proc finish*(db: AristoDbRef; flush = false) = ## Backend destructor. 
The argument `flush` indicates that a full database diff --git a/nimbus/db/aristo/aristo_init/persistent.nim b/nimbus/db/aristo/aristo_init/persistent.nim index c01ffddab..02c384713 100644 --- a/nimbus/db/aristo/aristo_init/persistent.nim +++ b/nimbus/db/aristo/aristo_init/persistent.nim @@ -21,13 +21,15 @@ import results, rocksdb, + ../../opts, ../aristo_desc, ./rocks_db/rdb_desc, - "."/[rocks_db, memory_only], - ../../opts + "."/[rocks_db, memory_only] export + AristoDbRef, RdbBackendRef, + RdbWriteEventCb, memory_only # ------------------------------------------------------------------------------ @@ -56,26 +58,68 @@ proc newAristoRdbDbRef( # Public database constuctors, destructor # ------------------------------------------------------------------------------ -proc init*[W: RdbBackendRef]( +proc init*( T: type AristoDbRef; - B: type W; + B: type RdbBackendRef; basePath: string; opts: DbOptions ): Result[T, AristoError] = ## Generic constructor, `basePath` argument is ignored for memory backend ## databases (which also unconditionally succeed initialising.) ## - when B is RdbBackendRef: - basePath.newAristoRdbDbRef opts + basePath.newAristoRdbDbRef opts -proc getRocksDbFamily*( - gdb: GuestDbRef; - instance = 0; - ): Result[ColFamilyReadWrite,void] = - ## Database pigiback feature - if not gdb.isNil and gdb.beKind == BackendRocksDB: - return ok RdbGuestDbRef(gdb).guestDb - err() +proc reinit*( + db: AristoDbRef; + cfs: openArray[ColFamilyDescriptor]; + ): Result[seq[ColFamilyReadWrite],AristoError] = + ## Re-initialise the `RocksDb` backend database with additional or changed + ## column family settings. This can be used to make space for guest use of + ## the backend used by `Aristo`. The function returns a list of column family + ## descriptors in the same order as the `cfs` argument. + ## + ## The argument `cfs` list replaces and extends the CFs already on disk by + ## its options except for the ones defined for use with `Aristo`. + ## + ## Even though tx layers and filters might not be affected by this function, + ## it is prudent to have them clean and saved on the backend database before + ## changing it. On error conditions, data might get lost. + ## + case db.backend.kind: + of BackendRocksDB: + db.backend.rocksDbUpdateCfs cfs + of BackendRdbHosting: + err(RdbBeWrTriggerActiveAlready) + else: + return err(RdbBeTypeUnsupported) + +proc activateWrTrigger*( + db: AristoDbRef; + hdl: RdbWriteEventCb; + ): Result[void,AristoError] = + ## This function allows linking an application to the `Aristo` storage event + ## for the `RocksDb` backend via the callback argument function `hdl`. + ## + ## The argument handler `hdl` of type + ## :: + ## proc(session: WriteBatchRef): bool + ## + ## will be invoked when a write batch for the `Aristo` database is opened in + ## order to save current changes to the backend. The `session` argument passed + ## to the handler in conjunction with a list of `ColFamilyReadWrite` items + ## (as returned from `reinit()`) might be used to store additional items + ## to the database with the same write batch. + ## + ## If the handler returns `true`, the write batch + ## will proceed with saving. Otherwise it is aborted and no data are saved at all.
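(For illustration, not part of the patch: a minimal sketch of how a guest might combine `reinit()` and `activateWrTrigger()`. The names `GuestCF`, `guestHook` and `guestData` are hypothetical; the `session.put(key, val, cfName)` call mirrors the `rdb_put.nim` usage further below.)

    const GuestCF = "Guest"                   # assumed guest column family name
    # Make room for the guest column; `guestCols[0]` gives read/write access.
    let guestCols = ? db.reinit(
      @[initColFamilyDescriptor(GuestCF, defaultColFamilyOptions())])

    let guestHook: RdbWriteEventCb =
      proc(session: WriteBatchRef): bool =
        # Piggyback pending guest data on the `Aristo` write batch.
        for (key, val) in guestData.pairs:
          session.put(key, val, GuestCF).isOkOr:
            return false                      # abort: nothing is saved at all
        true                                  # proceed: `Aristo` saves its columns

    ? db.activateWrTrigger(guestHook)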
+ ## + case db.backend.kind: + of BackendRocksDB: + db.backend.rocksDbSetEventTrigger hdl + of BackendRdbHosting: + err(RdbBeWrTriggerActiveAlready) + else: + err(RdbBeTypeUnsupported) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_init/rocks_db.nim b/nimbus/db/aristo/aristo_init/rocks_db.nim index 8de237f1b..e7102abaf 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db.nim @@ -137,9 +137,9 @@ proc getLstFn(db: RdbBackendRef): GetLstFn = proc putBegFn(db: RdbBackendRef): PutBegFn = result = - proc(): PutHdlRef = + proc(): Result[PutHdlRef,AristoError] = db.rdb.begin() - db.newSession() + ok db.newSession() proc putVtxFn(db: RdbBackendRef): PutVtxFn = result = @@ -212,7 +212,7 @@ proc putEndFn(db: RdbBackendRef): PutEndFn = of AdmPfx: trace logTxt "putEndFn: admin failed", pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code of Oops: trace logTxt "putEndFn: oops", - error=hdl.error.code + pfx=hdl.error.pfx, error=hdl.error.code db.rdb.rollback() return err(hdl.error.code) @@ -223,20 +223,27 @@ proc putEndFn(db: RdbBackendRef): PutEndFn = return err(error[0]) ok() -proc guestDbFn(db: RdbBackendRef): GuestDbFn = - result = - proc(instance: int): Result[RootRef,AristoError] = - let gdb = db.rdb.initGuestDb(instance).valueOr: - when extraTraceMessages: - trace logTxt "guestDbFn", error=error[0], info=error[1] - return err(error[0]) - ok gdb - proc closeFn(db: RdbBackendRef): CloseFn = result = proc(flush: bool) = db.rdb.destroy(flush) +# ------------------------------------------------------------------------------ +# Private functions: hosting interface changes +# ------------------------------------------------------------------------------ + +proc putBegHostingFn(db: RdbBackendRef): PutBegFn = + result = + proc(): Result[PutHdlRef,AristoError] = + db.rdb.begin() + if db.rdb.trgWriteEvent(db.rdb.session): + ok db.newSession() + else: + when extraTraceMessages: + trace logTxt "putBegFn: guest trigger aborted session" + db.rdb.rollback() + err(RdbGuestInstanceAborted) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -269,10 +276,37 @@ proc rocksDbBackend*( db.putLstFn = putLstFn db db.putEndFn = putEndFn db - db.guestDbFn = guestDbFn db db.closeFn = closeFn db ok db + +proc rocksDbUpdateCfs*( + be: BackendRef; + cfs: openArray[ColFamilyDescriptor]; + ): Result[seq[ColFamilyReadWrite],AristoError] = + ## Reopen with extended column families given as argument. + let + db = RdbBackendRef(be) + rCfs = db.rdb.reinit(cfs).valueOr: + return err(error[0]) + ok rCfs + + +proc rocksDbSetEventTrigger*( + be: BackendRef; + hdl: RdbWriteEventCb; + ): Result[void,AristoError] = + ## Store event trigger. This also changes the backend type. 
+ if hdl.isNil: + err(RdbBeWrTriggerNilFn) + else: + let db = RdbBackendRef(be) + db.rdb.trgWriteEvent = hdl + db.beKind = BackendRdbHosting + db.putBegFn = putBegHostingFn db + ok() + + proc dup*(db: RdbBackendRef): RdbBackendRef = ## Duplicate descriptor shell as needed for API debugging new result diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim index d04b3399c..723ffeae3 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim @@ -18,10 +18,22 @@ import eth/common, rocksdb, stew/[endians2, keyed_queue], + ../../../opts, ../../aristo_desc, ../init_common type + RdbWriteEventCb* = + proc(session: WriteBatchRef): bool {.gcsafe, raises: [].} + ## Callback closure function that passes the write session handle + ## to a guest peer right after it was opened. The guest may store any + ## data on its own column family and return `true` if that worked + ## all right. Then the `Aristo` handler will store its own columns and + ## finalise the write session. + ## + ## In case of an error, i.e. when `false` is returned, `Aristo` will abort the + ## write session and return a session error. + RdbInst* = object admCol*: ColFamilyReadWrite ## Admin column family handler vtxCol*: ColFamilyReadWrite ## Vertex column family handler @@ -29,26 +41,18 @@ type session*: WriteBatchRef ## For batched `put()` rdKeyLru*: KeyedQueue[VertexID,HashKey] ## Read cache rdVtxLru*: KeyedQueue[VertexID,VertexRef] ## Read cache + basePath*: string ## Database directory - noFq*: bool ## No filter queues available + opts*: DbOptions ## Just a copy here for re-opening + trgWriteEvent*: RdbWriteEventCb ## Database piggyback callback handler - # Alien interface - RdbGuest* = enum - ## The guest CF was worth a try, but there are better solutions and this - ## item will be removed in future. - GuestFamily0 = "Guest0" ## Guest family (e.g. for Kvt) - GuestFamily1 = "Guest1" ## Ditto - GuestFamily2 = "Guest2" ## Ditto - - RdbGuestDbRef* = ref object of GuestDbRef - ## The guest CF was worth a try, but there are better solutions and this - ## item will be removed in future.
- guestDb*: ColFamilyReadWrite ## Pigiback feature references + AristoCFs* = enum + ## Column family symbols/handles and names used on the database + AdmCF = "AriAdm" ## Admin column family name + VtxCF = "AriVtx" ## Vertex column family name + KeyCF = "AriKey" ## Hash key column family name const - AdmCF* = "AdmAri" ## Admin column family name - VtxCF* = "VtxAri" ## Vertex column family name - KeyCF* = "KeyAri" ## Hash key column family name BaseFolder* = "nimbus" ## Same as for Legacy DB DataFolder* = "aristo" ## Legacy DB has "data" RdKeyLruMaxSize* = 4096 ## Max size of read cache for keys diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim index 40306c592..34c71ee66 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim @@ -14,7 +14,7 @@ {.push raises: [].} import - std/[sequtils, os], + std/[sets, sequtils, os], rocksdb, results, ../../aristo_desc, @@ -22,27 +22,12 @@ import ../../../opts # ------------------------------------------------------------------------------ -# Public constructor +# Private constructor # ------------------------------------------------------------------------------ -proc init*( - rdb: var RdbInst; - basePath: string; +proc getInitOptions( opts: DbOptions; - ): Result[void,(AristoError,string)] = - ## Constructor c ode inspired by `RocksStoreRef.init()` from - ## kvstore_rocksdb.nim - const initFailed = "RocksDB/init() failed" - - rdb.basePath = basePath - - let - dataDir = rdb.dataDir - try: - dataDir.createDir - except OSError, IOError: - return err((RdbBeCantCreateDataDir, "")) - + ): tuple[cfOpts: ColFamilyOptionsRef, dbOpts: DbOptionsRef] = # TODO the configuration options below have not been tuned but are rather # based on gut feeling, guesses and by looking at other clients - it # would make sense to test different settings and combinations once the @@ -86,13 +71,7 @@ proc init*( # https://github.com/facebook/rocksdb/wiki/Compression cfOpts.setBottommostCompression(Compression.lz4Compression) - let - cfs = @[initColFamilyDescriptor(AdmCF, cfOpts), - initColFamilyDescriptor(VtxCF, cfOpts), - initColFamilyDescriptor(KeyCF, cfOpts)] & - RdbGuest.mapIt(initColFamilyDescriptor($it, cfOpts)) - dbOpts = defaultDbOptions() - + let dbOpts = defaultDbOptions() dbOpts.setMaxOpenFiles(opts.maxOpenFiles) dbOpts.setMaxBytesForLevelBase(opts.writeBufferSize) @@ -110,6 +89,7 @@ proc init*( # https://github.com/facebook/rocksdb/blob/af50823069818fc127438e39fef91d2486d6e76c/include/rocksdb/advanced_options.h#L696 dbOpts.setOptimizeFiltersForHits(true) + let tableOpts = defaultTableOptions() # This bloom filter helps avoid having to read multiple SST files when looking # for a value. 
@@ -141,39 +121,114 @@ dbOpts.setBlockBasedTableFactory(tableOpts) - # Reserve a family corner for `Aristo` on the database - let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies = cfs).valueOr: + (cfOpts,dbOpts) + + +proc initImpl( + rdb: var RdbInst; + basePath: string; + opts: DbOptions; + guestCFs: openArray[ColFamilyDescriptor] = []; + ): Result[void,(AristoError,string)] = + ## Database backend constructor + const initFailed = "RocksDB/init() failed" + + rdb.basePath = basePath + rdb.opts = opts + + let + dataDir = rdb.dataDir + try: + dataDir.createDir + except OSError, IOError: + return err((RdbBeCantCreateDataDir, "")) + + # Expand argument `opts` to rocksdb options + let (cfOpts, dbOpts) = opts.getInitOptions() + + # Column family names to allocate when opening the database. This list + # might be extended below. + var useCFs = AristoCFs.mapIt($it).toHashSet + + # The `guestCFs` list must not overwrite `AristoCFs` options + let guestCFs = guestCFs.filterIt(it.name notin useCFs) + + # If the database exists already, check for missing column families and + # allocate them for opening. Otherwise rocksdb might reject the persistent + # database. + if (dataDir / "CURRENT").fileExists: + let hdCFs = dataDir.listColumnFamilies.valueOr: + raiseAssert initFailed & " cannot read existing CFs: " & error + # Update list of column families for opener. + useCFs = useCFs + hdCFs.toHashSet + + # The `guestCFs` list might come with a different set of options. So it is + # temporarily removed from `useCFs` and will be re-added with appropriate + # options. + let guestCFq = @guestCFs + useCFs = useCFs - guestCFs.mapIt(it.name).toHashSet + + # Finalise list of column families + let cfs = useCFs.toSeq.mapIt(it.initColFamilyDescriptor cfOpts) & guestCFq + + # Open database for the extended family :) + let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies=cfs).valueOr: raiseAssert initFailed & " cannot create base descriptor: " & error # Initialise column handlers (this stores implicitely `baseDb`) - rdb.admCol = baseDb.withColFamily(AdmCF).valueOr: + rdb.admCol = baseDb.withColFamily($AdmCF).valueOr: raiseAssert initFailed & " cannot initialise AdmCF descriptor: " & error - rdb.vtxCol = baseDb.withColFamily(VtxCF).valueOr: + rdb.vtxCol = baseDb.withColFamily($VtxCF).valueOr: raiseAssert initFailed & " cannot initialise VtxCF descriptor: " & error - rdb.keyCol = baseDb.withColFamily(KeyCF).valueOr: + rdb.keyCol = baseDb.withColFamily($KeyCF).valueOr: raiseAssert initFailed & " cannot initialise KeyCF descriptor: " & error ok() -proc initGuestDb*( - rdb: RdbInst; - instance: int; - ): Result[RootRef,(AristoError,string)] = - ## Initialise `Guest` family - ## - ## Thus was a worth a try, but there are better solutions and this item - ## will be removed in future. - ## - if high(RdbGuest).ord < instance: - return err((RdbGuestInstanceUnsupported,"")) - let - guestSym = $RdbGuest(instance) - guestDb = rdb.baseDb.withColFamily(guestSym).valueOr: - raiseAssert "RocksDb/initGuestDb() failed: " & error +# ------------------------------------------------------------------------------ # Public constructor +# ------------------------------------------------------------------------------ - ok RdbGuestDbRef( - beKind: BackendRocksDB, - guestDb: guestDb) +proc init*( + rdb: var RdbInst; + basePath: string; + opts: DbOptions; + ): Result[void,(AristoError,string)] = + ## Temporarily define a guest CF list here.
+ rdb.initImpl(basePath, opts) + +proc reinit*( + rdb: var RdbInst; + cfs: openArray[ColFamilyDescriptor]; + ): Result[seq[ColFamilyReadWrite],(AristoError,string)] = + ## Re-open database with changed parameters. Even though tx layers and + ## filters might not be affected it is prudent to have them clean and + ## saved on the backend database before changing it. + ## + ## The function returns a list of column family descriptors in the same + ## order as the `cfs` argument. + ## + ## The `cfs` list replaces and extends the CFs already on disk by its + ## options except for the ones defined with `AristoCFs`. + ## + const initFailed = "RocksDB/reinit() failed" + + if not rdb.session.isNil: + return err((RdbBeWrSessionUnfinished,"")) + if not rdb.baseDb.isClosed(): + rdb.baseDb.close() + + rdb.initImpl(rdb.basePath, rdb.opts, cfs).isOkOr: + return err(error) + + # Assemble list of column family descriptors + var guestCols = newSeq[ColFamilyReadWrite](cfs.len) + for n,col in cfs: + guestCols[n] = rdb.baseDb.withColFamily(col.name).valueOr: + raiseAssert initFailed & " cannot initialise " & + col.name & " descriptor: " & error + + ok guestCols proc destroy*(rdb: var RdbInst; flush: bool) = diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim index 46fa91e50..02c77286b 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim @@ -72,13 +72,13 @@ proc putAdm*( ): Result[void,(AdminTabID,AristoError,string)] = let dsc = rdb.session if data.len == 0: - dsc.delete(xid.toOpenArray, AdmCF).isOkOr: + dsc.delete(xid.toOpenArray, $AdmCF).isOkOr: const errSym = RdbBeDriverDelAdmError when extraTraceMessages: trace logTxt "putAdm()", xid, error=errSym, info=error return err((xid,errSym,error)) else: - dsc.put(xid.toOpenArray, data, AdmCF).isOkOr: + dsc.put(xid.toOpenArray, data, $AdmCF).isOkOr: const errSym = RdbBeDriverPutAdmError when extraTraceMessages: trace logTxt "putAdm()", xid, error=errSym, info=error @@ -94,7 +94,7 @@ proc putKey*( for (vid,key) in data: if key.isValid: - dsc.put(vid.toOpenArray, key.data, KeyCF).isOkOr: + dsc.put(vid.toOpenArray, key.data, $KeyCF).isOkOr: # Caller must `rollback()` which will flush the `rdKeyLru` cache const errSym = RdbBeDriverPutKeyError when extraTraceMessages: @@ -106,7 +106,7 @@ proc putKey*( discard rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize) else: - dsc.delete(vid.toOpenArray, KeyCF).isOkOr: + dsc.delete(vid.toOpenArray, $KeyCF).isOkOr: # Caller must `rollback()` which will flush the `rdKeyLru` cache const errSym = RdbBeDriverDelKeyError when extraTraceMessages: @@ -132,7 +132,7 @@ proc putVtx*( # Caller must `rollback()` which will flush the `rdVtxLru` cache return err((vid,rc.error,"")) - dsc.put(vid.toOpenArray, rc.value, VtxCF).isOkOr: + dsc.put(vid.toOpenArray, rc.value, $VtxCF).isOkOr: # Caller must `rollback()` which will flush the `rdVtxLru` cache const errSym = RdbBeDriverPutVtxError when extraTraceMessages: @@ -144,7 +144,7 @@ proc putVtx*( discard rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize) else: - dsc.delete(vid.toOpenArray, VtxCF).isOkOr: + dsc.delete(vid.toOpenArray, $VtxCF).isOkOr: # Caller must `rollback()` which will flush the `rdVtxLru` cache const errSym = RdbBeDriverDelVtxError when extraTraceMessages: diff --git a/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim b/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim index 072899deb..72a157514 100644 --- 
a/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim +++ b/nimbus/db/core_db/backend/aristo_db/handlers_aristo.nim @@ -679,7 +679,8 @@ proc swapCtx*(base: AristoBaseRef; ctx: CoreDbCtxRef): CoreDbCtxRef = # Set read-write access and install base.ctx = AristoCoreDbCtxRef(ctx) - base.api.reCentre(base.ctx.mpt) + base.api.reCentre(base.ctx.mpt).isOkOr: + raiseAssert "swapCtx() failed: " & $error proc persistent*( diff --git a/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim b/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim index b7e8a2fb1..6b7d00347 100644 --- a/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim +++ b/nimbus/db/core_db/backend/aristo_db/handlers_kvt.nim @@ -204,10 +204,13 @@ proc persistent*( rc = api.persist(kvt) if rc.isOk: ok() - elif api.level(kvt) == 0: - err(rc.error.toError(base, info)) - else: + elif api.level(kvt) != 0: err(rc.error.toError(base, info, TxPending)) + elif rc.error == TxPersistDelayed: + # This is OK: Piggybacking on `Aristo` backend + ok() + else: + err(rc.error.toError(base, info)) # ------------------------------------------------------------------------------ # Public constructors and related diff --git a/nimbus/db/core_db/backend/aristo_rocksdb.nim b/nimbus/db/core_db/backend/aristo_rocksdb.nim index e99b8c029..89b391092 100644 --- a/nimbus/db/core_db/backend/aristo_rocksdb.nim +++ b/nimbus/db/core_db/backend/aristo_rocksdb.nim @@ -39,10 +39,22 @@ const # ------------------------------------------------------------------------------ proc newAristoRocksDbCoreDbRef*(path: string, opts: DbOptions): CoreDbRef = + ## This function piggybacks the `KVT` on the `Aristo` backend. let - adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).expect aristoFail - gdb = adb.guestDb().valueOr: GuestDbRef(nil) - kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail + adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).valueOr: + raiseAssert aristoFail & ": " & $error + kdb = KvtDbRef.init(use_kvt.RdbBackendRef, adb, opts).valueOr: + raiseAssert kvtFail & ": " & $error + AristoDbRocks.create(kdb, adb) + +proc newAristoDualRocksDbCoreDbRef*(path: string, opts: DbOptions): CoreDbRef = + ## This is mainly for debugging. The KVT is run on a completely separate + ## database backend. + let + adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).valueOr: + raiseAssert aristoFail & ": " & $error + kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, opts).valueOr: + raiseAssert kvtFail & ": " & $error AristoDbRocks.create(kdb, adb) # ------------------------------------------------------------------------------ diff --git a/nimbus/db/core_db/base.nim b/nimbus/db/core_db/base.nim index f891cbaab..a521c76b9 100644 --- a/nimbus/db/core_db/base.nim +++ b/nimbus/db/core_db/base.nim @@ -820,7 +820,7 @@ proc level*(db: CoreDbRef): int = proc persistent*( db: CoreDbRef; - ): CoreDbRc[void] {.discardable.} = + ): CoreDbRc[void] = ## For the legacy database, this function has no effect and succeeds always. ## It will nevertheless return a discardable error if there is a pending ## transaction (i.e. `db.level() == 0`.)
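(For illustration, not part of the patch: with `{.discardable.}` removed, callers must now handle the result of `persistent()` explicitly, just as `persist_blocks.nim` does at the top of this patch. A minimal sketch, assuming some `db: CoreDbRef` and block number `toBlock` in a proc returning a string-error `Result`:)

    # Save the state and surface any backend failure to the caller.
    db.persistent(toBlock).isOkOr:
      return err("failed to save state: " & $$error)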
diff --git a/nimbus/db/core_db/base/base_desc.nim b/nimbus/db/core_db/base/base_desc.nim index 18609b1a6..cc4381513 100644 --- a/nimbus/db/core_db/base/base_desc.nim +++ b/nimbus/db/core_db/base/base_desc.nim @@ -28,6 +28,7 @@ type Ooops AristoDbMemory ## Memory backend emulator AristoDbRocks ## RocksDB backend + AristoDbDualRocks ## Dual RocksDB backends for `Kvt` and `Aristo` AristoDbVoid ## No backend const diff --git a/nimbus/db/core_db/persistent.nim b/nimbus/db/core_db/persistent.nim index 72915ad4b..9700154af 100644 --- a/nimbus/db/core_db/persistent.nim +++ b/nimbus/db/core_db/persistent.nim @@ -39,11 +39,20 @@ proc newCoreDbRef*( ): CoreDbRef = ## Constructor for persistent type DB ## - ## Note: Using legacy notation `newCoreDbRef()` rather than - ## `CoreDbRef.init()` because of compiler coughing. + ## The production database type is `AristoDbRocks` which uses a single + ## `RocksDb` backend for both `Aristo` and `KVT`. + ## + ## For debugging, there is the `AristoDbDualRocks` database with split + ## backends for `Aristo` and `KVT`. This database is not compatible with + ## `AristoDbRocks`, so one cannot reliably switch between the two versions + ## in consecutive sessions. + ## when dbType == AristoDbRocks: newAristoRocksDbCoreDbRef path, opts + elif dbType == AristoDbDualRocks: + newAristoDualRocksDbCoreDbRef path, opts + else: {.error: "Unsupported dbType for persistent newCoreDbRef()".} diff --git a/nimbus/db/kvt/kvt_api.nim b/nimbus/db/kvt/kvt_api.nim index a53a44b7b..9eeec0bf7 100644 --- a/nimbus/db/kvt/kvt_api.nim +++ b/nimbus/db/kvt/kvt_api.nim @@ -58,7 +58,7 @@ type KvtApiNForkedFn* = proc(db: KvtDbRef): int {.noRaise.} KvtApiPutFn* = proc(db: KvtDbRef, key, data: openArray[byte]): Result[void,KvtError] {.noRaise.} - KvtApiReCentreFn* = proc(db: KvtDbRef) {.noRaise.} + KvtApiReCentreFn* = proc(db: KvtDbRef): Result[void,KvtError] {.noRaise.} KvtApiRollbackFn* = proc(tx: KvtTxRef): Result[void,KvtError] {.noRaise.} KvtApiPersistFn* = proc(db: KvtDbRef): Result[void,KvtError] {.noRaise.} KvtApiToKvtDbRefFn* = proc(tx: KvtTxRef): KvtDbRef {.noRaise.} @@ -156,7 +156,7 @@ proc dup(be: BackendRef): BackendRef = of BackendMemory: return MemBackendRef(be).dup - of BackendRocksDB: + of BackendRocksDB, BackendRdbTriggered: when KvtPersistentBackendOk: return RdbBackendRef(be).dup @@ -306,9 +306,9 @@ func init*( result = api.put(a, b, c) profApi.reCentre = - proc(a: KvtDbRef) = + proc(a: KvtDbRef): auto = KvtApiProfReCentreFn.profileRunner: - api.reCentre(a) + result = api.reCentre(a) profApi.rollback = proc(a: KvtTxRef): auto = diff --git a/nimbus/db/kvt/kvt_filter.nim b/nimbus/db/kvt/kvt_delta.nim similarity index 75% rename from nimbus/db/kvt/kvt_filter.nim rename to nimbus/db/kvt/kvt_delta.nim index 022d2bf46..2ee0feb8f 100644 --- a/nimbus/db/kvt/kvt_filter.nim +++ b/nimbus/db/kvt/kvt_delta.nim @@ -17,29 +17,29 @@ import results, ./kvt_desc, ./kvt_desc/desc_backend, - ./kvt_filter/[filter_merge, filter_reverse] + ./kvt_delta/[delta_merge, delta_reverse] # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc filterMerge*( +proc deltaMerge*( db: KvtDbRef; # Database - filter: LayerDeltaRef; # Filter to apply to database + delta: LayerDeltaRef; # Filter to apply to database ) = - ## Merge the argument `filter` into the read-only filter layer. Note that + ## Merge the argument `delta` into the balancer filter layer.
Note that ## this function has no control of the filter source. Having merged the - ## argument `filter`, all the `top` and `stack` layers should be cleared. + ## argument `delta`, all the `top` and `stack` layers should be cleared. ## - db.merge(filter, db.roFilter) + db.merge(delta, db.balancer) -proc filterUpdateOk*(db: KvtDbRef): bool = - ## Check whether the read-only filter can be merged into the backend +proc deltaUpdateOk*(db: KvtDbRef): bool = + ## Check whether the balancer filter can be merged into the backend not db.backend.isNil and db.isCentre -proc filterUpdate*( +proc deltaUpdate*( db: KvtDbRef; # Database reCentreOk = false; ): Result[void,KvtError] = @@ -58,7 +58,7 @@ proc filterUpdate*( return err(FilBackendMissing) # Blind or missing filter - if db.roFilter.isNil: + if db.balancer.isNil: return ok() # Make sure that the argument `db` is at the centre so the backend is in @@ -67,21 +67,21 @@ proc filterUpdate*( if db != parent: if not reCentreOk: return err(FilBackendRoMode) - db.reCentre + ? db.reCentre() # Always re-centre to `parent` (in case `reCentreOk` was set) - defer: parent.reCentre + defer: discard parent.reCentre() # Store structural single trie entries - let writeBatch = be.putBegFn() - be.putKvpFn(writeBatch, db.roFilter.sTab.pairs.toSeq) + let writeBatch = ? be.putBegFn() + be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq) ? be.putEndFn writeBatch # Update peer filter balance. - let rev = db.filterReverse db.roFilter + let rev = db.deltaReverse db.balancer for w in db.forked: - db.merge(rev, w.roFilter) + db.merge(rev, w.balancer) - db.roFilter = LayerDeltaRef(nil) + db.balancer = LayerDeltaRef(nil) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/kvt/kvt_filter/filter_merge.nim b/nimbus/db/kvt/kvt_delta/delta_merge.nim similarity index 100% rename from nimbus/db/kvt/kvt_filter/filter_merge.nim rename to nimbus/db/kvt/kvt_delta/delta_merge.nim diff --git a/nimbus/db/kvt/kvt_filter/filter_reverse.nim b/nimbus/db/kvt/kvt_delta/delta_reverse.nim similarity index 75% rename from nimbus/db/kvt/kvt_filter/filter_reverse.nim rename to nimbus/db/kvt/kvt_delta/delta_reverse.nim index 6b19c70b4..e3aca6ec6 100644 --- a/nimbus/db/kvt/kvt_filter/filter_reverse.nim +++ b/nimbus/db/kvt/kvt_delta/delta_reverse.nim @@ -17,21 +17,21 @@ import # Public functions # ------------------------------------------------------------------------------ -proc filterReverse*( +proc deltaReverse*( db: KvtDbRef; # Database - filter: LayerDeltaRef; # Filter to revert + delta: LayerDeltaRef; # Filter to revert ): LayerDeltaRef = - ## Assemble reverse filter for the `filter` argument, i.e. changes to the - ## backend that reverse the effect of applying the this read-only filter. + ## Assemble a reverse filter for the `delta` argument, i.e. changes to the + ## backend that reverse the effect of applying this to the balancer filter. ## The resulting filter is calculated against the current *unfiltered* - ## backend (excluding optionally installed read-only filter.) + ## backend (excluding optionally installed balancer filters.) ## - ## If `filter` is `nil`, the result will be `nil` as well. - if not filter.isNil: + ## If `delta` is `nil`, the result will be `nil` as well. 
+ if not delta.isNil: result = LayerDeltaRef() # Calculate reverse changes for the `sTab[]` structural table - for key in filter.sTab.keys: + for key in delta.sTab.keys: let rc = db.getUbe key if rc.isOk: result.sTab[key] = rc.value diff --git a/nimbus/db/kvt/kvt_desc.nim b/nimbus/db/kvt/kvt_desc.nim index 424e31a8f..b65cb545d 100644 --- a/nimbus/db/kvt/kvt_desc.nim +++ b/nimbus/db/kvt/kvt_desc.nim @@ -42,12 +42,11 @@ type centre: KvtDbRef ## Link to peer with write permission peers: HashSet[KvtDbRef] ## List of all peers - KvtDbRef* = ref KvtDbObj - KvtDbObj* = object + KvtDbRef* = ref object of RootRef ## Three tier database object supporting distributed instances. top*: LayerRef ## Database working layer, mutable stack*: seq[LayerRef] ## Stashed immutable parent layers - roFilter*: LayerDeltaRef ## Apply read filter (locks writing) + balancer*: LayerDeltaRef ## Apply read filter (locks writing) backend*: BackendRef ## Backend database (may well be `nil`) txRef*: KvtTxRef ## Latest active transaction @@ -62,6 +61,17 @@ type KvtDbAction* = proc(db: KvtDbRef) {.gcsafe, raises: [].} ## Generic call back function/closure. +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc canMod(db: KvtDbRef): Result[void,KvtError] = + ## Ask for permission before doing nasty stuff + if db.backend.isNil: + ok() + else: + db.backend.canModFn() + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -97,7 +107,7 @@ func getCentre*(db: KvtDbRef): KvtDbRef = ## if db.dudes.isNil: db else: db.dudes.centre -proc reCentre*(db: KvtDbRef) = +proc reCentre*(db: KvtDbRef): Result[void,KvtError] = ## Re-focus the `db` argument descriptor so that it becomes the centre. ## Nothing is done if the `db` descriptor is the centre, already. ## @@ -111,8 +121,10 @@ proc reCentre*(db: KvtDbRef) = ## accessing the same backend database. Descriptors where `isCentre()` ## returns `false` must be single destructed with `forget()`. ## - if not db.dudes.isNil: + if not db.dudes.isNil and db.dudes.centre != db: + ? db.canMod() db.dudes.centre = db + ok() proc fork*( db: KvtDbRef; @@ -140,7 +152,7 @@ proc fork*( dudes: db.dudes) if not noFilter: - clone.roFilter = db.roFilter # Ref is ok here (filters are immutable) + clone.balancer = db.balancer # Ref is ok here (filters are immutable) if not noTopLayer: clone.top = LayerRef.init() @@ -177,6 +189,7 @@ proc forget*(db: KvtDbRef): Result[void,KvtError] = elif db notin db.dudes.peers: err(StaleDescriptor) else: + ? db.canMod() db.dudes.peers.excl db # Unlink argument `db` from peers list ok() @@ -187,7 +200,7 @@ proc forgetOthers*(db: KvtDbRef): Result[void,KvtError] = if not db.dudes.isNil: if db.dudes.centre != db: return err(MustBeOnCentre) - + ? db.canMod() db.dudes = DudesRef(nil) ok() diff --git a/nimbus/db/kvt/kvt_desc/desc_backend.nim b/nimbus/db/kvt/kvt_desc/desc_backend.nim index cbd352e72..4b01f89ff 100644 --- a/nimbus/db/kvt/kvt_desc/desc_backend.nim +++ b/nimbus/db/kvt/kvt_desc/desc_backend.nim @@ -33,7 +33,7 @@ type ## by any library function using the backend. 
PutBegFn* = - proc(): PutHdlRef {.gcsafe, raises: [].} + proc(): Result[PutHdlRef,KvtError] {.gcsafe, raises: [].} ## Generic transaction initialisation function PutKvpFn* = @@ -53,6 +53,21 @@ type ## `false` the outcome might differ depending on the type of backend ## (e.g. in-memory backends would flush on close.) + CanModFn* = + proc(): Result[void,KvtError] {.gcsafe, raises: [].} + ## This function returns OK if there is nothing to prevent the main + ## `KVT` descriptors being modified (e.g. by `reCentre()`) or by + ## adding/removing a new peer (e.g. by `fork()` or `forget()`.) + + SetWrReqFn* = + proc(db: RootRef): Result[void,KvtError] {.gcsafe, raises: [].} + ## This function stores a request function for the piggyback mode + ## writing to the `Aristo` set of column families. + ## + ## If used at all, this function would run `rocks_db.setWrReqTriggeredFn()()` + ## with a `KvtDbRef` type argument for `db`. This allows running the `Kvt` + ## without linking to the rocksdb interface unless it is really needed. + # ------------- BackendRef* = ref BackendObj @@ -66,6 +81,9 @@ type putEndFn*: PutEndFn ## Commit bulk store session closeFn*: CloseFn ## Generic destructor + canModFn*: CanModFn ## Lock-alike + + setWrReqFn*: SetWrReqFn ## Register main descr for write request proc init*(trg: var BackendObj; src: BackendObj) = trg.getKvpFn = src.getKvpFn @@ -73,6 +91,7 @@ proc init*(trg: var BackendObj; src: BackendObj) = trg.putKvpFn = src.putKvpFn trg.putEndFn = src.putEndFn trg.closeFn = src.closeFn + trg.canModFn = src.canModFn # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_desc/desc_error.nim b/nimbus/db/kvt/kvt_desc/desc_error.nim index 0f66219fa..120b4d8ed 100644 --- a/nimbus/db/kvt/kvt_desc/desc_error.nim +++ b/nimbus/db/kvt/kvt_desc/desc_error.nim @@ -19,11 +19,16 @@ type # RocksDB backend RdbBeCantCreateDataDir + RdbBeDelayedAlreadyRegistered + RdbBeDelayedLocked + RdbBeDelayedNotReady RdbBeDriverDelError RdbBeDriverGetError RdbBeDriverInitError RdbBeDriverPutError RdbBeDriverWriteError + RdbBeHostError + RdbBeHostNotApplicable # Transaction wrappers TxArgStaleTx @@ -33,6 +38,7 @@ type TxNoPendingTx TxNotTopTx TxPendingTx + TxPersistDelayed TxStackGarbled TxStackUnderflow diff --git a/nimbus/db/kvt/kvt_init/init_common.nim b/nimbus/db/kvt/kvt_init/init_common.nim index 3ba5386e0..7a6ecaf75 100644 --- a/nimbus/db/kvt/kvt_init/init_common.nim +++ b/nimbus/db/kvt/kvt_init/init_common.nim @@ -11,18 +11,20 @@ {.push raises: [].} import - ../../aristo/aristo_init/init_common, ../kvt_desc, ../kvt_desc/desc_backend -export - BackendType # borrowed from Aristo - const verifyIxId = true # and false ## Enforce session tracking type + BackendType* = enum + BackendVoid = 0 ## For providing backend-less constructor + BackendMemory ## Same as Aristo + BackendRocksDB ## Same as Aristo + BackendRdbTriggered ## Piggybacked on remote write session + TypedBackendRef* = ref TypedBackendObj TypedBackendObj* = object of BackendObj beKind*: BackendType ## Backend type identifier diff --git a/nimbus/db/kvt/kvt_init/memory_db.nim b/nimbus/db/kvt/kvt_init/memory_db.nim index 9bc3a8426..7d37dbb0e 100644 --- a/nimbus/db/kvt/kvt_init/memory_db.nim +++ b/nimbus/db/kvt/kvt_init/memory_db.nim @@ -86,8 +86,8 @@ proc getKvpFn(db: MemBackendRef): GetKvpFn = proc putBegFn(db: MemBackendRef): PutBegFn = result = - proc(): PutHdlRef = - db.newSession() + proc(): Result[PutHdlRef,KvtError] = + ok db.newSession() proc putKvpFn(db: MemBackendRef):
PutKvpFn = result = @@ -120,6 +120,16 @@ proc closeFn(db: MemBackendRef): CloseFn = proc(ignore: bool) = discard +proc canModFn(db: MemBackendRef): CanModFn = + result = + proc(): Result[void,KvtError] = + ok() + +proc setWrReqFn(db: MemBackendRef): SetWrReqFn = + result = + proc(kvt: RootRef): Result[void,KvtError] = + err(RdbBeHostNotApplicable) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -136,7 +146,8 @@ proc memoryBackend*: BackendRef = db.putEndFn = putEndFn db db.closeFn = closeFn db - + db.canModFn = canModFn db + db.setWrReqFn = setWrReqFn db db proc dup*(db: MemBackendRef): MemBackendRef = diff --git a/nimbus/db/kvt/kvt_init/memory_only.nim b/nimbus/db/kvt/kvt_init/memory_only.nim index 0f1245b0a..2a9fa7efb 100644 --- a/nimbus/db/kvt/kvt_init/memory_only.nim +++ b/nimbus/db/kvt/kvt_init/memory_only.nim @@ -32,7 +32,7 @@ export # Public helpers # ----------------------------------------------------------------------------- -proc kind*( +func kind*( be: BackendRef; ): BackendType = ## Retrieves the backend type symbol for a `be` backend database argument diff --git a/nimbus/db/kvt/kvt_init/persistent.nim b/nimbus/db/kvt/kvt_init/persistent.nim index 9d5df22e4..389fa6968 100644 --- a/nimbus/db/kvt/kvt_init/persistent.nim +++ b/nimbus/db/kvt/kvt_init/persistent.nim @@ -20,43 +20,70 @@ import results, + ../../aristo, + ../../opts, ../kvt_desc, "."/[rocks_db, memory_only] -from ../../aristo/aristo_persistent - import GuestDbRef, getRocksDbFamily - export RdbBackendRef, memory_only +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +func toErr0(err: (KvtError,string)): KvtError = + err[0] + # ------------------------------------------------------------------------------ # Public database constuctors, destructor # ------------------------------------------------------------------------------ -proc init*[W: MemOnlyBackend|RdbBackendRef]( +proc init*( T: type KvtDbRef; - B: type W; + B: type RdbBackendRef; basePath: string; - guestDb = GuestDbRef(nil); + opts: DbOptions; ): Result[KvtDbRef,KvtError] = - ## Generic constructor, `basePath` argument is ignored for `BackendNone` and - ## `BackendMemory` type backend database. Also, both of these backends - ## aways succeed initialising. + ## Generic constructor for `RocksDb` backend ## - ## If the argument `guestDb` is set and is a RocksDB column familly, the - ## `Kvt`batabase is built upon this column familly. Othewise it is newly - ## created with `basePath` as storage location. - ## - when B is RdbBackendRef: - let rc = guestDb.getRocksDbFamily() - if rc.isOk: - ok KvtDbRef(top: LayerRef.init(), backend: ? rocksDbKvtBackend rc.value) - else: - ok KvtDbRef(top: LayerRef.init(), backend: ? rocksDbKvtBackend basePath) + ok KvtDbRef( + top: LayerRef.init(), + backend: ? rocksDbKvtBackend(basePath, opts).mapErr toErr0) - else: - ok KvtDbRef.init B +proc init*( + T: type KvtDbRef; + B: type RdbBackendRef; + adb: AristoDbRef; + opts: DbOptions; + ): Result[KvtDbRef,KvtError] = + ## Constructor for `RocksDb` backend which piggybacks on the `Aristo` + ## backend. The following changes will occur after successful instantiation: + ## + ## * When invoked, the function `kvt_tx.persistent()` will always return an + ## error. If everything is all right (e.g. 
saving is possible), the error + ## returned will be `TxPersistDelayed`. This indicates that the save + ## request was queued, waiting to be picked up by an event handler. + ## + ## * There should be an invocation of `aristo_tx.persistent()` immediately + ## following the `kvt_tx.persistent()` call (some `KVT` functions might + ## return `RdbBeDelayedLocked` or similar errors while the save request + ## is pending.) Once successful, the `aristo_tx.persistent()` function will + ## also have committed the pending save request mentioned above. + ## + ## * The function `kvt_init/memory_only.finish()` does nothing. + ## + ## * The function `aristo_init/memory_only.finish()` will close both + ## sessions, the one for `KVT` and the other for `Aristo`. + ## + ## * The functions `kvt_delta.deltaUpdate()` and `tx_stow.txStow()` should + ## not be invoked directly (they will stop with an error most of the time, + ## anyway.) + ## + ok KvtDbRef( + top: LayerRef.init(), + backend: ? rocksDbKvtTriggeredBackend(adb, opts).mapErr toErr0) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/kvt/kvt_init/rocks_db.nim b/nimbus/db/kvt/kvt_init/rocks_db.nim index bcc5b88f0..ba7902020 100644 --- a/nimbus/db/kvt/kvt_init/rocks_db.nim +++ b/nimbus/db/kvt/kvt_init/rocks_db.nim @@ -27,18 +27,19 @@ {.push raises: [].} import + chronicles, eth/common, rocksdb, results, + ../../aristo/aristo_init/persistent, + ../../opts, ../kvt_desc, ../kvt_desc/desc_backend, + ../kvt_tx/tx_stow, ./init_common, ./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk] - const - maxOpenFiles = 512 ## Rocks DB setup, open files limit - extraTraceMessages = false or true ## Enabled additional logging noise @@ -48,11 +49,8 @@ type RdbPutHdlRef = ref object of TypedPutHdlRef -when extraTraceMessages: - import chronicles - - logScope: - topics = "aristo-backend" +logScope: + topics = "kvt-backend" # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -75,7 +73,7 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef = hdl.RdbPutHdlRef # ------------------------------------------------------------------------------ -# Private functions: interface +# Private functions: standard interface # ------------------------------------------------------------------------------ proc getKvpFn(db: RdbBackendRef): GetKvpFn = @@ -98,9 +96,10 @@ proc getKvpFn(db: RdbBackendRef): GetKvpFn = proc putBegFn(db: RdbBackendRef): PutBegFn = result = - proc(): PutHdlRef = + proc(): Result[PutHdlRef,KvtError] = db.rdb.begin() - db.newSession() + ok db.newSession() + proc putKvpFn(db: RdbBackendRef): PutKvpFn = result = @@ -114,6 +113,7 @@ proc putKvpFn(db: RdbBackendRef): PutKvpFn = hdl.info = error[2] return + proc putEndFn(db: RdbBackendRef): PutEndFn = result = proc(hdl: PutHdlRef): Result[void,KvtError] = @@ -121,6 +121,7 @@ proc putEndFn(db: RdbBackendRef): PutEndFn = if hdl.error != KvtError(0): when extraTraceMessages: debug logTxt "putEndFn: failed", error=hdl.error, info=hdl.info + db.rdb.rollback() return err(hdl.error) # Commit session @@ -136,16 +137,117 @@ proc closeFn(db: RdbBackendRef): CloseFn = proc(flush: bool) = db.rdb.destroy(flush) -# -------------- +proc canModFn(db: RdbBackendRef): CanModFn = + result = + proc(): Result[void,KvtError] = + ok() -proc setup(db: RdbBackendRef) = - db.getKvpFn = getKvpFn db +proc setWrReqFn(db: RdbBackendRef): SetWrReqFn = + result = + proc(kvt: RootRef): Result[void,KvtError] =
err(RdbBeHostNotApplicable) - db.putBegFn = putBegFn db - db.putKvpFn = putKvpFn db - db.putEndFn = putEndFn db +# ------------------------------------------------------------------------------ +# Private functions: triggered interface changes +# ------------------------------------------------------------------------------ - db.closeFn = closeFn db +proc putBegTriggeredFn(db: RdbBackendRef): PutBegFn = + ## Variant of `putBegFn()` for piggyback write batch + result = + proc(): Result[PutHdlRef,KvtError] = + # Check whether somebody else initiated the rocksdb write batch/session + if db.rdb.session.isNil: + const error = RdbBeDelayedNotReady + when extraTraceMessages: + debug logTxt "putBegTriggeredFn: failed", error + return err(error) + ok db.newSession() + +proc putEndTriggeredFn(db: RdbBackendRef): PutEndFn = + ## Variant of `putEndFn()` for piggyback write batch + result = + proc(hdl: PutHdlRef): Result[void,KvtError] = + + # There is no commit()/rollback() here as we do not own the backend. + let hdl = hdl.endSession db + + if hdl.error != KvtError(0): + when extraTraceMessages: + debug logTxt "putEndTriggeredFn: failed", + error=hdl.error, info=hdl.info + # The error return code will signal a problem to the `txStow()` + # function which was called by `writeEvCb()` below. + return err(hdl.error) + + # Commit the session. This will be acknowledged by the `txStow()` + # function which was called by `writeEvCb()` below. + ok() + +proc closeTriggeredFn(db: RdbBackendRef): CloseFn = + ## Variant of `closeFn()` for piggyback write batch + result = + proc(flush: bool) = + # Nothing to do here as we do not own the backend + discard + +proc canModTriggeredFn(db: RdbBackendRef): CanModFn = + ## Variant of `canModFn()` for piggyback write batch + result = + proc(): Result[void,KvtError] = + # Deny modifications/changes if there is a pending write request + if not db.rdb.delayedPersist.isNil: + return err(RdbBeDelayedLocked) + ok() + +proc setWrReqTriggeredFn(db: RdbBackendRef): SetWrReqFn = + result = + proc(kvt: RootRef): Result[void,KvtError] = + if db.rdb.delayedPersist.isNil: + db.rdb.delayedPersist = KvtDbRef(kvt) + ok() + else: + err(RdbBeDelayedAlreadyRegistered) + +# ------------------------------------------------------------------------------ +# Private function: trigger handler +# ------------------------------------------------------------------------------ + +proc writeEvCb(db: RdbBackendRef): RdbWriteEventCb = + ## Write session event handler + result = + proc(ws: WriteBatchRef): bool = + + # Only do something if a write session request was queued + if not db.rdb.delayedPersist.isNil: + defer: + # Clear session environment when leaving. This makes sure that the + # same session can only be run once. + db.rdb.session = WriteBatchRef(nil) + db.rdb.delayedPersist = KvtDbRef(nil) + + # Publish session argument + db.rdb.session = ws + + # Execute delayed session. Note that the `txStow()` function is located + # in `tx_stow.nim`. This module `tx_stow.nim` is also imported by + # `kvt_tx.nim` which contains `persist()`. So the logic goes: + # :: + # kvt_tx.persist() --> registers a delayed write request rather + # than executing tx_stow.txStow() + # + # // the backend owner (i.e.
 
# ------------------------------------------------------------------------------
# Public functions
@@ -153,28 +255,58 @@ proc setup(db: RdbBackendRef) =
 
 proc rocksDbKvtBackend*(
     path: string;
-  ): Result[BackendRef,KvtError] =
+    opts: DbOptions;
+  ): Result[BackendRef,(KvtError,string)] =
   let db = RdbBackendRef(
     beKind: BackendRocksDB)
 
   # Initialise RocksDB
-  db.rdb.init(path, maxOpenFiles).isOkOr:
+  db.rdb.init(path, opts).isOkOr:
     when extraTraceMessages:
       trace logTxt "constructor failed",
         error=error[0], info=error[1]
-    return err(error[0])
+    return err(error)
 
-  db.setup()
+  db.getKvpFn = getKvpFn db
+
+  db.putBegFn = putBegFn db
+  db.putKvpFn = putKvpFn db
+  db.putEndFn = putEndFn db
+
+  db.closeFn = closeFn db
+  db.canModFn = canModFn db
+  db.setWrReqFn = setWrReqFn db
   ok db
 
-proc rocksDbKvtBackend*(
-    store: ColFamilyReadWrite;
-  ): Result[BackendRef,KvtError] =
+
+proc rocksDbKvtTriggeredBackend*(
+    adb: AristoDbRef;
+    opts: DbOptions;
+  ): Result[BackendRef,(KvtError,string)] =
   let db = RdbBackendRef(
-    beKind: BackendRocksDB)
-  db.rdb.init(store)
-  db.setup()
+    beKind: BackendRdbTriggered)
+
+  # Initialise RocksDB piggy-backed on `Aristo` backend.
+  db.rdb.init(adb, opts).isOkOr:
+    when extraTraceMessages:
+      trace logTxt "constructor failed",
+        error=error[0], info=error[1]
+    return err(error)
+
+  # Register write session event handler
+  adb.activateWrTrigger(db.writeEvCb()).isOkOr:
+    return err((RdbBeHostError,$error))
+
+  db.getKvpFn = getKvpFn db
+
+  db.putBegFn = putBegTriggeredFn db
+  db.putKvpFn = putKvpFn db
+  db.putEndFn = putEndTriggeredFn db
+
+  db.closeFn = closeTriggeredFn db
+  db.canModFn = canModTriggeredFn db
+  db.setWrReqFn = setWrReqTriggeredFn db
   ok db
 
+
 proc dup*(db: RdbBackendRef): RdbBackendRef =
   new result
   init_common.init(result[], db[])
diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim
index 252ab391d..59c958d7c 100644
--- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim
+++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim
@@ -15,16 +15,25 @@
 
 import
   std/os,
+  ../../kvt_desc,
   rocksdb
 
 type
   RdbInst* = object
-    store*: ColFamilyReadWrite         ## Rocks DB database handler
+    store*: KvtCfStore                 ## Rocks DB database handler
     session*: WriteBatchRef            ## For batched `put()`
+
     basePath*: string                  ## Database directory
+    delayedPersist*: KvtDbRef          ## Enables next piggyback write session
+
+  KvtCFs* = enum
+    ## Column family symbols/handles and names used on the database
+    KvtGeneric = "KvtGen"              ## Generic column family
+
+  KvtCfStore* = array[KvtCFs,ColFamilyReadWrite]
+    ## List of column family handlers
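+
+  # Note: `rdb_init.init()` loops over all `KvtCFs` symbols when opening or
+  # attaching to the database, so extending this enum is the one place where
+  # additional column families would be declared.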
 
 const
-  KvtFamily* = "Kvt"                   ## RocksDB column family
   BaseFolder* = "nimbus"               ## Same as for Legacy DB
   DataFolder* = "kvt"                  ## Legacy DB has "data"
@@ -32,6 +41,10 @@ const
 # Public functions
 # ------------------------------------------------------------------------------
 
+template logTxt*(info: static[string]): static[string] =
+  "RocksDB/" & info
+
+
 func baseDir*(rdb: RdbInst): string =
   rdb.basePath / BaseFolder
 
@@ -39,8 +52,8 @@
 func dataDir*(rdb: RdbInst): string =
   rdb.baseDir / DataFolder
 
-template logTxt*(info: static[string]): static[string] =
-  "RocksDB/" & info
+template baseDb*(rdb: RdbInst): RocksDbReadWriteRef =
+  rdb.store[KvtGeneric].db
 
# ------------------------------------------------------------------------------
# End
diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim
index b8404f0e7..7a567e3b8 100644
--- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim
+++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_get.nim
@@ -43,7 +43,7 @@ proc get*(
   let onData: DataProc = proc(data: openArray[byte]) =
     res = @data
 
-  let gotData = rdb.store.get(key, onData).valueOr:
+  let gotData = rdb.store[KvtGeneric].get(key, onData).valueOr:
     const errSym = RdbBeDriverGetError
     when extraTraceMessages:
       trace logTxt "get", error=errSym, info=error
diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim
index 610a62570..0b0365a3e 100644
--- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim
+++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim
@@ -14,21 +14,37 @@
 {.push raises: [].}
 
 import
-  std/os,
+  std/[sequtils, os],
   rocksdb,
   results,
+  ../../../aristo/aristo_init/persistent,
+  ../../../opts,
   ../../kvt_desc,
+  ../../kvt_desc/desc_error as kdb,
   ./rdb_desc
 
-const
-  extraTraceMessages = false
-    ## Enabled additional logging noise
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
 
-when extraTraceMessages:
-  import chronicles
+proc getCFInitOptions(opts: DbOptions): ColFamilyOptionsRef =
+  result = defaultColFamilyOptions()
+  if opts.writeBufferSize > 0:
+    result.setWriteBufferSize(opts.writeBufferSize)
 
-  logScope:
-    topics = "kvt-backend"
+
+proc getDbInitOptions(opts: DbOptions): DbOptionsRef =
+  result = defaultDbOptions()
+  result.setMaxOpenFiles(opts.maxOpenFiles)
+  result.setMaxBytesForLevelBase(opts.writeBufferSize)
+
+  if opts.rowCacheSize > 0:
+    result.setRowCache(cacheCreateLRU(opts.rowCacheSize))
+
+  if opts.blockCacheSize > 0:
+    let tableOpts = defaultTableOptions()
+    tableOpts.setBlockCache(cacheCreateLRU(opts.blockCacheSize))
+    result.setBlockBasedTableFactory(tableOpts)
 
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
@@ -37,51 +53,64 @@ when extraTraceMessages:
 
 proc init*(
     rdb: var RdbInst;
     basePath: string;
-    openMax: int;
+    opts: DbOptions;
   ): Result[void,(KvtError,string)] =
-  ## Constructor code inspired by `RocksStoreRef.init()` from
-  ## kvstore_rocksdb.nim
+  ## Database backend constructor for stand-alone version
+  ##
+  const initFailed = "RocksDB/init() failed"
+
   rdb.basePath = basePath
 
   let dataDir = rdb.dataDir
-
   try:
     dataDir.createDir
   except OSError, IOError:
-    return err((RdbBeCantCreateDataDir, ""))
+    return err((kdb.RdbBeCantCreateDataDir, ""))
 
-  let
-    cfs = @[initColFamilyDescriptor KvtFamily]
-    opts = defaultDbOptions()
-  opts.setMaxOpenFiles(openMax)
+  # Expand argument `opts` to rocksdb options
+  let (cfOpts, dbOpts) = (opts.getCFInitOptions, opts.getDbInitOptions)
+
+  # Column family names to allocate when opening the database.
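+  # (i.e. one descriptor per `KvtCFs` symbol, all sharing the same `cfOpts`)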
+  let cfs = KvtCFs.mapIt(($it).initColFamilyDescriptor cfOpts)
 
-  # Reserve a family corner for `Kvt` on the database
-  let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr:
-    let errSym = RdbBeDriverInitError
-    when extraTraceMessages:
-      debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
-    return err((errSym, error))
+  # Open database for the extended family :)
+  let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies=cfs).valueOr:
+    raiseAssert initFailed & " cannot create base descriptor: " & error
 
-  # Initialise `Kvt` family
-  rdb.store = baseDb.withColFamily(KvtFamily).valueOr:
-    let errSym = RdbBeDriverInitError
-    when extraTraceMessages:
-      debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
-    return err((errSym, error))
+  # Initialise column handlers (this implicitly stores `baseDb`)
+  for col in KvtCFs:
+    rdb.store[col] = baseDb.withColFamily($col).valueOr:
+      raiseAssert initFailed & " cannot initialise " &
+        $col & " descriptor: " & error
 
   ok()
 
+
 proc init*(
     rdb: var RdbInst;
-    store: ColFamilyReadWrite;
-  ) =
-  ## Piggyback on other database
-  rdb.store = store # that's it
+    adb: AristoDbRef;
+    opts: DbOptions;
+  ): Result[void,(KvtError,string)] =
+  ## Initialise column handlers piggy-backing on the `Aristo` backend.
+  ##
+  let
+    cfOpts = opts.getCFInitOptions()
+    iCfs = KvtCFs.toSeq.mapIt(initColFamilyDescriptor($it, cfOpts))
+    oCfs = adb.reinit(iCfs).valueOr:
+      return err((RdbBeHostError,$error))
+
+  # Collect column family descriptors (this implicitly stores `baseDb`)
+  for n in KvtCFs:
+    assert oCfs[n.ord].name != "" # debugging only
+    rdb.store[n] = oCfs[n.ord]
+
+  ok()
+
 
 proc destroy*(rdb: var RdbInst; flush: bool) =
   ## Destructor (no need to do anything if piggybacked)
   if 0 < rdb.basePath.len:
-    rdb.store.db.close()
+    rdb.baseDb.close()
 
     if flush:
       try:
diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim
index 8e1a28f5f..50090ff50 100644
--- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim
+++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_put.nim
@@ -50,7 +50,7 @@ when extraTraceMessages:
 
 proc begin*(rdb: var RdbInst) =
   if rdb.session.isNil:
-    rdb.session = rdb.store.openWriteBatch()
+    rdb.session = rdb.baseDb.openWriteBatch()
 
 proc rollback*(rdb: var RdbInst) =
   if not rdb.session.isClosed():
@@ -59,7 +59,7 @@ proc rollback*(rdb: var RdbInst) =
 proc commit*(rdb: var RdbInst): Result[void,(KvtError,string)] =
   if not rdb.session.isClosed():
     defer: rdb.disposeSession()
-    rdb.store.write(rdb.session).isOkOr:
+    rdb.baseDb.write(rdb.session).isOkOr:
      const errSym = RdbBeDriverWriteError
      when extraTraceMessages:
        trace logTxt "commit", error=errSym, info=error
@@ -70,16 +70,15 @@ proc put*(
     rdb: RdbInst;
     data: openArray[(Blob,Blob)];
   ): Result[void,(Blob,KvtError,string)] =
-  let dsc = rdb.session
   for (key,val) in data:
     if val.len == 0:
-      dsc.delete(key, rdb.store.name).isOkOr:
+      rdb.session.delete(key, $KvtGeneric).isOkOr:
        const errSym = RdbBeDriverDelError
        when extraTraceMessages:
          trace logTxt "del", key, error=errSym, info=error
        return err((key,errSym,error))
     else:
-      dsc.put(key, val, rdb.store.name).isOkOr:
+      rdb.session.put(key, val, $KvtGeneric).isOkOr:
        const errSym = RdbBeDriverPutError
        when extraTraceMessages:
          trace logTxt "put", key, error=errSym, info=error
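
A usage sketch for the write batch cycle above (hypothetical calling code;
`rdb` is assumed to be a `var RdbInst` set up by `rdb_init.init()`):
::
  rdb.begin()                      # opens a write batch unless one is open
  rdb.put(@[(key, val)]).isOkOr:   # an empty `val` deletes the entry
    return                         # error: `(key,errorCode,info)`
  rdb.commit().isOkOr:             # writes the batch via `baseDb`
    return                         # error: `(errorCode,info)`
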
diff --git a/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim b/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim
index 6d5bb6843..e3648f555 100644
--- a/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim
+++ b/nimbus/db/kvt/kvt_init/rocks_db/rdb_walk.nim
@@ -38,7 +38,7 @@ iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] =
   ##
   ## Non-decodable entries are stepped over and ignored.
   block walkBody:
-    let rit = rdb.store.openIterator().valueOr:
+    let rit = rdb.store[KvtGeneric].openIterator().valueOr:
       when extraTraceMessages:
         trace logTxt "walk", pfx="all", error
       break walkBody
diff --git a/nimbus/db/kvt/kvt_tx.nim b/nimbus/db/kvt/kvt_tx.nim
index 76eaa42e8..b50cfe11e 100644
--- a/nimbus/db/kvt/kvt_tx.nim
+++ b/nimbus/db/kvt/kvt_tx.nim
@@ -16,6 +16,7 @@ import
   results,
   ./kvt_tx/[tx_fork, tx_frame, tx_stow],
+  ./kvt_init/memory_only,
   ./kvt_desc
 
 # ------------------------------------------------------------------------------
@@ -177,6 +178,12 @@ proc persist*(
   ## and the staged data area is cleared. While performing this last step,
   ## the recovery journal is updated (if available.)
   ##
+  # Register for saving if piggybacked on remote database
+  if db.backend.kind == BackendRdbTriggered:
+    ? db.txStowOk(persistent=true)
+    ? db.backend.setWrReqFn db
+    return err(TxPersistDelayed)
+
   db.txStow(persistent=true)
 
 proc stow*(
diff --git a/nimbus/db/kvt/kvt_tx/tx_stow.nim b/nimbus/db/kvt/kvt_tx/tx_stow.nim
index 077053072..c156efbbf 100644
--- a/nimbus/db/kvt/kvt_tx/tx_stow.nim
+++ b/nimbus/db/kvt/kvt_tx/tx_stow.nim
@@ -16,12 +16,27 @@ import
   std/tables,
   results,
-  ".."/[kvt_desc, kvt_filter]
+  ".."/[kvt_desc, kvt_delta]
 
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
 
+proc txStowOk*(
+    db: KvtDbRef;                     # Database
+    persistent: bool;                 # Stage only unless `true`
+  ): Result[void,KvtError] =
+  ## Verify that `txStow()` can go ahead
+  if not db.txRef.isNil:
+    return err(TxPendingTx)
+  if 0 < db.stack.len:
+    return err(TxStackGarbled)
+
+  if persistent and not db.deltaUpdateOk():
+    return err(TxBackendNotWritable)
+
+  ok()
+
 proc txStow*(
     db: KvtDbRef;                     # Database
     persistent: bool;                 # Stage only unless `true`
   ): Result[void,KvtError] =
@@ -32,21 +47,15 @@
   ## If there is no backend the function returns immediately with an error.
   ## The same happens if there is a pending transaction.
   ##
-  if not db.txRef.isNil:
-    return err(TxPendingTx)
-  if 0 < db.stack.len:
-    return err(TxStackGarbled)
-
-  if persistent and not db.filterUpdateOk():
-    return err(TxBackendNotWritable)
+  ? db.txStowOk persistent
 
   if 0 < db.top.delta.sTab.len:
-    db.filterMerge db.top.delta
+    db.deltaMerge db.top.delta
     db.top.delta = LayerDeltaRef()
 
   if persistent:
-    # Move `roFilter` data into persistent tables
-    ? db.filterUpdate()
+    # Move `balancer` data into persistent tables
+    ? db.deltaUpdate()
 
   ok()
diff --git a/nimbus/db/kvt/kvt_utils.nim b/nimbus/db/kvt/kvt_utils.nim
index 0a0d2bf7e..3d36f7feb 100644
--- a/nimbus/db/kvt/kvt_utils.nim
+++ b/nimbus/db/kvt/kvt_utils.nim
@@ -41,8 +41,8 @@ proc getBe*(
     key: openArray[byte];             # Key of database record
   ): Result[Blob,KvtError] =
   ## Get the value from the (filtered) backend if available.
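+  ## The `balancer` overlay is checked first as it may hold data staged for
+  ## saving but not yet persisted; only on a miss is the proper backend
+  ## queried.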
- if not db.roFilter.isNil: - db.roFilter.sTab.withValue(@key, w): + if not db.balancer.isNil: + db.balancer.sTab.withValue(@key, w): if w[].len == 0: return err(GetNotFound) return ok(w[]) diff --git a/tests/replay/undump_blocks_era1.nim b/tests/replay/undump_blocks_era1.nim index f02dcdf54..45ea3840e 100644 --- a/tests/replay/undump_blocks_era1.nim +++ b/tests/replay/undump_blocks_era1.nim @@ -20,6 +20,7 @@ iterator undumpBlocksEra1*( dir: string, least = low(uint64), # First block to extract stopAfter = high(uint64), # Last block to extract + doAssertOk = false; ): seq[EthBlock] = let db = Era1DbRef.init(dir, "mainnet").expect("Era files present") defer: @@ -32,7 +33,8 @@ iterator undumpBlocksEra1*( for i in 0 ..< stopAfter: var bck = db.getEthBlock(least + i).valueOr: - doAssert i > 0, "expected at least one block" + if doAssertOk: + doAssert i > 0, "expected at least one block" break tmp.add move(bck) diff --git a/tests/test_aristo/test_filter.nim b/tests/test_aristo/test_filter.nim index 4e7930587..20b0aa6de 100644 --- a/tests/test_aristo/test_filter.nim +++ b/tests/test_aristo/test_filter.nim @@ -288,7 +288,7 @@ proc testDistributedAccess*( xCheck db2.balancer != db3.balancer # Clause (11) from `aristo/README.md` example - db2.reCentre() + discard db2.reCentre() block: let rc = db2.persist() xCheckRc rc.error == 0 @@ -321,7 +321,7 @@ proc testDistributedAccess*( dy.cleanUp() # Build clause (12) from `aristo/README.md` example - db2.reCentre() + discard db2.reCentre() block: let rc = db2.persist() xCheckRc rc.error == 0 diff --git a/tests/test_coredb.nim b/tests/test_coredb.nim index 47353921b..32cffd581 100644 --- a/tests/test_coredb.nim +++ b/tests/test_coredb.nim @@ -159,8 +159,9 @@ proc initRunnerDB( case dbType: of AristoDbMemory: AristoDbMemory.newCoreDbRef() of AristoDbRocks: AristoDbRocks.newCoreDbRef(path, DbOptions.init()) + of AristoDbDualRocks: AristoDbDualRocks.newCoreDbRef(path, DbOptions.init()) of AristoDbVoid: AristoDbVoid.newCoreDbRef() - else: raiseAssert "Oops" + of Ooops: raiseAssert "Ooops" when false: # or true: setDebugLevel() @@ -336,7 +337,7 @@ when isMainModule: setErrorLevel() - when true: # and false: + when true and false: false.coreDbMain() # This one uses the readily available dump: `bulkTest0` and some huge replay @@ -353,6 +354,7 @@ when isMainModule: for n,capture in sampleList: noisy.profileSection("@sample #" & $n, state): noisy.chainSyncRunner( + #dbType = AristoDbDualRocks, capture = capture, pruneHistory = true, #profilingOk = true, diff --git a/vendor/nim-rocksdb b/vendor/nim-rocksdb index a84cf5b89..c5bbf8311 160000 --- a/vendor/nim-rocksdb +++ b/vendor/nim-rocksdb @@ -1 +1 @@ -Subproject commit a84cf5b8960756acb9027a80ba2b1e7cd059e206 +Subproject commit c5bbf831145d7dd4d60420d69ce9eb601ccd5be2