Triggered write event for kvt (#2351)

* bump rockdb

* Rename `KVT` objects related to filters according to `Aristo` naming

details:
  filter* => delta*
  roFilter => balancer

* Compulsory error handling if `persistent()` fails

* Add return code to `reCentre()`

why:
  Might eventually fail if re-centring is blocked. Some logic will be
  added in subsequent patch sets.

* Add column families from earlier session to rocksdb in opening procedure

why:
  All previously used CFs must be declared when re-opening an existing
  database.

* Update `init()` and add rocksdb `reinit()` methods for changing parameters

why:
  Opening a set of column families (with different open options) must span
  at least the ones that are already on disk.

* Provide write-trigger-event interface into `Aristo` backend

why:
  This allows data saved by a guest application (think `KVT`) to
  get synced with the write cycle so the guest and `Aristo` save all
  atomically.

* Use `KVT` with new column family interface from `Aristo`

* Remove obsolete guest interface

* Implement `KVT` piggyback on `Aristo` backend

* CoreDb: Add separate `KVT`/`Aristo` backend mode for debugging

* Remove `rocks_db` import from `persist()` function

why:
  Some systems (in particular `fluffy` and friends) use the `Aristo` memory
  backend emulation and do not link against rocksdb when building the
  application. So this should fix that problem.
This commit is contained in:
Jordan Hrycaj 2024-06-13 18:15:11 +00:00 committed by GitHub
parent 23027baf30
commit 5a5cc6295e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 725 additions and 314 deletions

View File

@ -140,7 +140,8 @@ proc persistBlocksImpl(
dbTx.commit()
# Save and record the block number before the last saved block state.
c.db.persistent(toBlock)
c.db.persistent(toBlock).isOkOr:
return err("Failed to save state: " & $$error)
if c.com.pruneHistory:
# There is a feature for test systems to regularly clean up older blocks

View File

@ -300,7 +300,8 @@ type
AristoApiReCentreFn* =
proc(db: AristoDbRef;
) {.noRaise.}
): Result[void,AristoError]
{.noRaise.}
## Re-focus the `db` argument descriptor so that it becomes the centre.
## Nothing is done if the `db` descriptor is the centre, already.
##
@ -494,7 +495,7 @@ proc dup(be: BackendRef): BackendRef =
of BackendMemory:
return MemBackendRef(be).dup
of BackendRocksDB:
of BackendRocksDB, BackendRdbHosting:
when AristoPersistentBackendOk:
return RdbBackendRef(be).dup
@ -705,9 +706,9 @@ func init*(
result = api.persist(a, b, c)
profApi.reCentre =
proc(a: AristoDbRef) =
proc(a: AristoDbRef): auto =
AristoApiProfReCentreFn.profileRunner:
api.reCentre(a)
result = api.reCentre(a)
profApi.rollback =
proc(a: AristoTxRef): auto =

View File

@ -74,7 +74,7 @@ proc checkBE*(
case db.backend.kind:
of BackendMemory:
return MemBackendRef.checkBE(db, cache=cache, relax=relax)
of BackendRocksDB:
of BackendRocksDB, BackendRdbHosting:
return RdbBackendRef.checkBE(db, cache=cache, relax=relax)
of BackendVoid:
return VoidBackendRef.checkBE(db, cache=cache, relax=relax)

View File

@ -733,7 +733,7 @@ proc pp*(
case be.kind:
of BackendMemory:
result &= be.MemBackendRef.ppBe(db, limit, indent+1)
of BackendRocksDB:
of BackendRocksDB, BackendRdbHosting:
result &= be.RdbBackendRef.ppBe(db, limit, indent+1)
of BackendVoid:
result &= "<NoBackend>"

View File

@ -60,9 +60,9 @@ proc deltaPersistent*(
if db != parent:
if not reCentreOk:
return err(FilBackendRoMode)
db.reCentre
? db.reCentre()
# Always re-centre to `parent` (in case `reCentreOk` was set)
defer: parent.reCentre
defer: discard parent.reCentre()
# Initialise peer filter balancer.
let updateSiblings = ? UpdateSiblingsRef.init db
@ -74,7 +74,7 @@ proc deltaPersistent*(
serial: nxtFid)
# Store structural single trie entries
let writeBatch = be.putBegFn()
let writeBatch = ? be.putBegFn()
be.putVtxFn(writeBatch, db.balancer.sTab.pairs.toSeq)
be.putKeyFn(writeBatch, db.balancer.kMap.pairs.toSeq)
be.putTuvFn(writeBatch, db.balancer.vTop)

View File

@ -58,8 +58,7 @@ type
centre: AristoDbRef ## Link to peer with write permission
peers: HashSet[AristoDbRef] ## List of all peers
AristoDbRef* = ref AristoDbObj
AristoDbObj* = object
AristoDbRef* = ref object
## Three tier database object supporting distributed instances.
top*: LayerRef ## Database working layer, mutable
stack*: seq[LayerRef] ## Stashed immutable parent layers
@ -150,7 +149,7 @@ func getCentre*(db: AristoDbRef): AristoDbRef =
##
if db.dudes.isNil: db else: db.dudes.centre
proc reCentre*(db: AristoDbRef) =
proc reCentre*(db: AristoDbRef): Result[void,AristoError] =
## Re-focus the `db` argument descriptor so that it becomes the centre.
## Nothing is done if the `db` descriptor is the centre, already.
##
@ -166,6 +165,7 @@ proc reCentre*(db: AristoDbRef) =
##
if not db.dudes.isNil:
db.dudes.centre = db
ok()
proc fork*(
db: AristoDbRef;

View File

@ -48,7 +48,7 @@ type
## by any library function using the backend.
PutBegFn* =
proc(): PutHdlRef {.gcsafe, raises: [].}
proc(): Result[PutHdlRef,AristoError] {.gcsafe, raises: [].}
## Generic transaction initialisation function
PutVtxFn* =
@ -81,20 +81,6 @@ type
# -------------
GuestDbFn* =
proc(instance: int): Result[RootRef,AristoError] {.gcsafe, raises: [].}
## Generic function that returns a compartmentalised database handle that
## can be used by another application. If non-nil, this handle allows to
## use a previously allocated database. It is separated from the `Aristo`
## columns.
##
## A successful return value might be `nil` if this feature is
## unsupported.
##
## Caveat:
## The guest database is closed automatically when closing the `Aristo`
## database.
CloseFn* =
proc(flush: bool) {.gcsafe, raises: [].}
## Generic destructor for the `Aristo DB` backend. The argument `flush`
@ -119,7 +105,6 @@ type
putLstFn*: PutLstFn ## Store saved state
putEndFn*: PutEndFn ## Commit bulk store session
guestDbFn*: GuestDbFn ## Piggyback DB for another application
closeFn*: CloseFn ## Generic destructor
proc init*(trg: var BackendObj; src: BackendObj) =
@ -135,7 +120,6 @@ proc init*(trg: var BackendObj; src: BackendObj) =
trg.putLstFn = src.putLstFn
trg.putEndFn = src.putEndFn
trg.guestDbFn = src.guestDbFn
trg.closeFn = src.closeFn
# ------------------------------------------------------------------------------

View File

@ -249,9 +249,14 @@ type
RdbBeDriverGetVtxError
RdbBeDriverGuestError
RdbBeDriverPutAdmError
RdbBeDriverPutVtxError
RdbBeDriverPutKeyError
RdbBeDriverPutVtxError
RdbBeDriverWriteError
RdbBeTypeUnsupported
RdbBeWrSessionUnfinished
RdbBeWrTriggerActiveAlready
RdbBeWrTriggerNilFn
RdbGuestInstanceAborted
RdbGuestInstanceUnsupported
RdbHashKeyExpected

View File

@ -22,6 +22,7 @@ type
BackendVoid = 0 ## For providing backend-less constructor
BackendMemory
BackendRocksDB
BackendRdbHosting ## Allowed piggybacking write session
StorageType* = enum
## Storage types, key prefix

View File

@ -126,8 +126,8 @@ proc getLstFn(db: MemBackendRef): GetLstFn =
proc putBegFn(db: MemBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.newSession()
proc(): Result[PutHdlRef,AristoError] =
ok db.newSession()
proc putVtxFn(db: MemBackendRef): PutVtxFn =
@ -215,11 +215,6 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
# -------------
proc guestDbFn(db: MemBackendRef): GuestDbFn =
result =
proc(instance: int): Result[RootRef,AristoError] =
ok(RootRef nil)
proc closeFn(db: MemBackendRef): CloseFn =
result =
proc(ignore: bool) =
@ -246,7 +241,6 @@ proc memoryBackend*(): BackendRef =
db.putLstFn = putLstFn db
db.putEndFn = putEndFn db
db.guestDbFn = guestDbFn db
db.closeFn = closeFn db
db

View File

@ -14,7 +14,6 @@
{.push raises: [].}
import
results,
../aristo_desc,
../aristo_desc/desc_backend,
"."/[init_common, memory_db]
@ -85,14 +84,6 @@ proc init*(
## Shortcut for `AristoDbRef.init(VoidBackendRef)`
AristoDbRef.init VoidBackendRef
proc guestDb*(db: AristoDbRef; instance = 0): Result[GuestDbRef,AristoError] =
## Database piggyback feature
if db.backend.isNil:
ok(GuestDbRef(nil))
else:
let gdb = db.backend.guestDbFn(instance).valueOr:
return err(error)
ok(gdb.GuestDbRef)
proc finish*(db: AristoDbRef; flush = false) =
## Backend destructor. The argument `flush` indicates that a full database

View File

@ -21,13 +21,15 @@
import
results,
rocksdb,
../../opts,
../aristo_desc,
./rocks_db/rdb_desc,
"."/[rocks_db, memory_only],
../../opts
"."/[rocks_db, memory_only]
export
AristoDbRef,
RdbBackendRef,
RdbWriteEventCb,
memory_only
# ------------------------------------------------------------------------------
@ -56,26 +58,68 @@ proc newAristoRdbDbRef(
# Public database constructors, destructor
# ------------------------------------------------------------------------------
proc init*[W: RdbBackendRef](
proc init*(
T: type AristoDbRef;
B: type W;
B: type RdbBackendRef;
basePath: string;
opts: DbOptions
): Result[T, AristoError] =
## Generic constructor, `basePath` argument is ignored for memory backend
## databases (which also unconditionally succeed initialising.)
##
when B is RdbBackendRef:
basePath.newAristoRdbDbRef opts
basePath.newAristoRdbDbRef opts
proc getRocksDbFamily*(
gdb: GuestDbRef;
instance = 0;
): Result[ColFamilyReadWrite,void] =
## Database piggyback feature
if not gdb.isNil and gdb.beKind == BackendRocksDB:
return ok RdbGuestDbRef(gdb).guestDb
err()
proc reinit*(
db: AristoDbRef;
cfs: openArray[ColFamilyDescriptor];
): Result[seq[ColFamilyReadWrite],AristoError] =
## Re-initialise the `RocksDb` backend database with additional or changed
## column family settings. This can be used to make space for guest use of
## the backend used by `Aristo`. The function returns a list of column family
## descriptors in the same order as the `cfs` argument.
##
## The argument `cfs` list replaces and extends the CFs already on disk by
## its options except for the ones defined for use with `Aristo`.
##
## Even though tx layers and filters might not be affected by this function,
## it is prudent to have them clean and saved on the backend database before
## changing it. On error conditions, data might get lost.
##
case db.backend.kind:
of BackendRocksDB:
db.backend.rocksDbUpdateCfs cfs
of BackendRdbHosting:
err(RdbBeWrTriggerActiveAlready)
else:
return err(RdbBeTypeUnsupported)
proc activateWrTrigger*(
db: AristoDbRef;
hdl: RdbWriteEventCb;
): Result[void,AristoError] =
## This function allows to link an application to the `Aristo` storage event
## for the `RocksDb` backend via call back argument function `hdl`.
##
## The argument handler `hdl` of type
## ::
## proc(session: WriteBatchRef): bool
##
## will be invoked when a write batch for the `Aristo` database is opened in
## order to save current changes to the backend. The `session` argument passed
## to the handler in conjunction with a list of `ColFamilyReadWrite` items
## (as returned from `reinit()`) might be used to store additional items
## to the database with the same write batch.
##
## If the handler returns `true` upon return from running, the write batch
## will proceed saving. Otherwise it is aborted and no data are saved at all.
##
case db.backend.kind:
of BackendRocksDB:
db.backend.rocksDbSetEventTrigger hdl
of BackendRdbHosting:
err(RdbBeWrTriggerActiveAlready)
else:
err(RdbBeTypeUnsupported)
# ------------------------------------------------------------------------------
# End

View File

@ -137,9 +137,9 @@ proc getLstFn(db: RdbBackendRef): GetLstFn =
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
proc(): Result[PutHdlRef,AristoError] =
db.rdb.begin()
db.newSession()
ok db.newSession()
proc putVtxFn(db: RdbBackendRef): PutVtxFn =
result =
@ -212,7 +212,7 @@ proc putEndFn(db: RdbBackendRef): PutEndFn =
of AdmPfx: trace logTxt "putEndFn: admin failed",
pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code
of Oops: trace logTxt "putEndFn: oops",
error=hdl.error.code
pfx=hdl.error.pfx, error=hdl.error.code
db.rdb.rollback()
return err(hdl.error.code)
@ -223,20 +223,27 @@ proc putEndFn(db: RdbBackendRef): PutEndFn =
return err(error[0])
ok()
proc guestDbFn(db: RdbBackendRef): GuestDbFn =
result =
proc(instance: int): Result[RootRef,AristoError] =
let gdb = db.rdb.initGuestDb(instance).valueOr:
when extraTraceMessages:
trace logTxt "guestDbFn", error=error[0], info=error[1]
return err(error[0])
ok gdb
proc closeFn(db: RdbBackendRef): CloseFn =
result =
proc(flush: bool) =
db.rdb.destroy(flush)
# ------------------------------------------------------------------------------
# Private functions: hosting interface changes
# ------------------------------------------------------------------------------
proc putBegHostingFn(db: RdbBackendRef): PutBegFn =
result =
proc(): Result[PutHdlRef,AristoError] =
db.rdb.begin()
if db.rdb.trgWriteEvent(db.rdb.session):
ok db.newSession()
else:
when extraTraceMessages:
trace logTxt "putBegFn: guest trigger aborted session"
db.rdb.rollback()
err(RdbGuestInstanceAborted)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -269,10 +276,37 @@ proc rocksDbBackend*(
db.putLstFn = putLstFn db
db.putEndFn = putEndFn db
db.guestDbFn = guestDbFn db
db.closeFn = closeFn db
ok db
proc rocksDbUpdateCfs*(
be: BackendRef;
cfs: openArray[ColFamilyDescriptor];
): Result[seq[ColFamilyReadWrite],AristoError] =
## Reopen with extended column families given as argument.
let
db = RdbBackendRef(be)
rCfs = db.rdb.reinit(cfs).valueOr:
return err(error[0])
ok rCfs
proc rocksDbSetEventTrigger*(
be: BackendRef;
hdl: RdbWriteEventCb;
): Result[void,AristoError] =
## Store event trigger. This also changes the backend type.
if hdl.isNil:
err(RdbBeWrTriggerNilFn)
else:
let db = RdbBackendRef(be)
db.rdb.trgWriteEvent = hdl
db.beKind = BackendRdbHosting
db.putBegFn = putBegHostingFn db
ok()
proc dup*(db: RdbBackendRef): RdbBackendRef =
## Duplicate descriptor shell as needed for API debugging
new result

View File

@ -18,10 +18,22 @@ import
eth/common,
rocksdb,
stew/[endians2, keyed_queue],
../../../opts,
../../aristo_desc,
../init_common
type
RdbWriteEventCb* =
proc(session: WriteBatchRef): bool {.gcsafe, raises: [].}
## Call back closure function that passes the write session handle
## to a guest peer right after it was opened. The guest may store any
## data on its own column family and return `true` if that worked
## all right. Then the `Aristo` handler will store its own columns and
## finalise the write session.
##
## In case of an error when `false` is returned, `Aristo` will abort the
## write session and return a session error.
RdbInst* = object
admCol*: ColFamilyReadWrite ## Admin column family handler
vtxCol*: ColFamilyReadWrite ## Vertex column family handler
@ -29,26 +41,18 @@ type
session*: WriteBatchRef ## For batched `put()`
rdKeyLru*: KeyedQueue[VertexID,HashKey] ## Read cache
rdVtxLru*: KeyedQueue[VertexID,VertexRef] ## Read cache
basePath*: string ## Database directory
noFq*: bool ## No filter queues available
opts*: DbOptions ## Just a copy here for re-opening
trgWriteEvent*: RdbWriteEventCb ## Database piggyback call back handler
# Alien interface
RdbGuest* = enum
## The guest CF was worth a try, but there are better solutions and this
## item will be removed in future.
GuestFamily0 = "Guest0" ## Guest family (e.g. for Kvt)
GuestFamily1 = "Guest1" ## Ditto
GuestFamily2 = "Guest2" ## Ditto
RdbGuestDbRef* = ref object of GuestDbRef
## The guest CF was worth a try, but there are better solutions and this
## item will be removed in future.
guestDb*: ColFamilyReadWrite ## Piggyback feature references
AristoCFs* = enum
## Column family symbols/handles and names used on the database
AdmCF = "AriAdm" ## Admin column family name
VtxCF = "AriVtx" ## Vertex column family name
KeyCF = "AriKey" ## Hash key column family name
const
AdmCF* = "AdmAri" ## Admin column family name
VtxCF* = "VtxAri" ## Vertex column family name
KeyCF* = "KeyAri" ## Hash key column family name
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "aristo" ## Legacy DB has "data"
RdKeyLruMaxSize* = 4096 ## Max size of read cache for keys

View File

@ -14,7 +14,7 @@
{.push raises: [].}
import
std/[sequtils, os],
std/[sets, sequtils, os],
rocksdb,
results,
../../aristo_desc,
@ -22,27 +22,12 @@ import
../../../opts
# ------------------------------------------------------------------------------
# Public constructor
# Private constructor
# ------------------------------------------------------------------------------
proc init*(
rdb: var RdbInst;
basePath: string;
proc getInitOptions(
opts: DbOptions;
): Result[void,(AristoError,string)] =
## Constructor code inspired by `RocksStoreRef.init()` from
## kvstore_rocksdb.nim
const initFailed = "RocksDB/init() failed"
rdb.basePath = basePath
let
dataDir = rdb.dataDir
try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
): tuple[cfOpts: ColFamilyOptionsRef, dbOpts: DbOptionsRef] =
# TODO the configuration options below have not been tuned but are rather
# based on gut feeling, guesses and by looking at other clients - it
# would make sense to test different settings and combinations once the
@ -86,13 +71,7 @@ proc init*(
# https://github.com/facebook/rocksdb/wiki/Compression
cfOpts.setBottommostCompression(Compression.lz4Compression)
let
cfs = @[initColFamilyDescriptor(AdmCF, cfOpts),
initColFamilyDescriptor(VtxCF, cfOpts),
initColFamilyDescriptor(KeyCF, cfOpts)] &
RdbGuest.mapIt(initColFamilyDescriptor($it, cfOpts))
dbOpts = defaultDbOptions()
let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(opts.maxOpenFiles)
dbOpts.setMaxBytesForLevelBase(opts.writeBufferSize)
@ -110,6 +89,7 @@ proc init*(
# https://github.com/facebook/rocksdb/blob/af50823069818fc127438e39fef91d2486d6e76c/include/rocksdb/advanced_options.h#L696
dbOpts.setOptimizeFiltersForHits(true)
let tableOpts = defaultTableOptions()
# This bloom filter helps avoid having to read multiple SST files when looking
# for a value.
@ -141,39 +121,114 @@ proc init*(
dbOpts.setBlockBasedTableFactory(tableOpts)
# Reserve a family corner for `Aristo` on the database
let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies = cfs).valueOr:
(cfOpts,dbOpts)
proc initImpl(
rdb: var RdbInst;
basePath: string;
opts: DbOptions;
guestCFs: openArray[ColFamilyDescriptor] = [];
): Result[void,(AristoError,string)] =
## Database backend constructor
const initFailed = "RocksDB/init() failed"
rdb.basePath = basePath
rdb.opts = opts
let
dataDir = rdb.dataDir
try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
# Expand argument `opts` to rocksdb options
let (cfOpts, dbOpts) = opts.getInitOptions()
# Column familiy names to allocate when opening the database. This list
# might be extended below.
var useCFs = AristoCFs.mapIt($it).toHashSet
# The `guestCFs` list must not overwrite `AristoCFs` options
let guestCFs = guestCFs.filterIt(it.name notin useCFs)
# If the database exists already, check for missing column families and
# allocate them for opening. Otherwise rocksdb might reject the persistent
# database.
if (dataDir / "CURRENT").fileExists:
let hdCFs = dataDir.listColumnFamilies.valueOr:
raiseAssert initFailed & " cannot read existing CFs: " & error
# Update list of column families for opener.
useCFs = useCFs + hdCFs.toHashSet
# The `guestCFs` list might come with a different set of options. So it is
# temporarily removed from `useCFs` and will be re-added with appropriate
# options.
let guestCFq = @guestCFs
useCFs = useCFs - guestCFs.mapIt(it.name).toHashSet
# Finalise list of column families
let cfs = useCFs.toSeq.mapIt(it.initColFamilyDescriptor cfOpts) & guestCFq
# Open database for the extended family :)
let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies=cfs).valueOr:
raiseAssert initFailed & " cannot create base descriptor: " & error
# Initialise column handlers (this stores implicitely `baseDb`)
rdb.admCol = baseDb.withColFamily(AdmCF).valueOr:
rdb.admCol = baseDb.withColFamily($AdmCF).valueOr:
raiseAssert initFailed & " cannot initialise AdmCF descriptor: " & error
rdb.vtxCol = baseDb.withColFamily(VtxCF).valueOr:
rdb.vtxCol = baseDb.withColFamily($VtxCF).valueOr:
raiseAssert initFailed & " cannot initialise VtxCF descriptor: " & error
rdb.keyCol = baseDb.withColFamily(KeyCF).valueOr:
rdb.keyCol = baseDb.withColFamily($KeyCF).valueOr:
raiseAssert initFailed & " cannot initialise KeyCF descriptor: " & error
ok()
proc initGuestDb*(
rdb: RdbInst;
instance: int;
): Result[RootRef,(AristoError,string)] =
## Initialise `Guest` family
##
## This was worth a try, but there are better solutions and this item
## will be removed in future.
##
if high(RdbGuest).ord < instance:
return err((RdbGuestInstanceUnsupported,""))
let
guestSym = $RdbGuest(instance)
guestDb = rdb.baseDb.withColFamily(guestSym).valueOr:
raiseAssert "RocksDb/initGuestDb() failed: " & error
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
ok RdbGuestDbRef(
beKind: BackendRocksDB,
guestDb: guestDb)
proc init*(
rdb: var RdbInst;
basePath: string;
opts: DbOptions;
): Result[void,(AristoError,string)] =
## Temporarily define a guest CF list here.
rdb.initImpl(basePath, opts)
proc reinit*(
rdb: var RdbInst;
cfs: openArray[ColFamilyDescriptor];
): Result[seq[ColFamilyReadWrite],(AristoError,string)] =
## Re-open database with changed parameters. Even though tx layers and
## filters might not be affected it is prudent to have them clean and
## saved on the backend database before changing it.
##
## The function returns a list of column family descriptors in the same
## order as the `cfs` argument.
##
## The `cfs` list replaces and extends the CFs already on disk by its
## options except for the ones defined with `AristoCFs`.
##
const initFailed = "RocksDB/reinit() failed"
if not rdb.session.isNil:
return err((RdbBeWrSessionUnfinished,""))
if not rdb.baseDb.isClosed():
rdb.baseDb.close()
rdb.initImpl(rdb.basePath, rdb.opts, cfs).isOkOr:
return err(error)
# Assemble list of column family descriptors
var guestCols = newSeq[ColFamilyReadWrite](cfs.len)
for n,col in cfs:
guestCols[n] = rdb.baseDb.withColFamily(col.name).valueOr:
raiseAssert initFailed & " cannot initialise " &
col.name & " descriptor: " & error
ok guestCols
proc destroy*(rdb: var RdbInst; flush: bool) =

View File

@ -72,13 +72,13 @@ proc putAdm*(
): Result[void,(AdminTabID,AristoError,string)] =
let dsc = rdb.session
if data.len == 0:
dsc.delete(xid.toOpenArray, AdmCF).isOkOr:
dsc.delete(xid.toOpenArray, $AdmCF).isOkOr:
const errSym = RdbBeDriverDelAdmError
when extraTraceMessages:
trace logTxt "putAdm()", xid, error=errSym, info=error
return err((xid,errSym,error))
else:
dsc.put(xid.toOpenArray, data, AdmCF).isOkOr:
dsc.put(xid.toOpenArray, data, $AdmCF).isOkOr:
const errSym = RdbBeDriverPutAdmError
when extraTraceMessages:
trace logTxt "putAdm()", xid, error=errSym, info=error
@ -94,7 +94,7 @@ proc putKey*(
for (vid,key) in data:
if key.isValid:
dsc.put(vid.toOpenArray, key.data, KeyCF).isOkOr:
dsc.put(vid.toOpenArray, key.data, $KeyCF).isOkOr:
# Caller must `rollback()` which will flush the `rdKeyLru` cache
const errSym = RdbBeDriverPutKeyError
when extraTraceMessages:
@ -106,7 +106,7 @@ proc putKey*(
discard rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize)
else:
dsc.delete(vid.toOpenArray, KeyCF).isOkOr:
dsc.delete(vid.toOpenArray, $KeyCF).isOkOr:
# Caller must `rollback()` which will flush the `rdKeyLru` cache
const errSym = RdbBeDriverDelKeyError
when extraTraceMessages:
@ -132,7 +132,7 @@ proc putVtx*(
# Caller must `rollback()` which will flush the `rdVtxLru` cache
return err((vid,rc.error,""))
dsc.put(vid.toOpenArray, rc.value, VtxCF).isOkOr:
dsc.put(vid.toOpenArray, rc.value, $VtxCF).isOkOr:
# Caller must `rollback()` which will flush the `rdVtxLru` cache
const errSym = RdbBeDriverPutVtxError
when extraTraceMessages:
@ -144,7 +144,7 @@ proc putVtx*(
discard rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize)
else:
dsc.delete(vid.toOpenArray, VtxCF).isOkOr:
dsc.delete(vid.toOpenArray, $VtxCF).isOkOr:
# Caller must `rollback()` which will flush the `rdVtxLru` cache
const errSym = RdbBeDriverDelVtxError
when extraTraceMessages:

View File

@ -679,7 +679,8 @@ proc swapCtx*(base: AristoBaseRef; ctx: CoreDbCtxRef): CoreDbCtxRef =
# Set read-write access and install
base.ctx = AristoCoreDbCtxRef(ctx)
base.api.reCentre(base.ctx.mpt)
base.api.reCentre(base.ctx.mpt).isOkOr:
raiseAssert "swapCtx() failed: " & $error
proc persistent*(

View File

@ -204,10 +204,13 @@ proc persistent*(
rc = api.persist(kvt)
if rc.isOk:
ok()
elif api.level(kvt) == 0:
err(rc.error.toError(base, info))
else:
elif api.level(kvt) != 0:
err(rc.error.toError(base, info, TxPending))
elif rc.error == TxPersistDelayed:
# This is OK: Piggybacking on `Aristo` backend
ok()
else:
err(rc.error.toError(base, info))
# ------------------------------------------------------------------------------
# Public constructors and related

View File

@ -39,10 +39,22 @@ const
# ------------------------------------------------------------------------------
proc newAristoRocksDbCoreDbRef*(path: string, opts: DbOptions): CoreDbRef =
## This funcion piggybacks the `KVT` on the `Aristo` backend.
let
adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).expect aristoFail
gdb = adb.guestDb().valueOr: GuestDbRef(nil)
kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, gdb).expect kvtFail
adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).valueOr:
raiseAssert aristoFail & ": " & $error
kdb = KvtDbRef.init(use_kvt.RdbBackendRef, adb, opts).valueOr:
raiseAssert kvtFail & ": " & $error
AristoDbRocks.create(kdb, adb)
proc newAristoDualRocksDbCoreDbRef*(path: string, opts: DbOptions): CoreDbRef =
## This is mainly for debugging. The KVT is run on a completely separate
## database backend.
let
adb = AristoDbRef.init(use_ari.RdbBackendRef, path, opts).valueOr:
raiseAssert aristoFail & ": " & $error
kdb = KvtDbRef.init(use_kvt.RdbBackendRef, path, opts).valueOr:
raiseAssert kvtFail & ": " & $error
AristoDbRocks.create(kdb, adb)
# ------------------------------------------------------------------------------

View File

@ -820,7 +820,7 @@ proc level*(db: CoreDbRef): int =
proc persistent*(
db: CoreDbRef;
): CoreDbRc[void] {.discardable.} =
): CoreDbRc[void] =
## For the legacy database, this function has no effect and succeeds always.
## It will nevertheless return a discardable error if there is a pending
## transaction (i.e. `db.level() == 0`.)

View File

@ -28,6 +28,7 @@ type
Ooops
AristoDbMemory ## Memory backend emulator
AristoDbRocks ## RocksDB backend
AristoDbDualRocks ## Dual RocksDB backends for `Kvt` and `Aristo`
AristoDbVoid ## No backend
const

View File

@ -39,11 +39,20 @@ proc newCoreDbRef*(
): CoreDbRef =
## Constructor for persistent type DB
##
## Note: Using legacy notation `newCoreDbRef()` rather than
## `CoreDbRef.init()` because of compiler coughing.
## The production database type is `AristoDbRocks` which uses a single
## `RocksDb` backend for both, `Aristo` and `KVT`.
##
## For debugging, there is the `AristoDbDualRocks` database with split
## backends for `Aristo` and `KVT`. This database is not compatible with
## `AristoDbRocks` so it cannot be reliably switched between both versions
## with consecutive sessions.
##
when dbType == AristoDbRocks:
newAristoRocksDbCoreDbRef path, opts
elif dbType == AristoDbDualRocks:
newAristoDualRocksDbCoreDbRef path, opts
else:
{.error: "Unsupported dbType for persistent newCoreDbRef()".}

View File

@ -58,7 +58,7 @@ type
KvtApiNForkedFn* = proc(db: KvtDbRef): int {.noRaise.}
KvtApiPutFn* = proc(db: KvtDbRef,
key, data: openArray[byte]): Result[void,KvtError] {.noRaise.}
KvtApiReCentreFn* = proc(db: KvtDbRef) {.noRaise.}
KvtApiReCentreFn* = proc(db: KvtDbRef): Result[void,KvtError] {.noRaise.}
KvtApiRollbackFn* = proc(tx: KvtTxRef): Result[void,KvtError] {.noRaise.}
KvtApiPersistFn* = proc(db: KvtDbRef): Result[void,KvtError] {.noRaise.}
KvtApiToKvtDbRefFn* = proc(tx: KvtTxRef): KvtDbRef {.noRaise.}
@ -156,7 +156,7 @@ proc dup(be: BackendRef): BackendRef =
of BackendMemory:
return MemBackendRef(be).dup
of BackendRocksDB:
of BackendRocksDB, BackendRdbTriggered:
when KvtPersistentBackendOk:
return RdbBackendRef(be).dup
@ -306,9 +306,9 @@ func init*(
result = api.put(a, b, c)
profApi.reCentre =
proc(a: KvtDbRef) =
proc(a: KvtDbRef): auto =
KvtApiProfReCentreFn.profileRunner:
api.reCentre(a)
result = api.reCentre(a)
profApi.rollback =
proc(a: KvtTxRef): auto =

View File

@ -17,29 +17,29 @@ import
results,
./kvt_desc,
./kvt_desc/desc_backend,
./kvt_filter/[filter_merge, filter_reverse]
./kvt_delta/[delta_merge, delta_reverse]
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc filterMerge*(
proc deltaMerge*(
db: KvtDbRef; # Database
filter: LayerDeltaRef; # Filter to apply to database
delta: LayerDeltaRef; # Filter to apply to database
) =
## Merge the argument `filter` into the read-only filter layer. Note that
## Merge the argument `delta` into the balancer filter layer. Note that
## this function has no control of the filter source. Having merged the
## argument `filter`, all the `top` and `stack` layers should be cleared.
## argument `delta`, all the `top` and `stack` layers should be cleared.
##
db.merge(filter, db.roFilter)
db.merge(delta, db.balancer)
proc filterUpdateOk*(db: KvtDbRef): bool =
## Check whether the read-only filter can be merged into the backend
proc deltaUpdateOk*(db: KvtDbRef): bool =
## Check whether the balancer filter can be merged into the backend
not db.backend.isNil and db.isCentre
proc filterUpdate*(
proc deltaUpdate*(
db: KvtDbRef; # Database
reCentreOk = false;
): Result[void,KvtError] =
@ -58,7 +58,7 @@ proc filterUpdate*(
return err(FilBackendMissing)
# Blind or missing filter
if db.roFilter.isNil:
if db.balancer.isNil:
return ok()
# Make sure that the argument `db` is at the centre so the backend is in
@ -67,21 +67,21 @@ proc filterUpdate*(
if db != parent:
if not reCentreOk:
return err(FilBackendRoMode)
db.reCentre
? db.reCentre()
# Always re-centre to `parent` (in case `reCentreOk` was set)
defer: parent.reCentre
defer: discard parent.reCentre()
# Store structural single trie entries
let writeBatch = be.putBegFn()
be.putKvpFn(writeBatch, db.roFilter.sTab.pairs.toSeq)
let writeBatch = ? be.putBegFn()
be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq)
? be.putEndFn writeBatch
# Update peer filter balance.
let rev = db.filterReverse db.roFilter
let rev = db.deltaReverse db.balancer
for w in db.forked:
db.merge(rev, w.roFilter)
db.merge(rev, w.balancer)
db.roFilter = LayerDeltaRef(nil)
db.balancer = LayerDeltaRef(nil)
ok()
# ------------------------------------------------------------------------------

View File

@ -17,21 +17,21 @@ import
# Public functions
# ------------------------------------------------------------------------------
proc filterReverse*(
proc deltaReverse*(
db: KvtDbRef; # Database
filter: LayerDeltaRef; # Filter to revert
delta: LayerDeltaRef; # Filter to revert
): LayerDeltaRef =
## Assemble reverse filter for the `filter` argument, i.e. changes to the
## backend that reverse the effect of applying the this read-only filter.
## Assemble a reverse filter for the `delta` argument, i.e. changes to the
## backend that reverse the effect of applying this to the balancer filter.
## The resulting filter is calculated against the current *unfiltered*
## backend (excluding optionally installed read-only filter.)
## backend (excluding optionally installed balancer filters.)
##
## If `filter` is `nil`, the result will be `nil` as well.
if not filter.isNil:
## If `delta` is `nil`, the result will be `nil` as well.
if not delta.isNil:
result = LayerDeltaRef()
# Calculate reverse changes for the `sTab[]` structural table
for key in filter.sTab.keys:
for key in delta.sTab.keys:
let rc = db.getUbe key
if rc.isOk:
result.sTab[key] = rc.value

View File

@ -42,12 +42,11 @@ type
centre: KvtDbRef ## Link to peer with write permission
peers: HashSet[KvtDbRef] ## List of all peers
KvtDbRef* = ref KvtDbObj
KvtDbObj* = object
KvtDbRef* = ref object of RootRef
## Three tier database object supporting distributed instances.
top*: LayerRef ## Database working layer, mutable
stack*: seq[LayerRef] ## Stashed immutable parent layers
roFilter*: LayerDeltaRef ## Apply read filter (locks writing)
balancer*: LayerDeltaRef ## Apply read filter (locks writing)
backend*: BackendRef ## Backend database (may well be `nil`)
txRef*: KvtTxRef ## Latest active transaction
@ -62,6 +61,17 @@ type
KvtDbAction* = proc(db: KvtDbRef) {.gcsafe, raises: [].}
## Generic call back function/closure.
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc canMod(db: KvtDbRef): Result[void,KvtError] =
## Ask for permission before doing nasty stuff
if db.backend.isNil:
ok()
else:
db.backend.canModFn()
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
@ -97,7 +107,7 @@ func getCentre*(db: KvtDbRef): KvtDbRef =
##
if db.dudes.isNil: db else: db.dudes.centre
proc reCentre*(db: KvtDbRef) =
proc reCentre*(db: KvtDbRef): Result[void,KvtError] =
## Re-focus the `db` argument descriptor so that it becomes the centre.
## Nothing is done if the `db` descriptor is the centre, already.
##
@ -111,8 +121,10 @@ proc reCentre*(db: KvtDbRef) =
## accessing the same backend database. Descriptors where `isCentre()`
## returns `false` must be single destructed with `forget()`.
##
if not db.dudes.isNil:
if not db.dudes.isNil and db.dudes.centre != db:
? db.canMod()
db.dudes.centre = db
ok()
proc fork*(
db: KvtDbRef;
@ -140,7 +152,7 @@ proc fork*(
dudes: db.dudes)
if not noFilter:
clone.roFilter = db.roFilter # Ref is ok here (filters are immutable)
clone.balancer = db.balancer # Ref is ok here (filters are immutable)
if not noTopLayer:
clone.top = LayerRef.init()
@ -177,6 +189,7 @@ proc forget*(db: KvtDbRef): Result[void,KvtError] =
elif db notin db.dudes.peers:
err(StaleDescriptor)
else:
? db.canMod()
db.dudes.peers.excl db # Unlink argument `db` from peers list
ok()
@ -187,7 +200,7 @@ proc forgetOthers*(db: KvtDbRef): Result[void,KvtError] =
if not db.dudes.isNil:
if db.dudes.centre != db:
return err(MustBeOnCentre)
? db.canMod()
db.dudes = DudesRef(nil)
ok()

View File

@ -33,7 +33,7 @@ type
## by any library function using the backend.
PutBegFn* =
proc(): PutHdlRef {.gcsafe, raises: [].}
proc(): Result[PutHdlRef,KvtError] {.gcsafe, raises: [].}
## Generic transaction initialisation function
PutKvpFn* =
@ -53,6 +53,21 @@ type
## `false` the outcome might differ depending on the type of backend
## (e.g. in-memory backends would flush on close.)
CanModFn* =
proc(): Result[void,KvtError] {.gcsafe, raises: [].}
## This function returns OK if there is nothing to prevent the main
## `KVT` descriptors being modified (e.g. by `reCentre()`) or by
## adding/removing a new peer (e.g. by `fork()` or `forget()`.)
SetWrReqFn* =
proc(db: RootRef): Result[void,KvtError] {.gcsafe, raises: [].}
## This function stores a request function for the piggyback mode
## writing to the `Aristo` set of column families.
##
## If used at all, this function would run `rocks_db.setWrReqTriggeredFn()()`
## with a `KvtDbRef` type argument for `db`. This allows to run the `Kvt`
## without linking to the rocksdb interface unless it is really needed.
# -------------
BackendRef* = ref BackendObj
@ -66,6 +81,9 @@ type
putEndFn*: PutEndFn ## Commit bulk store session
closeFn*: CloseFn ## Generic destructor
canModFn*: CanModFn ## Lock-alike
setWrReqFn*: SetWrReqFn ## Register main descr for write request
proc init*(trg: var BackendObj; src: BackendObj) =
trg.getKvpFn = src.getKvpFn
@ -73,6 +91,7 @@ proc init*(trg: var BackendObj; src: BackendObj) =
trg.putKvpFn = src.putKvpFn
trg.putEndFn = src.putEndFn
trg.closeFn = src.closeFn
trg.canModFn = src.canModFn
# ------------------------------------------------------------------------------
# End

View File

@ -19,11 +19,16 @@ type
# RocksDB backend
RdbBeCantCreateDataDir
RdbBeDelayedAlreadyRegistered
RdbBeDelayedLocked
RdbBeDelayedNotReady
RdbBeDriverDelError
RdbBeDriverGetError
RdbBeDriverInitError
RdbBeDriverPutError
RdbBeDriverWriteError
RdbBeHostError
RdbBeHostNotApplicable
# Transaction wrappers
TxArgStaleTx
@ -33,6 +38,7 @@ type
TxNoPendingTx
TxNotTopTx
TxPendingTx
TxPersistDelayed
TxStackGarbled
TxStackUnderflow

View File

@ -11,18 +11,20 @@
{.push raises: [].}
import
../../aristo/aristo_init/init_common,
../kvt_desc,
../kvt_desc/desc_backend
export
BackendType # borrowed from Aristo
const
verifyIxId = true # and false
## Enforce session tracking
type
BackendType* = enum
BackendVoid = 0 ## For providing backend-less constructor
BackendMemory ## Same as Aristo
BackendRocksDB ## Same as Aristo
BackendRdbTriggered ## Piggybacked on remote write session
TypedBackendRef* = ref TypedBackendObj
TypedBackendObj* = object of BackendObj
beKind*: BackendType ## Backend type identifier

View File

@ -86,8 +86,8 @@ proc getKvpFn(db: MemBackendRef): GetKvpFn =
proc putBegFn(db: MemBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
db.newSession()
proc(): Result[PutHdlRef,KvtError] =
ok db.newSession()
proc putKvpFn(db: MemBackendRef): PutKvpFn =
result =
@ -120,6 +120,16 @@ proc closeFn(db: MemBackendRef): CloseFn =
proc(ignore: bool) =
discard
proc canModFn(db: MemBackendRef): CanModFn =
result =
proc(): Result[void,KvtError] =
ok()
proc setWrReqFn(db: MemBackendRef): SetWrReqFn =
result =
proc(kvt: RootRef): Result[void,KvtError] =
err(RdbBeHostNotApplicable)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -136,7 +146,8 @@ proc memoryBackend*: BackendRef =
db.putEndFn = putEndFn db
db.closeFn = closeFn db
db.canModFn = canModFn db
db.setWrReqFn = setWrReqFn db
db
proc dup*(db: MemBackendRef): MemBackendRef =

View File

@ -32,7 +32,7 @@ export
# Public helpers
# -----------------------------------------------------------------------------
proc kind*(
func kind*(
be: BackendRef;
): BackendType =
## Retrieves the backend type symbol for a `be` backend database argument

View File

@ -20,43 +20,70 @@
import
results,
../../aristo,
../../opts,
../kvt_desc,
"."/[rocks_db, memory_only]
from ../../aristo/aristo_persistent
import GuestDbRef, getRocksDbFamily
export
RdbBackendRef,
memory_only
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func toErr0(err: (KvtError,string)): KvtError =
err[0]
# ------------------------------------------------------------------------------
# Public database constructors, destructor
# ------------------------------------------------------------------------------
proc init*[W: MemOnlyBackend|RdbBackendRef](
proc init*(
T: type KvtDbRef;
B: type W;
B: type RdbBackendRef;
basePath: string;
guestDb = GuestDbRef(nil);
opts: DbOptions;
): Result[KvtDbRef,KvtError] =
## Generic constructor, `basePath` argument is ignored for `BackendNone` and
## `BackendMemory` type backend database. Also, both of these backends
## aways succeed initialising.
## Generic constructor for `RocksDb` backend
##
## If the argument `guestDb` is set and is a RocksDB column familly, the
## `Kvt`batabase is built upon this column familly. Othewise it is newly
## created with `basePath` as storage location.
##
when B is RdbBackendRef:
let rc = guestDb.getRocksDbFamily()
if rc.isOk:
ok KvtDbRef(top: LayerRef.init(), backend: ? rocksDbKvtBackend rc.value)
else:
ok KvtDbRef(top: LayerRef.init(), backend: ? rocksDbKvtBackend basePath)
ok KvtDbRef(
top: LayerRef.init(),
backend: ? rocksDbKvtBackend(basePath, opts).mapErr toErr0)
else:
ok KvtDbRef.init B
proc init*(
T: type KvtDbRef;
B: type RdbBackendRef;
adb: AristoDbRef;
opts: DbOptions;
): Result[KvtDbRef,KvtError] =
## Constructor for `RocksDb` backend which piggybacks on the `Aristo`
## backend. The following changes will occur after successful instantiation:
##
## * When invoked, the function `kvt_tx.persistent()` will always return an
## error. If everything is all right (e.g. saving is possible), the error
## returned will be `TxPersistDelayed`. This indicates that the save
## request was queued, waiting for being picked up by an event handler.
##
## * There should be an invocation of `aristo_tx.persistent()` immediately
## following the `kvt_tx.persistent()` call (some `KVT` functions might
## return `RdbBeDelayedLocked` or similar errors while the save request
## is pending.) Once successful, the `aristo_tx.persistent()` function will
## also have committed the pending save request mentioned above.
##
## * The function `kvt_init/memory_only.finish()` does nothing.
##
## * The function `aristo_init/memory_only.finish()` will close both
## sessions, the one for `KVT` and the other for `Aristo`.
##
## * The functions `kvt_delta.deltaUpdate()` and `tx_stow.txStow()` should
## not be invoked directly (they will stop with an error most of the time,
## anyway.)
##
ok KvtDbRef(
top: LayerRef.init(),
backend: ? rocksDbKvtTriggeredBackend(adb, opts).mapErr toErr0)
# ------------------------------------------------------------------------------
# End

View File

@ -27,18 +27,19 @@
{.push raises: [].}
import
chronicles,
eth/common,
rocksdb,
results,
../../aristo/aristo_init/persistent,
../../opts,
../kvt_desc,
../kvt_desc/desc_backend,
../kvt_tx/tx_stow,
./init_common,
./rocks_db/[rdb_desc, rdb_get, rdb_init, rdb_put, rdb_walk]
const
maxOpenFiles = 512 ## Rocks DB setup, open files limit
extraTraceMessages = false or true
## Enabled additional logging noise
@ -48,11 +49,8 @@ type
RdbPutHdlRef = ref object of TypedPutHdlRef
when extraTraceMessages:
import chronicles
logScope:
topics = "aristo-backend"
logScope:
topics = "kvt-backend"
# ------------------------------------------------------------------------------
# Private helpers
@ -75,7 +73,7 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =
hdl.RdbPutHdlRef
# ------------------------------------------------------------------------------
# Private functions: interface
# Private functions: standard interface
# ------------------------------------------------------------------------------
proc getKvpFn(db: RdbBackendRef): GetKvpFn =
@ -98,9 +96,10 @@ proc getKvpFn(db: RdbBackendRef): GetKvpFn =
proc putBegFn(db: RdbBackendRef): PutBegFn =
result =
proc(): PutHdlRef =
proc(): Result[PutHdlRef,KvtError] =
db.rdb.begin()
db.newSession()
ok db.newSession()
proc putKvpFn(db: RdbBackendRef): PutKvpFn =
result =
@ -114,6 +113,7 @@ proc putKvpFn(db: RdbBackendRef): PutKvpFn =
hdl.info = error[2]
return
proc putEndFn(db: RdbBackendRef): PutEndFn =
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
@ -121,6 +121,7 @@ proc putEndFn(db: RdbBackendRef): PutEndFn =
if hdl.error != KvtError(0):
when extraTraceMessages:
debug logTxt "putEndFn: failed", error=hdl.error, info=hdl.info
db.rdb.rollback()
return err(hdl.error)
# Commit session
@ -136,16 +137,117 @@ proc closeFn(db: RdbBackendRef): CloseFn =
proc(flush: bool) =
db.rdb.destroy(flush)
# --------------
proc canModFn(db: RdbBackendRef): CanModFn =
result =
proc(): Result[void,KvtError] =
ok()
proc setup(db: RdbBackendRef) =
db.getKvpFn = getKvpFn db
proc setWrReqFn(db: RdbBackendRef): SetWrReqFn =
result =
proc(kvt: RootRef): Result[void,KvtError] =
err(RdbBeHostNotApplicable)
db.putBegFn = putBegFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndFn db
# ------------------------------------------------------------------------------
# Private functions: triggered interface changes
# ------------------------------------------------------------------------------
db.closeFn = closeFn db
proc putBegTriggeredFn(db: RdbBackendRef): PutBegFn =
## Variant of `putBegFn()` for piggyback write batch
result =
proc(): Result[PutHdlRef,KvtError] =
# Check whether somebody else initiated the rocksdb write batch/session
if db.rdb.session.isNil:
const error = RdbBeDelayedNotReady
when extraTraceMessages:
debug logTxt "putBegTriggeredFn: failed", error
return err(error)
ok db.newSession()
proc putEndTriggeredFn(db: RdbBackendRef): PutEndFn =
## Variant of `putEndFn()` for piggyback write batch
result =
proc(hdl: PutHdlRef): Result[void,KvtError] =
# There is no commit()/rollback() here as we do not own the backend.
let hdl = hdl.endSession db
if hdl.error != KvtError(0):
when extraTraceMessages:
debug logTxt "putEndTriggeredFn: failed",
error=hdl.error, info=hdl.info
# The error return code will signal a problem to the `txStow()`
# function which was called by `writeEvCb()` below.
return err(hdl.error)
# Commit the session. This will be acknowledged by the `txStow()`
# function which was called by `writeEvCb()` below.
ok()
proc closeTriggeredFn(db: RdbBackendRef): CloseFn =
## Variant of `closeFn()` for piggyback write batch
result =
proc(flush: bool) =
# Nothing to do here as we do not own the backend
discard
proc canModTriggeredFn(db: RdbBackendRef): CanModFn =
## Variant of `canModFn()` for piggyback write batch
result =
proc(): Result[void,KvtError] =
# Deny modifications/changes if there is a pending write request
if not db.rdb.delayedPersist.isNil:
return err(RdbBeDelayedLocked)
ok()
proc setWrReqTriggeredFn(db: RdbBackendRef): SetWrReqFn =
result =
proc(kvt: RootRef): Result[void,KvtError] =
if db.rdb.delayedPersist.isNil:
db.rdb.delayedPersist = KvtDbRef(kvt)
ok()
else:
err(RdbBeDelayedAlreadyRegistered)
# ------------------------------------------------------------------------------
# Private function: trigger handler
# ------------------------------------------------------------------------------
proc writeEvCb(db: RdbBackendRef): RdbWriteEventCb =
## Write session event handler
result =
proc(ws: WriteBatchRef): bool =
# Only do something if a write session request was queued
if not db.rdb.delayedPersist.isNil:
defer:
# Clear session environment when leaving. This makes sure that the
# same session can only be run once.
db.rdb.session = WriteBatchRef(nil)
db.rdb.delayedPersist = KvtDbRef(nil)
# Publish session argument
db.rdb.session = ws
# Execute delayed session. Note that the `txStow()` function is located
# in `tx_stow.nim`. This module `tx_stow.nim` is also imported by
# `kvt_tx.nim` which contains `persist() `. So the logic goes:
# ::
# kvt_tx.persist() --> registers a delayed write request rather
# than executing tx_stow.txStow()
#
# // the backend owner (i.e. Aristo) will start a write cycle and
# // invoke the event handler rocks_db.writeEvCb()
# rocks_db.writeEvCb() --> calls tx_stow.txStow()
#
# tx_stow.txStow() --> calls rocks_db.putBegTriggeredFn()
# calls rocks_db.putKvpFn()
# calls rocks_db.putEndTriggeredFn()
#
let rc = db.rdb.delayedPersist.txStow(persistent=true)
if rc.isErr:
error "writeEventCb(): persist() failed", error=rc.error
return false
true
# ------------------------------------------------------------------------------
# Public functions
@ -153,28 +255,58 @@ proc setup(db: RdbBackendRef) =
proc rocksDbKvtBackend*(
path: string;
): Result[BackendRef,KvtError] =
opts: DbOptions;
): Result[BackendRef,(KvtError,string)] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
# Initialise RocksDB
db.rdb.init(path, maxOpenFiles).isOkOr:
db.rdb.init(path, opts).isOkOr:
when extraTraceMessages:
trace logTxt "constructor failed", error=error[0], info=error[1]
return err(error[0])
return err(error)
db.setup()
db.getKvpFn = getKvpFn db
db.putBegFn = putBegFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndFn db
db.closeFn = closeFn db
db.canModFn = canModFn db
db.setWrReqFn = setWrReqFn db
ok db
proc rocksDbKvtBackend*(
store: ColFamilyReadWrite;
): Result[BackendRef,KvtError] =
proc rocksDbKvtTriggeredBackend*(
adb: AristoDbRef;
opts: DbOptions;
): Result[BackendRef,(KvtError,string)] =
let db = RdbBackendRef(
beKind: BackendRocksDB)
db.rdb.init(store)
db.setup()
beKind: BackendRdbTriggered)
# Initialise RocksDB piggy-backed on `Aristo` backend.
db.rdb.init(adb, opts).isOkOr:
when extraTraceMessages:
trace logTxt "constructor failed", error=error[0], info=error[1]
return err(error)
# Register write session event handler
adb.activateWrTrigger(db.writeEvCb()).isOkOr:
return err((RdbBeHostError,$error))
db.getKvpFn = getKvpFn db
db.putBegFn = putBegTriggeredFn db
db.putKvpFn = putKvpFn db
db.putEndFn = putEndTriggeredFn db
db.closeFn = closeTriggeredFn db
db.canModFn = canModTriggeredFn db
db.setWrReqFn = setWrReqTriggeredFn db
ok db
proc dup*(db: RdbBackendRef): RdbBackendRef =
new result
init_common.init(result[], db[])

View File

@ -15,16 +15,25 @@
import
std/os,
../../kvt_desc,
rocksdb
type
RdbInst* = object
store*: ColFamilyReadWrite ## Rocks DB database handler
store*: KvtCfStore ## Rocks DB database handler
session*: WriteBatchRef ## For batched `put()`
basePath*: string ## Database directory
delayedPersist*: KvtDbRef ## Enable next piggyback write session
KvtCFs* = enum
## Column family symbols/handles and names used on the database
KvtGeneric = "KvtGen" ## Generic column family
KvtCfStore* = array[KvtCFs,ColFamilyReadWrite]
## List of column family handlers
const
KvtFamily* = "Kvt" ## RocksDB column family
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "kvt" ## Legacy DB has "data"
@ -32,6 +41,10 @@ const
# Public functions
# ------------------------------------------------------------------------------
template logTxt*(info: static[string]): static[string] =
"RocksDB/" & info
func baseDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder
@ -39,8 +52,8 @@ func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder
template logTxt*(info: static[string]): static[string] =
"RocksDB/" & info
template baseDb*(rdb: RdbInst): RocksDbReadWriteRef =
rdb.store[KvtGeneric].db
# ------------------------------------------------------------------------------
# End

View File

@ -43,7 +43,7 @@ proc get*(
let onData: DataProc = proc(data: openArray[byte]) =
res = @data
let gotData = rdb.store.get(key, onData).valueOr:
let gotData = rdb.store[KvtGeneric].get(key, onData).valueOr:
const errSym = RdbBeDriverGetError
when extraTraceMessages:
trace logTxt "get", error=errSym, info=error

View File

@ -14,21 +14,37 @@
{.push raises: [].}
import
std/os,
std/[sequtils, os],
rocksdb,
results,
../../../aristo/aristo_init/persistent,
../../../opts,
../../kvt_desc,
../../kvt_desc/desc_error as kdb,
./rdb_desc
const
extraTraceMessages = false
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
when extraTraceMessages:
import chronicles
proc getCFInitOptions(opts: DbOptions): ColFamilyOptionsRef =
result = defaultColFamilyOptions()
if opts.writeBufferSize > 0:
result.setWriteBufferSize(opts.writeBufferSize)
logScope:
topics = "kvt-backend"
proc getDbInitOptions(opts: DbOptions): DbOptionsRef =
result = defaultDbOptions()
result.setMaxOpenFiles(opts.maxOpenFiles)
result.setMaxBytesForLevelBase(opts.writeBufferSize)
if opts.rowCacheSize > 0:
result.setRowCache(cacheCreateLRU(opts.rowCacheSize))
if opts.blockCacheSize > 0:
let tableOpts = defaultTableOptions()
tableOpts.setBlockCache(cacheCreateLRU(opts.rowCacheSize))
result.setBlockBasedTableFactory(tableOpts)
# ------------------------------------------------------------------------------
# Public constructor
@ -37,51 +53,64 @@ when extraTraceMessages:
proc init*(
rdb: var RdbInst;
basePath: string;
openMax: int;
opts: DbOptions;
): Result[void,(KvtError,string)] =
## Constructor c ode inspired by `RocksStoreRef.init()` from
## kvstore_rocksdb.nim
## Database backend constructor for stand-alone version
##
const initFailed = "RocksDB/init() failed"
rdb.basePath = basePath
let
dataDir = rdb.dataDir
try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
return err((kdb.RdbBeCantCreateDataDir, ""))
let
cfs = @[initColFamilyDescriptor KvtFamily]
opts = defaultDbOptions()
opts.setMaxOpenFiles(openMax)
# Expand argument `opts` to rocksdb options
let (cfOpts, dbOpts) = (opts.getCFInitOptions, opts.getDbInitOptions)
# Column family names to allocate when opening the database.
let cfs = KvtCFs.mapIt(($it).initColFamilyDescriptor cfOpts)
# Reserve a family corner for `Kvt` on the database
let baseDb = openRocksDb(dataDir, opts, columnFamilies=cfs).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
# Open database for the extended family :)
let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies=cfs).valueOr:
raiseAssert initFailed & " cannot create base descriptor: " & error
# Initialise `Kvt` family
rdb.store = baseDb.withColFamily(KvtFamily).valueOr:
let errSym = RdbBeDriverInitError
when extraTraceMessages:
debug logTxt "init failed", dataDir, openMax, error=errSym, info=error
return err((errSym, error))
# Initialise column handlers (this stores implicitly `baseDb`)
for col in KvtCFs:
rdb.store[col] = baseDb.withColFamily($col).valueOr:
raiseAssert initFailed & " cannot initialise " &
$col & " descriptor: " & error
ok()
proc init*(
rdb: var RdbInst;
store: ColFamilyReadWrite;
) =
## Piggyback on other database
rdb.store = store # that's it
adb: AristoDbRef;
opts: DbOptions;
): Result[void,(KvtError,string)] =
## Initialise column handlers piggy-backing on the `Aristo` backend.
##
let
cfOpts = opts.getCFInitOptions()
iCfs = KvtCFs.toSeq.mapIt(initColFamilyDescriptor($it, cfOpts))
oCfs = adb.reinit(iCfs).valueOr:
return err((RdbBeHostError,$error))
# Collect column family descriptors (this stores implicitly `baseDb`)
for n in KvtCFs:
assert oCfs[n.ord].name != "" # debugging only
rdb.store[n] = oCfs[n.ord]
ok()
proc destroy*(rdb: var RdbInst; flush: bool) =
## Destructor (no need to do anything if piggybacked)
if 0 < rdb.basePath.len:
rdb.store.db.close()
rdb.baseDb.close()
if flush:
try:

View File

@ -50,7 +50,7 @@ when extraTraceMessages:
proc begin*(rdb: var RdbInst) =
if rdb.session.isNil:
rdb.session = rdb.store.openWriteBatch()
rdb.session = rdb.baseDb.openWriteBatch()
proc rollback*(rdb: var RdbInst) =
if not rdb.session.isClosed():
@ -59,7 +59,7 @@ proc rollback*(rdb: var RdbInst) =
proc commit*(rdb: var RdbInst): Result[void,(KvtError,string)] =
if not rdb.session.isClosed():
defer: rdb.disposeSession()
rdb.store.write(rdb.session).isOkOr:
rdb.baseDb.write(rdb.session).isOkOr:
const errSym = RdbBeDriverWriteError
when extraTraceMessages:
trace logTxt "commit", error=errSym, info=error
@ -70,16 +70,15 @@ proc put*(
rdb: RdbInst;
data: openArray[(Blob,Blob)];
): Result[void,(Blob,KvtError,string)] =
let dsc = rdb.session
for (key,val) in data:
if val.len == 0:
dsc.delete(key, rdb.store.name).isOkOr:
rdb.session.delete(key, $KvtGeneric).isOkOr:
const errSym = RdbBeDriverDelError
when extraTraceMessages:
trace logTxt "del", key, error=errSym, info=error
return err((key,errSym,error))
else:
dsc.put(key, val, rdb.store.name).isOkOr:
rdb.session.put(key, val, $KvtGeneric).isOkOr:
const errSym = RdbBeDriverPutError
when extraTraceMessages:
trace logTxt "put", key, error=errSym, info=error

View File

@ -38,7 +38,7 @@ iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] =
##
## Non-decodable entries are stepped over and ignored.
block walkBody:
let rit = rdb.store.openIterator().valueOr:
let rit = rdb.store[KvtGeneric].openIterator().valueOr:
when extraTraceMessages:
trace logTxt "walk", pfx="all", error
break walkBody

View File

@ -16,6 +16,7 @@
import
results,
./kvt_tx/[tx_fork, tx_frame, tx_stow],
./kvt_init/memory_only,
./kvt_desc
# ------------------------------------------------------------------------------
@ -177,6 +178,12 @@ proc persist*(
## and the staged data area is cleared. While performing this last step,
## the recovery journal is updated (if available.)
##
# Register for saving if piggybacked on remote database
if db.backend.kind == BackendRdbTriggered:
? db.txStowOk(persistent=true)
? db.backend.setWrReqFn db
return err(TxPersistDelayed)
db.txStow(persistent=true)
proc stow*(

View File

@ -16,12 +16,27 @@
import
std/tables,
results,
".."/[kvt_desc, kvt_filter]
".."/[kvt_desc, kvt_delta]
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc txStowOk*(
db: KvtDbRef; # Database
persistent: bool; # Stage only unless `true`
): Result[void,KvtError] =
## Verify that `txStow()` can go ahead
if not db.txRef.isNil:
return err(TxPendingTx)
if 0 < db.stack.len:
return err(TxStackGarbled)
if persistent and not db.deltaUpdateOk():
return err(TxBackendNotWritable)
ok()
proc txStow*(
db: KvtDbRef; # Database
persistent: bool; # Stage only unless `true`
@ -32,21 +47,15 @@ proc txStow*(
## If there is no backend the function returns immediately with an error.
## The same happens if there is a pending transaction.
##
if not db.txRef.isNil:
return err(TxPendingTx)
if 0 < db.stack.len:
return err(TxStackGarbled)
if persistent and not db.filterUpdateOk():
return err(TxBackendNotWritable)
? db.txStowOk persistent
if 0 < db.top.delta.sTab.len:
db.filterMerge db.top.delta
db.deltaMerge db.top.delta
db.top.delta = LayerDeltaRef()
if persistent:
# Move `roFilter` data into persistent tables
? db.filterUpdate()
# Move `balancer` data into persistent tables
? db.deltaUpdate()
ok()

View File

@ -41,8 +41,8 @@ proc getBe*(
key: openArray[byte]; # Key of database record
): Result[Blob,KvtError] =
## Get the vertex from the (filtered) backened if available.
if not db.roFilter.isNil:
db.roFilter.sTab.withValue(@key, w):
if not db.balancer.isNil:
db.balancer.sTab.withValue(@key, w):
if w[].len == 0:
return err(GetNotFound)
return ok(w[])

View File

@ -20,6 +20,7 @@ iterator undumpBlocksEra1*(
dir: string,
least = low(uint64), # First block to extract
stopAfter = high(uint64), # Last block to extract
doAssertOk = false;
): seq[EthBlock] =
let db = Era1DbRef.init(dir, "mainnet").expect("Era files present")
defer:
@ -32,7 +33,8 @@ iterator undumpBlocksEra1*(
for i in 0 ..< stopAfter:
var bck = db.getEthBlock(least + i).valueOr:
doAssert i > 0, "expected at least one block"
if doAssertOk:
doAssert i > 0, "expected at least one block"
break
tmp.add move(bck)

View File

@ -288,7 +288,7 @@ proc testDistributedAccess*(
xCheck db2.balancer != db3.balancer
# Clause (11) from `aristo/README.md` example
db2.reCentre()
discard db2.reCentre()
block:
let rc = db2.persist()
xCheckRc rc.error == 0
@ -321,7 +321,7 @@ proc testDistributedAccess*(
dy.cleanUp()
# Build clause (12) from `aristo/README.md` example
db2.reCentre()
discard db2.reCentre()
block:
let rc = db2.persist()
xCheckRc rc.error == 0

View File

@ -159,8 +159,9 @@ proc initRunnerDB(
case dbType:
of AristoDbMemory: AristoDbMemory.newCoreDbRef()
of AristoDbRocks: AristoDbRocks.newCoreDbRef(path, DbOptions.init())
of AristoDbDualRocks: AristoDbDualRocks.newCoreDbRef(path, DbOptions.init())
of AristoDbVoid: AristoDbVoid.newCoreDbRef()
else: raiseAssert "Oops"
of Ooops: raiseAssert "Ooops"
when false: # or true:
setDebugLevel()
@ -336,7 +337,7 @@ when isMainModule:
setErrorLevel()
when true: # and false:
when true and false:
false.coreDbMain()
# This one uses the readily available dump: `bulkTest0` and some huge replay
@ -353,6 +354,7 @@ when isMainModule:
for n,capture in sampleList:
noisy.profileSection("@sample #" & $n, state):
noisy.chainSyncRunner(
#dbType = AristoDbDualRocks,
capture = capture,
pruneHistory = true,
#profilingOk = true,

2
vendor/nim-rocksdb vendored

@ -1 +1 @@
Subproject commit a84cf5b8960756acb9027a80ba2b1e7cd059e206
Subproject commit c5bbf831145d7dd4d60420d69ce9eb601ccd5be2