Expose function for listing existing column families (#48)

* Expose function for listing existing column families

* Free CF list after use

* Update, fix error case

* Rename listRocksDbCFs -> listColumnFamilies

* Need to free each item separately

* Handle default option objects, close after use
This commit is contained in:
Jordan Hrycaj 2024-06-13 15:43:33 +00:00 committed by GitHub
parent e36f454cd4
commit c5bbf83114
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 107 additions and 26 deletions

View File

@ -63,18 +63,71 @@ type
writeOpts: WriteOptionsRef writeOpts: WriteOptionsRef
ingestOptsPtr: IngestExternalFilesOptionsPtr ingestOptsPtr: IngestExternalFilesOptionsPtr
proc listColumnFamilies*(
path: string;
dbOpts = DbOptionsRef(nil);
): RocksDBResult[seq[string]] =
## List exisiting column families on disk. This might be used to find out
## whether there were some columns missing with the version on disk.
##
## Column families previously used must be declared when re-opening an
## existing database. So this function can be used to add some CFs
## on-the-fly to the opener list of CFs
##
## Note that the on-the-fly adding might not be needed in the way described
## above once rocksdb has been upgraded to the latest version, see comments
## at the end of ./columnfamily/cfhandle.nim.
##
let
useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil: useDbOpts.close()
var
lencf: csize_t
errors: cstring
let
cList = rocksdb_list_column_families(
useDbOpts.cPtr,
path.cstring,
addr lencf,
cast[cstringArray](errors.addr))
bailOnErrors(errors)
var cfs: seq[string]
if not cList.isNil:
defer: rocksdb_free(cList)
for n in 0 ..< lencf:
if cList[n].isNil:
# Clean up the rest
for z in n+1 ..< lencf:
if not cList[z].isNil:
rocksdb_free(cList[z])
return err("short reply")
cfs.add $cList[n]
rocksdb_free(cList[n])
ok cfs
proc openRocksDb*( proc openRocksDb*(
path: string, path: string;
dbOpts = defaultDbOptions(), dbOpts = DbOptionsRef(nil);
readOpts = defaultReadOptions(), readOpts = ReadOptionsRef(nil);
writeOpts = defaultWriteOptions(), writeOpts = WriteOptionsRef(nil);
columnFamilies: openArray[ColFamilyDescriptor] = []): RocksDBResult[RocksDbReadWriteRef] = columnFamilies: openArray[ColFamilyDescriptor] = [];
): RocksDBResult[RocksDbReadWriteRef] =
## Open a RocksDB instance in read-write mode. If `columnFamilies` is empty ## Open a RocksDB instance in read-write mode. If `columnFamilies` is empty
## then it will open the default column family. If `dbOpts`, `readOpts`, or ## then it will open the default column family. If `dbOpts`, `readOpts`, or
## `writeOpts` are not supplied then the default options will be used. ## `writeOpts` are not supplied then the default options will be used.
## By default, column families will be created if they don't yet exist. ## By default, column families will be created if they don't yet exist.
## All existing column families must be specified if the database has ## All existing column families must be specified if the database has
## previously created any column families. ## previously created any column families.
let
useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil: useDbOpts.close()
var cfs = columnFamilies.toSeq() var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()): if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
@ -86,7 +139,7 @@ proc openRocksDb*(
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len) cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring errors: cstring
let rocksDbPtr = rocksdb_open_column_families( let rocksDbPtr = rocksdb_open_column_families(
dbOpts.cPtr, useDbOpts.cPtr,
path.cstring, path.cstring,
cfNames.len().cint, cfNames.len().cint,
cast[cstringArray](cfNames[0].addr), cast[cstringArray](cfNames[0].addr),
@ -95,7 +148,11 @@ proc openRocksDb*(
cast[cstringArray](errors.addr)) cast[cstringArray](errors.addr))
bailOnErrors(errors) bailOnErrors(errors)
let db = RocksDbReadWriteRef( let
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
writeOpts = (if writeOpts.isNil: defaultWriteOptions() else: writeOpts)
db = RocksDbReadWriteRef(
lock: createLock(), lock: createLock(),
cPtr: rocksDbPtr, cPtr: rocksDbPtr,
path: path, path: path,
@ -108,17 +165,22 @@ proc openRocksDb*(
ok(db) ok(db)
proc openRocksDbReadOnly*( proc openRocksDbReadOnly*(
path: string, path: string;
dbOpts = defaultDbOptions(), dbOpts = DbOptionsRef(nil);
readOpts = defaultReadOptions(), readOpts = ReadOptionsRef(nil);
columnFamilies: openArray[ColFamilyDescriptor] = [], columnFamilies: openArray[ColFamilyDescriptor] = [];
errorIfWalFileExists = false): RocksDBResult[RocksDbReadOnlyRef] = errorIfWalFileExists = false;
): RocksDBResult[RocksDbReadOnlyRef] =
## Open a RocksDB instance in read-only mode. If `columnFamilies` is empty ## Open a RocksDB instance in read-only mode. If `columnFamilies` is empty
## then it will open the default column family. If `dbOpts` or `readOpts` are ## then it will open the default column family. If `dbOpts` or `readOpts` are
## not supplied then the default options will be used. By default, column ## not supplied then the default options will be used. By default, column
## families will be created if they don't yet exist. If the database already ## families will be created if they don't yet exist. If the database already
## contains any column families, then all or a subset of the existing column ## contains any column families, then all or a subset of the existing column
## families can be opened for reading. ## families can be opened for reading.
let
useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil: useDbOpts.close()
var cfs = columnFamilies.toSeq() var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()): if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
@ -130,7 +192,7 @@ proc openRocksDbReadOnly*(
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len) cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring errors: cstring
let rocksDbPtr = rocksdb_open_for_read_only_column_families( let rocksDbPtr = rocksdb_open_for_read_only_column_families(
dbOpts.cPtr, useDbOpts.cPtr,
path.cstring, path.cstring,
cfNames.len().cint, cfNames.len().cint,
cast[cstringArray](cfNames[0].addr), cast[cstringArray](cfNames[0].addr),
@ -140,7 +202,10 @@ proc openRocksDbReadOnly*(
cast[cstringArray](errors.addr)) cast[cstringArray](errors.addr))
bailOnErrors(errors) bailOnErrors(errors)
let db = RocksDbReadOnlyRef( let
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
db = RocksDbReadOnlyRef(
lock: createLock(), lock: createLock(),
cPtr: rocksDbPtr, cPtr: rocksDbPtr,
path: path, path: path,

View File

@ -30,9 +30,11 @@ type
dbOpts: DbOptionsRef dbOpts: DbOptionsRef
proc openSstFileWriter*( proc openSstFileWriter*(
filePath: string, filePath: string;
dbOpts = defaultDbOptions()): RocksDBResult[SstFileWriterRef] = dbOpts = DbOptionsRef(nil);
): RocksDBResult[SstFileWriterRef] =
## Creates a new `SstFileWriterRef` and opens the file at the given `filePath`. ## Creates a new `SstFileWriterRef` and opens the file at the given `filePath`.
let dbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
doAssert not dbOpts.isClosed() doAssert not dbOpts.isClosed()
let envOptsPtr = rocksdb_envoptions_create() let envOptsPtr = rocksdb_envoptions_create()

View File

@ -47,12 +47,17 @@ type
proc openTransactionDb*( proc openTransactionDb*(
path: string, path: string,
dbOpts = defaultDbOptions(), dbOpts = DbOptionsRef(nil);
txDbOpts = defaultTransactionDbOptions(), txDbOpts = TransactionDbOptionsRef(nil);
columnFamilies: openArray[ColFamilyDescriptor] = []): RocksDBResult[TransactionDbRef] = columnFamilies: openArray[ColFamilyDescriptor] = [];
): RocksDBResult[TransactionDbRef] =
## Open a `TransactionDbRef` with the given options and column families. ## Open a `TransactionDbRef` with the given options and column families.
## If no column families are provided the default column family will be used. ## If no column families are provided the default column family will be used.
## If no options are provided the default options will be used. ## If no options are provided the default options will be used.
let
useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil: useDbOpts.close()
var cfs = columnFamilies.toSeq() var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()): if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
@ -65,7 +70,7 @@ proc openTransactionDb*(
errors: cstring errors: cstring
let txDbPtr = rocksdb_transactiondb_open_column_families( let txDbPtr = rocksdb_transactiondb_open_column_families(
dbOpts.cPtr, useDbOpts.cPtr,
txDbOpts.cPtr, txDbOpts.cPtr,
path.cstring, path.cstring,
cfNames.len().cint, cfNames.len().cint,
@ -75,7 +80,10 @@ proc openTransactionDb*(
cast[cstringArray](errors.addr)) cast[cstringArray](errors.addr))
bailOnErrors(errors) bailOnErrors(errors)
let db = TransactionDbRef( let
dbOpts = useDbOpts # don't close on exit
txDbOpts = (if txDbOpts.isNil: defaultTransactionDbOptions() else: txDbOpts)
db = TransactionDbRef(
lock: createLock(), lock: createLock(),
cPtr: txDbPtr, cPtr: txDbPtr,
path: path, path: path,
@ -89,15 +97,21 @@ proc isClosed*(db: TransactionDbRef): bool {.inline.} =
db.cPtr.isNil() db.cPtr.isNil()
proc beginTransaction*( proc beginTransaction*(
db: TransactionDbRef, db: TransactionDbRef;
readOpts = defaultReadOptions(), readOpts = ReadOptionsRef(nil);
writeOpts = defaultWriteOptions(), writeOpts = WriteOptionsRef(nil);
txOpts = defaultTransactionOptions(), txDbOpts = TransactionDbOptionsRef(nil);
columnFamily = DEFAULT_COLUMN_FAMILY_NAME): TransactionRef = txOpts = defaultTransactionOptions();
columnFamily = DEFAULT_COLUMN_FAMILY_NAME;
): TransactionRef =
## Begin a new transaction against the database. The transaction will default ## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified ## to using the specified column family. If no column family is specified
## then the default column family will be used. ## then the default column family will be used.
doAssert not db.isClosed() doAssert not db.isClosed()
let
txDbOpts = (if txDbOpts.isNil: defaultTransactionDbOptions() else: txDbOpts)
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
writeOpts = (if writeOpts.isNil: defaultWriteOptions() else: writeOpts)
let txPtr = rocksdb_transaction_begin( let txPtr = rocksdb_transaction_begin(
db.cPtr, db.cPtr,