Aristo use rocksdb cf instead of key pfx (#2332)
* Use RocksDb column families instead of a prefixed single column why: Better performance * Use structural objects `VertexRef` and `HashKey` in LRU cache for RocksDb why: Avoids repeated de/serialisation
This commit is contained in:
parent
c72d6aa5d6
commit
a347291413
|
@ -318,8 +318,8 @@ data, for RLP encoded or for unstructured data as defined below.
|
|||
where each bitmask(2)-word array entry defines the length of
|
||||
the preceeding data fields:
|
||||
00 -- field is missing
|
||||
01 -- field lengthh is 8 bytes
|
||||
10 -- field lengthh is 32 bytes
|
||||
01 -- field length is 8 bytes
|
||||
10 -- field length is 32 bytes
|
||||
|
||||
Apparently, entries 0 and and 2 of the *4 x bitmask(2)* word array cannot have
|
||||
the two bit value *10* as they refer to the nonce and the storage ID data
|
||||
|
|
|
@ -130,10 +130,6 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
|
|||
# been deleted.
|
||||
for vid in max(topVidBe + 1, VertexID(LEAST_FREE_VID)) .. vidTuvBe:
|
||||
if db.getVtxBE(vid).isOk or db.getKeyBE(vid).isOk:
|
||||
echo ">>>",
|
||||
" topVidBe=", topVidBe,
|
||||
" vidTuvBe=", vidTuvBe,
|
||||
" vid=", vid
|
||||
return err((vid,CheckBeGarbledVTop))
|
||||
|
||||
# Check layer cache against backend
|
||||
|
@ -195,10 +191,6 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
|
|||
for vid in max(db.vTop + 1, VertexID(LEAST_FREE_VID)) .. topVidCache:
|
||||
if db.layersGetVtxOrVoid(vid).isValid or
|
||||
db.layersGetKeyOrVoid(vid).isValid:
|
||||
echo ">>>",
|
||||
" topVidCache=", topVidCache,
|
||||
" vTop=", db.vTop,
|
||||
" vid=", vid
|
||||
return err((db.vTop,CheckBeCacheGarbledVTop))
|
||||
|
||||
ok()
|
||||
|
|
|
@ -241,11 +241,16 @@ type
|
|||
# RocksDB backend
|
||||
RdbBeCantCreateDataDir
|
||||
RdbBeCantCreateTmpDir
|
||||
RdbBeDriverDelError
|
||||
RdbBeDriverGetError
|
||||
RdbBeDriverDelAdmError
|
||||
RdbBeDriverDelKeyError
|
||||
RdbBeDriverDelVtxError
|
||||
RdbBeDriverGetAdmError
|
||||
RdbBeDriverGetKeyError
|
||||
RdbBeDriverGetVtxError
|
||||
RdbBeDriverGuestError
|
||||
RdbBeDriverInitError
|
||||
RdbBeDriverPutError
|
||||
RdbBeDriverPutAdmError
|
||||
RdbBeDriverPutVtxError
|
||||
RdbBeDriverPutKeyError
|
||||
RdbBeDriverWriteError
|
||||
RdbGuestInstanceUnsupported
|
||||
RdbHashKeyExpected
|
||||
|
|
|
@ -79,14 +79,13 @@ proc getVtxFn(db: RdbBackendRef): GetVtxFn =
|
|||
proc(vid: VertexID): Result[VertexRef,AristoError] =
|
||||
|
||||
# Fetch serialised data record
|
||||
let data = db.rdb.getVtx(vid.uint64).valueOr:
|
||||
let vtx = db.rdb.getVtx(vid).valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1]
|
||||
return err(error[0])
|
||||
|
||||
# Decode data record
|
||||
if 0 < data.len:
|
||||
return data.deblobify VertexRef
|
||||
if vtx.isValid:
|
||||
return ok(vtx)
|
||||
|
||||
err(GetVtxNotFound)
|
||||
|
||||
|
@ -95,16 +94,13 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn =
|
|||
proc(vid: VertexID): Result[HashKey,AristoError] =
|
||||
|
||||
# Fetch serialised data record
|
||||
let data = db.rdb.getKey(vid.uint64).valueOr:
|
||||
let key = db.rdb.getKey(vid).valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1]
|
||||
return err(error[0])
|
||||
|
||||
# Decode data record
|
||||
if 0 < data.len:
|
||||
let lid = HashKey.fromBytes(data).valueOr:
|
||||
return err(RdbHashKeyExpected)
|
||||
return ok lid
|
||||
if key.isValid:
|
||||
return ok(key)
|
||||
|
||||
err(GetKeyNotFound)
|
||||
|
||||
|
@ -113,7 +109,7 @@ proc getTuvFn(db: RdbBackendRef): GetTuvFn =
|
|||
proc(): Result[VertexID,AristoError]=
|
||||
|
||||
# Fetch serialised data record.
|
||||
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdTuv.uint64).valueOr:
|
||||
let data = db.rdb.getAdm(AdmTabIdTuv).valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getTuvFn: failed", error=error[0], info=error[1]
|
||||
return err(error[0])
|
||||
|
@ -123,14 +119,14 @@ proc getTuvFn(db: RdbBackendRef): GetTuvFn =
|
|||
return ok VertexID(0)
|
||||
|
||||
# Decode data record
|
||||
data.deblobify VertexID
|
||||
result = data.deblobify VertexID
|
||||
|
||||
proc getLstFn(db: RdbBackendRef): GetLstFn =
|
||||
result =
|
||||
proc(): Result[SavedState,AristoError]=
|
||||
|
||||
# Fetch serialised data record.
|
||||
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdLst.uint64).valueOr:
|
||||
let data = db.rdb.getAdm(AdmTabIdLst).valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getLstFn: failed", error=error[0], info=error[1]
|
||||
return err(error[0])
|
||||
|
@ -151,27 +147,10 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn =
|
|||
proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
|
||||
# Collect batch session arguments
|
||||
var batch: seq[(uint64,Blob)]
|
||||
for (vid,vtx) in vrps:
|
||||
if vtx.isValid:
|
||||
let rc = vtx.blobify()
|
||||
if rc.isErr:
|
||||
db.rdb.putVtx(vrps).isOkOr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: VtxPfx,
|
||||
vid: vid,
|
||||
code: rc.error)
|
||||
return
|
||||
batch.add (vid.uint64, rc.value)
|
||||
else:
|
||||
batch.add (vid.uint64, EmptyBlob)
|
||||
|
||||
# Stash batch session data via LRU cache
|
||||
db.rdb.putVtx(batch).isOkOr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: VtxPfx,
|
||||
vid: VertexID(error[0]),
|
||||
vid: error[0],
|
||||
code: error[1],
|
||||
info: error[2])
|
||||
|
||||
|
@ -180,20 +159,10 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn =
|
|||
proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
|
||||
# Collect batch session arguments
|
||||
var batch: seq[(uint64,Blob)]
|
||||
for (vid,key) in vkps:
|
||||
if key.isValid:
|
||||
batch.add (vid.uint64, @(key.data))
|
||||
else:
|
||||
batch.add (vid.uint64, EmptyBlob)
|
||||
|
||||
# Stash batch session data via LRU cache
|
||||
db.rdb.putKey(batch).isOkOr:
|
||||
db.rdb.putKey(vkps).isOkOr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: KeyPfx,
|
||||
vid: VertexID(error[0]),
|
||||
vid: error[0],
|
||||
code: error[1],
|
||||
info: error[2])
|
||||
|
||||
|
@ -203,12 +172,14 @@ proc putTuvFn(db: RdbBackendRef): PutTuvFn =
|
|||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
if vs.isValid:
|
||||
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdTuv.uint64, vs.blobify)]).isOkOr:
|
||||
db.rdb.putAdm(AdmTabIdTuv, vs.blobify).isOkOr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: AdmPfx,
|
||||
aid: AdmTabIdTuv,
|
||||
code: error[1],
|
||||
info: error[2])
|
||||
return
|
||||
|
||||
|
||||
proc putLstFn(db: RdbBackendRef): PutLstFn =
|
||||
result =
|
||||
|
@ -221,7 +192,7 @@ proc putLstFn(db: RdbBackendRef): PutLstFn =
|
|||
aid: AdmTabIdLst,
|
||||
code: error)
|
||||
return
|
||||
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdLst.uint64, data)]).isOkOr:
|
||||
db.rdb.putAdm(AdmTabIdLst, data).isOkOr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: AdmPfx,
|
||||
aid: AdmTabIdLst,
|
||||
|
@ -243,6 +214,7 @@ proc putEndFn(db: RdbBackendRef): PutEndFn =
|
|||
pfx=AdmPfx, aid=hdl.error.aid.uint64, error=hdl.error.code
|
||||
of Oops: trace logTxt "putEndFn: oops",
|
||||
error=hdl.error.code
|
||||
db.rdb.rollback()
|
||||
return err(hdl.error.code)
|
||||
|
||||
# Commit session
|
||||
|
@ -317,28 +289,32 @@ iterator walk*(
|
|||
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
|
||||
## Walk over all key-value pairs of the database.
|
||||
##
|
||||
## Non-decodable entries are stepped over while the counter `n` of the
|
||||
## yield record is still incremented.
|
||||
for w in be.rdb.walk:
|
||||
yield w
|
||||
## Non-decodable entries are ignored
|
||||
##
|
||||
for (xid, data) in be.rdb.walkAdm:
|
||||
yield (AdmPfx, xid, data)
|
||||
for (vid, data) in be.rdb.walkVtx:
|
||||
yield (VtxPfx, vid, data)
|
||||
for (vid, data) in be.rdb.walkKey:
|
||||
yield (KeyPfx, vid, data)
|
||||
|
||||
iterator walkVtx*(
|
||||
be: RdbBackendRef;
|
||||
): tuple[vid: VertexID, vtx: VertexRef] =
|
||||
## Variant of `walk()` iteration over the vertex sub-table.
|
||||
for (xid, data) in be.rdb.walk VtxPfx:
|
||||
for (vid, data) in be.rdb.walkVtx:
|
||||
let rc = data.deblobify VertexRef
|
||||
if rc.isOk:
|
||||
yield (VertexID(xid), rc.value)
|
||||
yield (VertexID(vid), rc.value)
|
||||
|
||||
iterator walkKey*(
|
||||
be: RdbBackendRef;
|
||||
): tuple[vid: VertexID, key: HashKey] =
|
||||
## Variant of `walk()` iteration over the Markle hash sub-table.
|
||||
for (xid, data) in be.rdb.walk KeyPfx:
|
||||
for (vid, data) in be.rdb.walkKey:
|
||||
let lid = HashKey.fromBytes(data).valueOr:
|
||||
continue
|
||||
yield (VertexID(xid), lid)
|
||||
yield (VertexID(vid), lid)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -23,10 +23,12 @@ import
|
|||
|
||||
type
|
||||
RdbInst* = object
|
||||
store*: ColFamilyReadWrite ## Rocks DB database handler
|
||||
admCol*: ColFamilyReadWrite ## Admin column family handler
|
||||
vtxCol*: ColFamilyReadWrite ## Vertex column family handler
|
||||
keyCol*: ColFamilyReadWrite ## Hash key column family handler
|
||||
session*: WriteBatchRef ## For batched `put()`
|
||||
rdKeyLru*: KeyedQueue[RdbKey,Blob] ## Read cache
|
||||
rdVtxLru*: KeyedQueue[RdbKey,Blob] ## Read cache
|
||||
rdKeyLru*: KeyedQueue[VertexID,HashKey] ## Read cache
|
||||
rdVtxLru*: KeyedQueue[VertexID,VertexRef] ## Read cache
|
||||
basePath*: string ## Database directory
|
||||
noFq*: bool ## No filter queues available
|
||||
|
||||
|
@ -35,15 +37,21 @@ type
|
|||
|
||||
# Alien interface
|
||||
RdbGuest* = enum
|
||||
## The guest CF was worth a try, but there are better solutions and this
|
||||
## item will be removed in future.
|
||||
GuestFamily0 = "Guest0" ## Guest family (e.g. for Kvt)
|
||||
GuestFamily1 = "Guest1" ## Ditto
|
||||
GuestFamily2 = "Guest2" ## Ditto
|
||||
|
||||
RdbGuestDbRef* = ref object of GuestDbRef
|
||||
## The guest CF was worth a try, but there are better solutions and this
|
||||
## item will be removed in future.
|
||||
guestDb*: ColFamilyReadWrite ## Pigiback feature references
|
||||
|
||||
const
|
||||
AristoFamily* = "Aristo" ## RocksDB column family
|
||||
AdmCF* = "AdmAri" ## Admin column family name
|
||||
VtxCF* = "VtxAri" ## Vertex column family name
|
||||
KeyCF* = "KeyAri" ## Hash key column family name
|
||||
BaseFolder* = "nimbus" ## Same as for Legacy DB
|
||||
DataFolder* = "aristo" ## Legacy DB has "data"
|
||||
RdKeyLruMaxSize* = 4096 ## Max size of read cache for keys
|
||||
|
@ -56,6 +64,9 @@ const
|
|||
template logTxt*(info: static[string]): static[string] =
|
||||
"RocksDB/" & info
|
||||
|
||||
template baseDb*(rdb: RdbInst): RocksDbReadWriteRef =
|
||||
rdb.admCol.db
|
||||
|
||||
|
||||
func baseDir*(rdb: RdbInst): string =
|
||||
rdb.basePath / BaseFolder
|
||||
|
@ -63,10 +74,12 @@ func baseDir*(rdb: RdbInst): string =
|
|||
func dataDir*(rdb: RdbInst): string =
|
||||
rdb.baseDir / DataFolder
|
||||
|
||||
func toRdbKey*(id: uint64; pfx: StorageType): RdbKey =
|
||||
let idKey = id.toBytesBE
|
||||
result[0] = pfx.ord.byte
|
||||
copyMem(addr result[1], unsafeAddr idKey, sizeof idKey)
|
||||
|
||||
template toOpenArray*(xid: AdminTabID): openArray[byte] =
|
||||
xid.uint64.toBytesBE.toOpenArray(0,7)
|
||||
|
||||
template toOpenArray*(vid: VertexID): openArray[byte] =
|
||||
vid.uint64.toBytesBE.toOpenArray(0,7)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -18,7 +18,7 @@ import
|
|||
rocksdb,
|
||||
results,
|
||||
stew/keyed_queue,
|
||||
../../aristo_desc,
|
||||
../../[aristo_blobify, aristo_desc],
|
||||
../init_common,
|
||||
./rdb_desc
|
||||
|
||||
|
@ -33,15 +33,19 @@ when extraTraceMessages:
|
|||
logScope:
|
||||
topics = "aristo-rocksdb"
|
||||
|
||||
proc getImpl(rdb: RdbInst; key: RdbKey): Result[Blob,(AristoError,string)] =
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getAdm*(rdb: RdbInst; xid: AdminTabID): Result[Blob,(AristoError,string)] =
|
||||
var res: Blob
|
||||
let onData = proc(data: openArray[byte]) =
|
||||
res = @data
|
||||
|
||||
let gotData = rdb.store.get(key, onData).valueOr:
|
||||
const errSym = RdbBeDriverGetError
|
||||
let gotData = rdb.admCol.get(xid.toOpenArray, onData).valueOr:
|
||||
const errSym = RdbBeDriverGetAdmError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "get", pfx=key[0], error=errSym, info=error
|
||||
trace logTxt "getAdm", xid, error=errSym, info=error
|
||||
return err((errSym,error))
|
||||
|
||||
# Correct result if needed
|
||||
|
@ -49,46 +53,68 @@ proc getImpl(rdb: RdbInst; key: RdbKey): Result[Blob,(AristoError,string)] =
|
|||
res = EmptyBlob
|
||||
ok move(res)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getByPfx*(
|
||||
rdb: RdbInst;
|
||||
pfx: StorageType;
|
||||
xid: uint64,
|
||||
): Result[Blob,(AristoError,string)] =
|
||||
rdb.getImpl(xid.toRdbKey pfx)
|
||||
|
||||
proc getKey*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] =
|
||||
proc getKey*(
|
||||
rdb: var RdbInst;
|
||||
vid: VertexID;
|
||||
): Result[HashKey,(AristoError,string)] =
|
||||
# Try LRU cache first
|
||||
let
|
||||
key = xid.toRdbKey KeyPfx
|
||||
var
|
||||
rc = rdb.rdKeyLru.lruFetch(key)
|
||||
var rc = rdb.rdKeyLru.lruFetch(vid)
|
||||
if rc.isOK:
|
||||
return ok(move(rc.value))
|
||||
|
||||
# Otherwise fetch from backend database
|
||||
let res = ? rdb.getImpl(key)
|
||||
var res: Blob
|
||||
let onData = proc(data: openArray[byte]) =
|
||||
res = @data
|
||||
|
||||
let gotData = rdb.keyCol.get(vid.toOpenArray, onData).valueOr:
|
||||
const errSym = RdbBeDriverGetKeyError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getKey", vid, error=errSym, info=error
|
||||
return err((errSym,error))
|
||||
|
||||
# Correct result if needed
|
||||
let key = block:
|
||||
if gotData:
|
||||
HashKey.fromBytes(res).valueOr:
|
||||
return err((RdbHashKeyExpected,""))
|
||||
else:
|
||||
VOID_HASH_KEY
|
||||
|
||||
# Update cache and return
|
||||
ok rdb.rdKeyLru.lruAppend(key, res, RdKeyLruMaxSize)
|
||||
ok rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize)
|
||||
|
||||
proc getVtx*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] =
|
||||
|
||||
proc getVtx*(
|
||||
rdb: var RdbInst;
|
||||
vid: VertexID;
|
||||
): Result[VertexRef,(AristoError,string)] =
|
||||
# Try LRU cache first
|
||||
let
|
||||
key = xid.toRdbKey VtxPfx
|
||||
var
|
||||
rc = rdb.rdVtxLru.lruFetch(key)
|
||||
var rc = rdb.rdVtxLru.lruFetch(vid)
|
||||
if rc.isOK:
|
||||
return ok(move(rc.value))
|
||||
|
||||
# Otherwise fetch from backend database
|
||||
let res = ? rdb.getImpl(key)
|
||||
var res: Blob
|
||||
let onData = proc(data: openArray[byte]) =
|
||||
res = @data
|
||||
|
||||
let gotData = rdb.vtxCol.get(vid.toOpenArray, onData).valueOr:
|
||||
const errSym = RdbBeDriverGetVtxError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "getVtx", vid, error=errSym, info=error
|
||||
return err((errSym,error))
|
||||
|
||||
var vtx = VertexRef(nil)
|
||||
if gotData:
|
||||
let rc = res.deblobify VertexRef
|
||||
if rc.isErr:
|
||||
return err((rc.error,""))
|
||||
vtx = rc.value
|
||||
|
||||
# Update cache and return
|
||||
ok rdb.rdVtxLru.lruAppend(key, res, RdVtxLruMaxSize)
|
||||
ok rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -21,17 +21,6 @@ import
|
|||
./rdb_desc,
|
||||
../../../opts
|
||||
|
||||
const
|
||||
extraTraceMessages = false
|
||||
## Enable additional logging noise
|
||||
|
||||
when extraTraceMessages:
|
||||
import
|
||||
chronicles
|
||||
|
||||
logScope:
|
||||
topics = "aristo-rocksdb"
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public constructor
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -43,11 +32,12 @@ proc init*(
|
|||
): Result[void,(AristoError,string)] =
|
||||
## Constructor c ode inspired by `RocksStoreRef.init()` from
|
||||
## kvstore_rocksdb.nim
|
||||
const initFailed = "RocksDB/init() failed"
|
||||
|
||||
rdb.basePath = basePath
|
||||
|
||||
let
|
||||
dataDir = rdb.dataDir
|
||||
|
||||
try:
|
||||
dataDir.createDir
|
||||
except OSError, IOError:
|
||||
|
@ -60,7 +50,9 @@ proc init*(
|
|||
cfOpts.setWriteBufferSize(opts.writeBufferSize)
|
||||
|
||||
let
|
||||
cfs = @[initColFamilyDescriptor(AristoFamily, cfOpts)] &
|
||||
cfs = @[initColFamilyDescriptor(AdmCF, cfOpts),
|
||||
initColFamilyDescriptor(VtxCF, cfOpts),
|
||||
initColFamilyDescriptor(KeyCF, cfOpts)] &
|
||||
RdbGuest.mapIt(initColFamilyDescriptor($it, cfOpts))
|
||||
dbOpts = defaultDbOptions()
|
||||
|
||||
|
@ -77,17 +69,15 @@ proc init*(
|
|||
|
||||
# Reserve a family corner for `Aristo` on the database
|
||||
let baseDb = openRocksDb(dataDir, dbOpts, columnFamilies=cfs).valueOr:
|
||||
let errSym = RdbBeDriverInitError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "init failed", dataDir, openMax, error=errSym, info=error
|
||||
return err((errSym, error))
|
||||
raiseAssert initFailed & " cannot create base descriptor: " & error
|
||||
|
||||
# Initialise `Aristo` family
|
||||
rdb.store = baseDb.withColFamily(AristoFamily).valueOr:
|
||||
let errSym = RdbBeDriverInitError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "init failed", dataDir, openMax, error=errSym, info=error
|
||||
return err((errSym, error))
|
||||
# Initialise column handlers (this stores implicitely `baseDb`)
|
||||
rdb.admCol = baseDb.withColFamily(AdmCF).valueOr:
|
||||
raiseAssert initFailed & " cannot initialise AdmCF descriptor: " & error
|
||||
rdb.vtxCol = baseDb.withColFamily(VtxCF).valueOr:
|
||||
raiseAssert initFailed & " cannot initialise VtxCF descriptor: " & error
|
||||
rdb.keyCol = baseDb.withColFamily(KeyCF).valueOr:
|
||||
raiseAssert initFailed & " cannot initialise KeyCF descriptor: " & error
|
||||
|
||||
ok()
|
||||
|
||||
|
@ -95,16 +85,17 @@ proc initGuestDb*(
|
|||
rdb: RdbInst;
|
||||
instance: int;
|
||||
): Result[RootRef,(AristoError,string)] =
|
||||
# Initialise `Guest` family
|
||||
## Initialise `Guest` family
|
||||
##
|
||||
## Thus was a worth a try, but there are better solutions and this item
|
||||
## will be removed in future.
|
||||
##
|
||||
if high(RdbGuest).ord < instance:
|
||||
return err((RdbGuestInstanceUnsupported,""))
|
||||
let
|
||||
guestSym = $RdbGuest(instance)
|
||||
guestDb = rdb.store.db.withColFamily(guestSym).valueOr:
|
||||
let errSym = RdbBeDriverGuestError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "guestDb failed", error=errSym, info=error
|
||||
return err((errSym, error))
|
||||
guestDb = rdb.baseDb.withColFamily(guestSym).valueOr:
|
||||
raiseAssert "RocksDb/initGuestDb() failed: " & error
|
||||
|
||||
ok RdbGuestDbRef(
|
||||
beKind: BackendRocksDB,
|
||||
|
@ -113,7 +104,7 @@ proc initGuestDb*(
|
|||
|
||||
proc destroy*(rdb: var RdbInst; flush: bool) =
|
||||
## Destructor
|
||||
rdb.store.db.close()
|
||||
rdb.baseDb.close()
|
||||
|
||||
if flush:
|
||||
try:
|
||||
|
|
|
@ -17,8 +17,8 @@ import
|
|||
eth/common,
|
||||
rocksdb,
|
||||
results,
|
||||
stew/[endians2, keyed_queue],
|
||||
../../aristo_desc,
|
||||
stew/keyed_queue,
|
||||
../../[aristo_blobify, aristo_desc],
|
||||
../init_common,
|
||||
./rdb_desc
|
||||
|
||||
|
@ -40,37 +40,13 @@ proc disposeSession(rdb: var RdbInst) =
|
|||
rdb.session.close()
|
||||
rdb.session = WriteBatchRef(nil)
|
||||
|
||||
proc putImpl(
|
||||
dsc: WriteBatchRef;
|
||||
name: string;
|
||||
key: RdbKey;
|
||||
val: Blob;
|
||||
): Result[void,(uint64,AristoError,string)] =
|
||||
if val.len == 0:
|
||||
dsc.delete(key, name).isOkOr:
|
||||
const errSym = RdbBeDriverDelError
|
||||
let xid = uint64.fromBytesBE key[1 .. 8]
|
||||
when extraTraceMessages:
|
||||
trace logTxt "del",
|
||||
pfx=StorageType(key[0]), xid, error=errSym, info=error
|
||||
return err((xid,errSym,error))
|
||||
else:
|
||||
dsc.put(key, val, name).isOkOr:
|
||||
const errSym = RdbBeDriverPutError
|
||||
let xid = uint64.fromBytesBE key[1 .. 8]
|
||||
when extraTraceMessages:
|
||||
trace logTxt "put",
|
||||
pfx=StorageType(key[0]), xid, error=errSym, info=error
|
||||
return err((xid,errSym,error))
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc begin*(rdb: var RdbInst) =
|
||||
if rdb.session.isNil:
|
||||
rdb.session = rdb.store.openWriteBatch()
|
||||
rdb.session = rdb.baseDb.openWriteBatch()
|
||||
|
||||
proc rollback*(rdb: var RdbInst) =
|
||||
if not rdb.session.isClosed():
|
||||
|
@ -81,62 +57,103 @@ proc rollback*(rdb: var RdbInst) =
|
|||
proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] =
|
||||
if not rdb.session.isClosed():
|
||||
defer: rdb.disposeSession()
|
||||
rdb.store.write(rdb.session).isOkOr:
|
||||
rdb.baseDb.write(rdb.session).isOkOr:
|
||||
const errSym = RdbBeDriverWriteError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "commit", error=errSym, info=error
|
||||
return err((errSym,error))
|
||||
ok()
|
||||
|
||||
proc putByPfx*(
|
||||
|
||||
proc putAdm*(
|
||||
rdb: var RdbInst;
|
||||
pfx: StorageType;
|
||||
data: openArray[(uint64,Blob)];
|
||||
): Result[void,(uint64,AristoError,string)] =
|
||||
let
|
||||
dsc = rdb.session
|
||||
name = rdb.store.name
|
||||
for (xid,val) in data:
|
||||
dsc.putImpl(name, xid.toRdbKey pfx, val).isOkOr:
|
||||
return err(error)
|
||||
xid: AdminTabID;
|
||||
data: Blob;
|
||||
): Result[void,(AdminTabID,AristoError,string)] =
|
||||
let dsc = rdb.session
|
||||
if data.len == 0:
|
||||
dsc.delete(xid.toOpenArray, AdmCF).isOkOr:
|
||||
const errSym = RdbBeDriverDelAdmError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putAdm()", xid, error=errSym, info=error
|
||||
return err((xid,errSym,error))
|
||||
else:
|
||||
dsc.put(xid.toOpenArray, data, AdmCF).isOkOr:
|
||||
const errSym = RdbBeDriverPutAdmError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putAdm()", xid, error=errSym, info=error
|
||||
return err((xid,errSym,error))
|
||||
ok()
|
||||
|
||||
|
||||
proc putKey*(
|
||||
rdb: var RdbInst;
|
||||
data: openArray[(uint64,Blob)];
|
||||
): Result[void,(uint64,AristoError,string)] =
|
||||
let
|
||||
dsc = rdb.session
|
||||
name = rdb.store.name
|
||||
for (xid,val) in data:
|
||||
let key = xid.toRdbKey KeyPfx
|
||||
data: openArray[(VertexID,HashKey)];
|
||||
): Result[void,(VertexID,AristoError,string)] =
|
||||
let dsc = rdb.session
|
||||
for (vid,key) in data:
|
||||
|
||||
if key.isValid:
|
||||
dsc.put(vid.toOpenArray, key.data, KeyCF).isOkOr:
|
||||
# Caller must `rollback()` which will flush the `rdKeyLru` cache
|
||||
const errSym = RdbBeDriverPutKeyError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putKey()", vid, error=errSym, info=error
|
||||
return err((vid,errSym,error))
|
||||
|
||||
# Update cache
|
||||
if not rdb.rdKeyLru.lruUpdate(key, val):
|
||||
discard rdb.rdKeyLru.lruAppend(key, val, RdKeyLruMaxSize)
|
||||
if not rdb.rdKeyLru.lruUpdate(vid, key):
|
||||
discard rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize)
|
||||
|
||||
else:
|
||||
dsc.delete(vid.toOpenArray, KeyCF).isOkOr:
|
||||
# Caller must `rollback()` which will flush the `rdKeyLru` cache
|
||||
const errSym = RdbBeDriverDelKeyError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putKey()", vid, error=errSym, info=error
|
||||
return err((vid,errSym,error))
|
||||
|
||||
# Update cache, vertex will most probably never be visited anymore
|
||||
rdb.rdKeyLru.del vid
|
||||
|
||||
# Store on write batch queue
|
||||
dsc.putImpl(name, key, val).isOkOr:
|
||||
return err(error)
|
||||
ok()
|
||||
|
||||
|
||||
proc putVtx*(
|
||||
rdb: var RdbInst;
|
||||
data: openArray[(uint64,Blob)];
|
||||
): Result[void,(uint64,AristoError,string)] =
|
||||
let
|
||||
dsc = rdb.session
|
||||
name = rdb.store.name
|
||||
for (xid,val) in data:
|
||||
let key = xid.toRdbKey VtxPfx
|
||||
data: openArray[(VertexID,VertexRef)];
|
||||
): Result[void,(VertexID,AristoError,string)] =
|
||||
let dsc = rdb.session
|
||||
for (vid,vtx) in data:
|
||||
|
||||
if vtx.isValid:
|
||||
let rc = vtx.blobify()
|
||||
if rc.isErr:
|
||||
# Caller must `rollback()` which will flush the `rdVtxLru` cache
|
||||
return err((vid,rc.error,""))
|
||||
|
||||
dsc.put(vid.toOpenArray, rc.value, VtxCF).isOkOr:
|
||||
# Caller must `rollback()` which will flush the `rdVtxLru` cache
|
||||
const errSym = RdbBeDriverPutVtxError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putVtx()", vid, error=errSym, info=error
|
||||
return err((vid,errSym,error))
|
||||
|
||||
# Update cache
|
||||
if not rdb.rdVtxLru.lruUpdate(key, val):
|
||||
discard rdb.rdVtxLru.lruAppend(key, val, RdVtxLruMaxSize)
|
||||
if not rdb.rdVtxLru.lruUpdate(vid, vtx):
|
||||
discard rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize)
|
||||
|
||||
else:
|
||||
dsc.delete(vid.toOpenArray, VtxCF).isOkOr:
|
||||
# Caller must `rollback()` which will flush the `rdVtxLru` cache
|
||||
const errSym = RdbBeDriverDelVtxError
|
||||
when extraTraceMessages:
|
||||
trace logTxt "putVtx()", vid, error=errSym, info=error
|
||||
return err((vid,errSym,error))
|
||||
|
||||
# Update cache, vertex will most probably never be visited anymore
|
||||
rdb.rdVtxLru.del vid
|
||||
|
||||
# Store on write batch queue
|
||||
dsc.putImpl(name, key, val).isOkOr:
|
||||
return err(error)
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -17,7 +17,6 @@ import
|
|||
eth/common,
|
||||
stew/endians2,
|
||||
rocksdb,
|
||||
../init_common,
|
||||
./rdb_desc
|
||||
|
||||
const
|
||||
|
@ -35,63 +34,53 @@ when extraTraceMessages:
|
|||
# Public iterators
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
iterator walk*(
|
||||
rdb: RdbInst;
|
||||
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
|
||||
## Walk over all key-value pairs of the database.
|
||||
iterator walkAdm*(rdb: RdbInst): tuple[xid: uint64, data: Blob] =
|
||||
## Walk over key-value pairs of the admin column of the database.
|
||||
##
|
||||
## Non-decodable entries are are ignored.
|
||||
##
|
||||
## Non-decodable entries are stepped over and ignored.
|
||||
block walkBody:
|
||||
let rit = rdb.store.openIterator().valueOr:
|
||||
let rit = rdb.admCol.openIterator().valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "walk", pfx="all", error
|
||||
trace logTxt "walkAdm()", error
|
||||
break walkBody
|
||||
defer: rit.close()
|
||||
|
||||
for (key,val) in rit.pairs:
|
||||
if key.len == 9:
|
||||
if StorageType.high.ord < key[0]:
|
||||
break walkBody
|
||||
let
|
||||
pfx = StorageType(key[0])
|
||||
id = uint64.fromBytesBE key.toOpenArray(1, key.len - 1)
|
||||
yield (pfx, id, val)
|
||||
if key.len == 8 and val.len != 0:
|
||||
yield (uint64.fromBytesBE key, val)
|
||||
|
||||
|
||||
iterator walk*(
|
||||
rdb: RdbInst;
|
||||
pfx: StorageType;
|
||||
): tuple[xid: uint64, data: Blob] =
|
||||
## Walk over key-value pairs of the table referted to by the argument `pfx`
|
||||
## whic must be different from `Oops` and `AdmPfx`.
|
||||
iterator walkKey*(rdb: RdbInst): tuple[vid: uint64, data: Blob] =
|
||||
## Walk over key-value pairs of the hash key column of the database.
|
||||
##
|
||||
## Non-decodable entries are stepped over and ignored.
|
||||
## Non-decodable entries are are ignored.
|
||||
##
|
||||
block walkBody:
|
||||
let rit = rdb.store.openIterator().valueOr:
|
||||
let rit = rdb.keyCol.openIterator().valueOr:
|
||||
when extraTraceMessages:
|
||||
echo ">>> walk (2) oops",
|
||||
" pfx=", pfx
|
||||
trace logTxt "walk", pfx, error
|
||||
trace logTxt "walkKey()", error
|
||||
break walkBody
|
||||
defer: rit.close()
|
||||
|
||||
# Start at first entry not less than `<pfx> & 1`
|
||||
rit.seekToKey 1u64.toRdbKey pfx
|
||||
for (key,val) in rit.pairs:
|
||||
if key.len == 8 and val.len != 0:
|
||||
yield (uint64.fromBytesBE key, val)
|
||||
|
||||
# Fetch sub-table data as long as the current key is acceptable
|
||||
while rit.isValid():
|
||||
let key = rit.key()
|
||||
if key.len == 9:
|
||||
if key[0] != pfx.ord.uint:
|
||||
break walkBody # done
|
||||
iterator walkVtx*(rdb: RdbInst): tuple[vid: uint64, data: Blob] =
|
||||
## Walk over key-value pairs of the hash key column of the database.
|
||||
##
|
||||
## Non-decodable entries are are ignored.
|
||||
##
|
||||
block walkBody:
|
||||
let rit = rdb.vtxCol.openIterator().valueOr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "walkVtx()", error
|
||||
break walkBody
|
||||
defer: rit.close()
|
||||
|
||||
let val = rit.value()
|
||||
if val.len != 0:
|
||||
yield (uint64.fromBytesBE key.toOpenArray(1, key.high()), val)
|
||||
|
||||
# Update Iterator
|
||||
rit.next()
|
||||
for (key,val) in rit.pairs:
|
||||
if key.len == 8 and val.len != 0:
|
||||
yield (uint64.fromBytesBE key, val)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -172,7 +172,7 @@ when isMainModule:
|
|||
noisy.accountsRunner(sam, resetDb=true)
|
||||
|
||||
when true: # and false:
|
||||
let persistent = false # or true
|
||||
let persistent = false or true
|
||||
noisy.showElapsed("@snap_test_list"):
|
||||
for n,sam in snapTestList:
|
||||
noisy.accountsRunner(sam, persistent=persistent)
|
||||
|
|
|
@ -134,7 +134,7 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
|
|||
check (n, report.error) == (n,0)
|
||||
return err(report.error)
|
||||
|
||||
return ok dx
|
||||
ok dx
|
||||
|
||||
# ----------------------
|
||||
|
||||
|
@ -338,7 +338,6 @@ proc testDistributedAccess*(
|
|||
let c11Fil1_eq_db1RoFilter = c11Filter1.isDbEq(db1.balancer, db1, noisy)
|
||||
xCheck c11Fil1_eq_db1RoFilter:
|
||||
noisy.say "*** testDistributedAccess (7)", "n=", n,
|
||||
"\n c11Filter1\n ", c11Filter1.pp(db1),
|
||||
"db1".dump(db1),
|
||||
""
|
||||
|
||||
|
@ -346,10 +345,8 @@ proc testDistributedAccess*(
|
|||
let c11Fil3_eq_db3RoFilter = c11Filter3.isDbEq(db3.balancer, db3, noisy)
|
||||
xCheck c11Fil3_eq_db3RoFilter:
|
||||
noisy.say "*** testDistributedAccess (8)", "n=", n,
|
||||
"\n c11Filter3\n ", c11Filter3.pp(db3),
|
||||
"db3".dump(db3),
|
||||
""
|
||||
|
||||
# Check/verify backends
|
||||
block:
|
||||
let ok = dy.checkBeOk(noisy=noisy,fifos=true)
|
||||
|
|
Loading…
Reference in New Issue