expose memtable flush (#72)

* expose memtable flush

This can be used to avoid log replays on startup, for example

* Add flush to RocksDbReadWriteRef.

* Remove atomic flush default.

---------

Co-authored-by: bhartnett <51288821+bhartnett@users.noreply.github.com>
This commit is contained in:
Jacek Sieka 2024-10-29 01:14:36 +01:00 committed by GitHub
parent 7b41c5c1e5
commit 17eb247702
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 58 additions and 0 deletions

View File

@ -41,6 +41,7 @@ export
type
RocksDbPtr* = ptr rocksdb_t
IngestExternalFilesOptionsPtr = ptr rocksdb_ingestexternalfileoptions_t
FlushOptionsPtr = ptr rocksdb_flushoptions_t
RocksDbRef* = ref object of RootObj
lock: Lock
@ -57,6 +58,7 @@ type
RocksDbReadWriteRef* = ref object of RocksDbRef
writeOpts: WriteOptionsRef
ingestOptsPtr: IngestExternalFilesOptionsPtr
flushOptsPtr: FlushOptionsPtr
proc listColumnFamilies*(path: string): RocksDBResult[seq[string]] =
## List exisiting column families on disk. This might be used to find out
@ -135,6 +137,9 @@ proc openRocksDb*(
autoCloseNonNil(writeOpts)
autoCloseAll(cfs)
let flushOptsPtr = rocksdb_flushoptions_create()
rocksdb_flushoptions_set_wait(flushOptsPtr, 1)
let
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadWriteRef(
@ -144,6 +149,7 @@ proc openRocksDb*(
dbOpts: dbOpts,
readOpts: readOpts,
writeOpts: writeOpts,
flushOptsPtr: flushOptsPtr,
cfDescriptors: cfs,
ingestOptsPtr: rocksdb_ingestexternalfileoptions_create(),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
@ -465,6 +471,40 @@ proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) =
rocksdb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()
proc flush*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Flush all memory table data for the given column family.
doAssert not db.isClosed()
var errors: cstring
rocksdb_flush_cf(
db.cPtr, db.flushOptsPtr, cfHandle.cPtr, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)
ok()
proc flush*(
db: RocksDbReadWriteRef, cfHandles: openArray[ColFamilyHandleRef]
): RocksDBResult[void] =
## Flush all memory table data for the given column families.
doAssert not db.isClosed()
var
cfs = cfHandles.mapIt(it.cPtr)
errors: cstring
rocksdb_flush_cfs(
db.cPtr,
db.flushOptsPtr,
addr cfs[0],
cint(cfs.len),
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)
ok()
proc close*(db: RocksDbRef) =
## Close the `RocksDbRef` which will release the connection to the database
## and free the memory associated with it. `close` is idempotent and can
@ -490,3 +530,6 @@ proc close*(db: RocksDbRef) =
rocksdb_ingestexternalfileoptions_destroy(db.ingestOptsPtr)
db.ingestOptsPtr = nil
rocksdb_flushoptions_destroy(db.flushOptsPtr)
db.flushOptsPtr = nil

View File

@ -499,3 +499,18 @@ suite "RocksDbRef Tests":
db.releaseSnapshot(snapshot)
check snapshot.isClosed()
test "Test flush":
check:
db.put(key, val).isOk()
db.flush().isOk()
check:
db.put(otherKey, val, otherCfHandle).isOk()
db.flush(otherCfHandle).isOk()
let cfHandles = [defaultCfHandle, otherCfHandle]
check:
db.put(otherKey, val, defaultCfHandle).isOk()
db.put(key, val, otherCfHandle).isOk()
db.flush(cfHandles).isOk()