Add support for optimistic transaction db, write batch with index, keyMayExist and empty keys. (#63)

* Add support for optimistic transaction db.

* Add keyMayExist to RocksDbRef.

* Add support for write batch with index.

* Allow empty keys to be used in API.
This commit is contained in:
web3-developer 2024-07-08 22:18:30 +08:00 committed by GitHub
parent 6b7de5730b
commit cf1267e845
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1106 additions and 86 deletions

View File

@ -9,9 +9,10 @@
import import
./rocksdb/[ ./rocksdb/[
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb, backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
writebatch, transactiondb, writebatch, writebatchwi,
] ]
export export
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb, writebatch backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
transactiondb, writebatch, writebatchwi

View File

@ -38,3 +38,9 @@ template bailOnErrors*(errors: cstring): auto =
let res = err($(errors)) let res = err($(errors))
rocksdb_free(errors) rocksdb_free(errors)
return res return res
template unsafeAddrOrNil*(s: openArray[byte]): auto =
if s.len > 0:
unsafeAddr s[0]
else:
nil

131
rocksdb/optimistictxdb.nim Normal file
View File

@ -0,0 +1,131 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## A `OptimisticTxDbRef` can be used to open a connection to the RocksDB database
## with support for transactional operations against multiple column families.
## To create a new transaction call `beginTransaction` which will return a
## `TransactionRef`. To commit or rollback the transaction call `commit` or
## `rollback` on the `TransactionRef` type after applying changes to the transaction.
{.push raises: [].}
import
std/[sequtils, locks],
./lib/librocksdb,
./options/[dbopts, readopts, writeopts],
./transactions/[transaction, otxopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksresult
export dbopts, cfdescriptor, readopts, writeopts, otxopts, transaction, rocksresult
type
OptimisticTxDbPtr* = ptr rocksdb_optimistictransactiondb_t
OptimisticTxDbRef* = ref object
lock: Lock
cPtr: OptimisticTxDbPtr
path: string
dbOpts: DbOptionsRef
cfDescriptors: seq[ColFamilyDescriptor]
defaultCfHandle: ColFamilyHandleRef
cfTable: ColFamilyTableRef
proc openOptimisticTxDb*(
path: string,
dbOpts = defaultDbOptions(autoClose = true),
columnFamilies: openArray[ColFamilyDescriptor] = [],
): RocksDBResult[OptimisticTxDbRef] =
## Open a `OptimisticTxDbRef` with the given options and column families.
## If no column families are provided the default column family will be used.
## If no options are provided the default options will be used.
## These default options will be closed when the database is closed.
## If any options are provided, they will need to be closed manually.
var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
cfs.add(defaultColFamilyDescriptor(autoClose = true))
var
cfNames = cfs.mapIt(it.name().cstring)
cfOpts = cfs.mapIt(it.options.cPtr)
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring
let txDbPtr = rocksdb_optimistictransactiondb_open_column_families(
dbOpts.cPtr,
path.cstring,
cfNames.len().cint,
cast[cstringArray](cfNames[0].addr),
cfOpts[0].addr,
cfHandles[0].addr,
cast[cstringArray](errors.addr),
)
bailOnErrorsWithCleanup(errors):
autoCloseNonNil(dbOpts)
autoCloseAll(cfs)
let
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = OptimisticTxDbRef(
lock: createLock(),
cPtr: txDbPtr,
path: path,
dbOpts: dbOpts,
cfDescriptors: cfs,
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
cfTable: cfTable,
)
ok(db)
proc getColFamilyHandle*(
db: OptimisticTxDbRef, name: string
): RocksDBResult[ColFamilyHandleRef] =
let cfHandle = db.cfTable.get(name)
if cfHandle.isNil():
err("rocksdb: unknown column family")
else:
ok(cfHandle)
proc isClosed*(db: OptimisticTxDbRef): bool {.inline.} =
## Returns `true` if the `OptimisticTxDbRef` has been closed.
db.cPtr.isNil()
proc beginTransaction*(
db: OptimisticTxDbRef,
readOpts = defaultReadOptions(autoClose = true),
writeOpts = defaultWriteOptions(autoClose = true),
otxOpts = defaultOptimisticTxOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): TransactionRef =
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
## then the default column family will be used.
doAssert not db.isClosed()
let txPtr =
rocksdb_optimistictransaction_begin(db.cPtr, writeOpts.cPtr, otxOpts.cPtr, nil)
newTransaction(txPtr, readOpts, writeOpts, nil, otxOpts, cfHandle)
proc close*(db: OptimisticTxDbRef) =
## Close the `OptimisticTxDbRef`.
withLock(db.lock):
if not db.isClosed():
# the column families should be closed before the database
db.cfTable.close()
rocksdb_optimistictransactiondb_close(db.cPtr)
db.cPtr = nil
# opts should be closed after the database is closed
autoCloseNonNil(db.dbOpts)
autoCloseAll(db.cfDescriptors)

View File

@ -31,11 +31,11 @@ import
./options/[dbopts, readopts, writeopts], ./options/[dbopts, readopts, writeopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle], ./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils], ./internal/[cftable, utils],
./rocksiterator, ./[rocksiterator, rocksresult, writebatch, writebatchwi]
./rocksresult,
./writebatch
export rocksresult, dbopts, readopts, writeopts, cfdescriptor, rocksiterator, writebatch export
rocksresult, dbopts, readopts, writeopts, cfdescriptor, cfhandle, rocksiterator,
writebatch, writebatchwi
type type
RocksDbPtr* = ptr rocksdb_t RocksDbPtr* = ptr rocksdb_t
@ -236,9 +236,6 @@ proc get*(
## The `onData` callback reduces the number of copies and therefore should be ## The `onData` callback reduces the number of copies and therefore should be
## preferred if performance is required. ## preferred if performance is required.
if key.len() == 0:
return err("rocksdb: key is empty")
var var
len: csize_t len: csize_t
errors: cstring errors: cstring
@ -246,7 +243,7 @@ proc get*(
db.cPtr, db.cPtr,
db.readOpts.cPtr, db.readOpts.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
len.addr, len.addr,
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
@ -283,21 +280,14 @@ proc put*(
): RocksDBResult[void] = ): RocksDBResult[void] =
## Put the value for the given key into the specified column family. ## Put the value for the given key into the specified column family.
if key.len() == 0:
return err("rocksdb: key is empty")
var errors: cstring var errors: cstring
rocksdb_put_cf( rocksdb_put_cf(
db.cPtr, db.cPtr,
db.writeOpts.cPtr, db.writeOpts.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstring](if val.len > 0: cast[cstring](val.unsafeAddrOrNil()),
unsafeAddr val[0]
else:
nil
),
csize_t(val.len), csize_t(val.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )
@ -305,6 +295,30 @@ proc put*(
ok() ok()
proc keyMayExist*(
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
## If the key definitely does not exist in the database, then this method
## returns false, otherwise it returns true if the key might exist. That is
## to say that this method is probabilistic and may return false positives,
## but never a false negative. This check is potentially lighter-weight than
## invoking keyExists.
let keyMayExist = rocksdb_key_may_exist_cf(
db.cPtr,
db.readOpts.cPtr,
cfHandle.cPtr,
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
nil,
nil,
nil,
0,
nil,
).bool
ok(keyMayExist)
proc keyExists*( proc keyExists*(
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] = ): RocksDBResult[bool] =
@ -312,9 +326,6 @@ proc keyExists*(
## Returns a result containing `true` if the key exists or a result ## Returns a result containing `true` if the key exists or a result
## containing `false` otherwise. ## containing `false` otherwise.
# TODO: Call rocksdb_key_may_exist_cf to improve performance for the case
# when the key does not exist
db.get( db.get(
key, key,
proc(data: openArray[byte]) = proc(data: openArray[byte]) =
@ -330,15 +341,12 @@ proc delete*(
## If the value does not exist, the delete will be a no-op. ## If the value does not exist, the delete will be a no-op.
## To check if the value exists before or after a delete, use `keyExists`. ## To check if the value exists before or after a delete, use `keyExists`.
if key.len() == 0:
return err("rocksdb: key is empty")
var errors: cstring var errors: cstring
rocksdb_delete_cf( rocksdb_delete_cf(
db.cPtr, db.cPtr,
db.writeOpts.cPtr, db.writeOpts.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )
@ -350,6 +358,7 @@ proc openIterator*(
db: RocksDbRef, cfHandle = db.defaultCfHandle db: RocksDbRef, cfHandle = db.defaultCfHandle
): RocksDBResult[RocksIteratorRef] = ): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family. ## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed() doAssert not db.isClosed()
let rocksIterPtr = let rocksIterPtr =
@ -361,10 +370,31 @@ proc openWriteBatch*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): WriteBatchRef = ): WriteBatchRef =
## Opens a `WriteBatchRef` which defaults to using the specified column family. ## Opens a `WriteBatchRef` which defaults to using the specified column family.
## The write batch should be closed using the `close` method after usage.
doAssert not db.isClosed() doAssert not db.isClosed()
createWriteBatch(cfHandle) createWriteBatch(cfHandle)
proc openWriteBatchWithIndex*(
db: RocksDbReadWriteRef,
reservedBytes = 0,
overwriteKey = false,
cfHandle = db.defaultCfHandle,
): WriteBatchWIRef =
## Opens a `WriteBatchWIRef` which defaults to using the specified column family.
## The write batch should be closed using the `close` method after usage.
## `WriteBatchWIRef` is similar to `WriteBatchRef` but with a binary searchable
## index built for all the keys inserted which allows reading the data which has
## been writen to the batch.
##
## Optionally set the number of bytes to be reserved for the batch by setting
## `reservedBytes`. Set `overwriteKey` to true to overwrite the key in the index
## when inserting a duplicate key, in this way an iterator will never show two
## entries with the same key.
doAssert not db.isClosed()
createWriteBatch(reservedBytes, overwriteKey, db.dbOpts, cfHandle)
proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void] = proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchRef` to the database. ## Apply the updates in the `WriteBatchRef` to the database.
doAssert not db.isClosed() doAssert not db.isClosed()
@ -377,6 +407,18 @@ proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void
ok() ok()
proc write*(db: RocksDbReadWriteRef, updates: WriteBatchWIRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchWIRef` to the database.
doAssert not db.isClosed()
var errors: cstring
rocksdb_write_writebatch_wi(
db.cPtr, db.writeOpts.cPtr, updates.cPtr, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)
ok()
proc ingestExternalFile*( proc ingestExternalFile*(
db: RocksDbReadWriteRef, filePath: string, cfHandle = db.defaultCfHandle db: RocksDbReadWriteRef, filePath: string, cfHandle = db.defaultCfHandle
): RocksDBResult[void] = ): RocksDBResult[void] =

View File

@ -46,7 +46,7 @@ proc seekToKey*(iter: RocksIteratorRef, key: openArray[byte]) =
## invalid. ## invalid.
## ##
doAssert not iter.isClosed() doAssert not iter.isClosed()
rocksdb_iter_seek(iter.cPtr, cast[cstring](unsafeAddr key[0]), csize_t(key.len)) rocksdb_iter_seek(iter.cPtr, cast[cstring](key.unsafeAddrOrNil()), csize_t(key.len))
proc seekToFirst*(iter: RocksIteratorRef) = proc seekToFirst*(iter: RocksIteratorRef) =
## Seeks to the first entry in the column family. ## Seeks to the first entry in the column family.

View File

@ -61,9 +61,9 @@ proc put*(
var errors: cstring var errors: cstring
rocksdb_sstfilewriter_put( rocksdb_sstfilewriter_put(
writer.cPtr, writer.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstring](unsafeAddr val[0]), cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len), csize_t(val.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )
@ -77,7 +77,7 @@ proc delete*(writer: SstFileWriterRef, key: openArray[byte]): RocksDBResult[void
var errors: cstring var errors: cstring
rocksdb_sstfilewriter_delete( rocksdb_sstfilewriter_delete(
writer.cPtr, writer.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )

View File

@ -114,13 +114,11 @@ proc beginTransaction*(
## Begin a new transaction against the database. The transaction will default ## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified ## to using the specified column family. If no column family is specified
## then the default column family will be used. ## then the default column family will be used.
##
##
doAssert not db.isClosed() doAssert not db.isClosed()
let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil) let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil)
newTransaction(txPtr, readOpts, writeOpts, txOpts, cfHandle) newTransaction(txPtr, readOpts, writeOpts, txOpts, nil, cfHandle)
proc close*(db: TransactionDbRef) = proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`. ## Close the `TransactionDbRef`.

View File

@ -0,0 +1,49 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import ../lib/librocksdb
type
OptimisticTxOptionsPtr* = ptr rocksdb_optimistictransaction_options_t
OptimisticTxOptionsRef* = ref object
cPtr: OptimisticTxOptionsPtr
autoClose*: bool # if true then close will be called when the transaction is closed
proc createOptimisticTxOptions*(autoClose = false): OptimisticTxOptionsRef =
OptimisticTxOptionsRef(
cPtr: rocksdb_optimistictransaction_options_create(), autoClose: autoClose
)
proc isClosed*(txOpts: OptimisticTxOptionsRef): bool {.inline.} =
txOpts.cPtr.isNil()
proc cPtr*(txOpts: OptimisticTxOptionsRef): OptimisticTxOptionsPtr =
doAssert not txOpts.isClosed()
txOpts.cPtr
template setOpt(nname, ntyp, ctyp: untyped) =
proc `nname=`*(txOpts: OptimisticTxOptionsRef, value: ntyp) =
doAssert not txOpts.isClosed()
`rocksdb_optimistictransaction_options_set nname`(txOpts.cPtr, value.ctyp)
setOpt setSnapshot, bool, uint8
proc defaultOptimisticTxOptions*(autoClose = false): OptimisticTxOptionsRef {.inline.} =
let txOpts = createOptimisticTxOptions(autoClose)
# TODO: set prefered defaults
txOpts
proc close*(txOpts: OptimisticTxOptionsRef) =
if not txOpts.isClosed():
rocksdb_optimistictransaction_options_destroy(txOpts.cPtr)
txOpts.cPtr = nil

View File

@ -24,7 +24,7 @@ import
../options/[readopts, writeopts], ../options/[readopts, writeopts],
../internal/[cftable, utils], ../internal/[cftable, utils],
../rocksresult, ../rocksresult,
./txopts ./[txopts, otxopts]
export rocksresult export rocksresult
@ -36,6 +36,7 @@ type
readOpts: ReadOptionsRef readOpts: ReadOptionsRef
writeOpts: WriteOptionsRef writeOpts: WriteOptionsRef
txOpts: TransactionOptionsRef txOpts: TransactionOptionsRef
otxOpts: OptimisticTxOptionsRef
defaultCfHandle: ColFamilyHandleRef defaultCfHandle: ColFamilyHandleRef
proc newTransaction*( proc newTransaction*(
@ -43,6 +44,7 @@ proc newTransaction*(
readOpts: ReadOptionsRef, readOpts: ReadOptionsRef,
writeOpts: WriteOptionsRef, writeOpts: WriteOptionsRef,
txOpts: TransactionOptionsRef, txOpts: TransactionOptionsRef,
otxOpts: OptimisticTxOptionsRef,
defaultCfHandle: ColFamilyHandleRef, defaultCfHandle: ColFamilyHandleRef,
): TransactionRef = ): TransactionRef =
TransactionRef( TransactionRef(
@ -50,6 +52,7 @@ proc newTransaction*(
readOpts: readOpts, readOpts: readOpts,
writeOpts: writeOpts, writeOpts: writeOpts,
txOpts: txOpts, txOpts: txOpts,
otxOpts: otxOpts,
defaultCfHandle: defaultCfHandle, defaultCfHandle: defaultCfHandle,
) )
@ -66,9 +69,6 @@ proc get*(
## Get the value for a given key from the transaction using the provided ## Get the value for a given key from the transaction using the provided
## `onData` callback. ## `onData` callback.
if key.len() == 0:
return err("rocksdb: key is empty")
var var
len: csize_t len: csize_t
errors: cstring errors: cstring
@ -76,7 +76,7 @@ proc get*(
tx.cPtr, tx.cPtr,
tx.readOpts.cPtr, tx.readOpts.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
len.addr, len.addr,
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
@ -111,20 +111,13 @@ proc put*(
): RocksDBResult[void] = ): RocksDBResult[void] =
## Put the value for the given key into the transaction. ## Put the value for the given key into the transaction.
if key.len() == 0:
return err("rocksdb: key is empty")
var errors: cstring var errors: cstring
rocksdb_transaction_put_cf( rocksdb_transaction_put_cf(
tx.cPtr, tx.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstring](if val.len > 0: cast[cstring](val.unsafeAddrOrNil()),
unsafeAddr val[0]
else:
nil
),
csize_t(val.len), csize_t(val.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )
@ -137,14 +130,11 @@ proc delete*(
): RocksDBResult[void] = ): RocksDBResult[void] =
## Delete the value for the given key from the transaction. ## Delete the value for the given key from the transaction.
if key.len() == 0:
return err("rocksdb: key is empty")
var errors: cstring var errors: cstring
rocksdb_transaction_delete_cf( rocksdb_transaction_delete_cf(
tx.cPtr, tx.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstringArray](errors.addr), cast[cstringArray](errors.addr),
) )
@ -182,3 +172,4 @@ proc close*(tx: TransactionRef) =
autoCloseNonNil(tx.readOpts) autoCloseNonNil(tx.readOpts)
autoCloseNonNil(tx.writeOpts) autoCloseNonNil(tx.writeOpts)
autoCloseNonNil(tx.txOpts) autoCloseNonNil(tx.txOpts)
autoCloseNonNil(tx.otxOpts)

View File

@ -8,10 +8,12 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
## A `WriteBatchRef` holds a collection of updates to apply atomically to the database. ## A `WriteBatchRef` holds a collection of updates to apply atomically to the database.
## It depends on resources from an instance of `RocksDbRef' and therefore should be used
## and closed before the `RocksDbRef` is closed.
{.push raises: [].} {.push raises: [].}
import ./lib/librocksdb, ./internal/cftable, ./rocksresult import ./lib/librocksdb, ./internal/[cftable, utils], ./rocksresult
export rocksresult export rocksresult
@ -49,19 +51,12 @@ proc put*(
): RocksDBResult[void] = ): RocksDBResult[void] =
## Add a put operation to the write batch. ## Add a put operation to the write batch.
if key.len() == 0:
return err("rocksdb: key is empty")
rocksdb_writebatch_put_cf( rocksdb_writebatch_put_cf(
batch.cPtr, batch.cPtr,
cfHandle.cPtr, cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]), cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len), csize_t(key.len),
cast[cstring](if val.len > 0: cast[cstring](val.unsafeAddrOrNil()),
unsafeAddr val[0]
else:
nil
),
csize_t(val.len), csize_t(val.len),
) )
@ -72,11 +67,8 @@ proc delete*(
): RocksDBResult[void] = ): RocksDBResult[void] =
## Add a delete operation to the write batch. ## Add a delete operation to the write batch.
if key.len() == 0:
return err("rocksdb: key is empty")
rocksdb_writebatch_delete_cf( rocksdb_writebatch_delete_cf(
batch.cPtr, cfHandle.cPtr, cast[cstring](unsafeAddr key[0]), csize_t(key.len) batch.cPtr, cfHandle.cPtr, cast[cstring](key.unsafeAddrOrNil()), csize_t(key.len)
) )
ok() ok()

140
rocksdb/writebatchwi.nim Normal file
View File

@ -0,0 +1,140 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## A `WriteBatchWIRef` holds a collection of updates to apply atomically to the database.
## It depends on resources from an instance of `RocksDbRef' and therefore should be used
## and closed before the `RocksDbRef` is closed.
##
## `WriteBatchWIRef` is similar to `WriteBatchRef` but with a binary searchable index
## built for all the keys inserted which allows reading the data which has been writen
## to the batch.
{.push raises: [].}
import ./lib/librocksdb, ./internal/[cftable, utils], ./options/dbopts, ./rocksresult
export rocksresult
type
WriteBatchWIPtr* = ptr rocksdb_writebatch_wi_t
WriteBatchWIRef* = ref object
cPtr: WriteBatchWIPtr
dbOpts: DbOptionsRef
defaultCfHandle: ColFamilyHandleRef
proc createWriteBatch*(
reservedBytes: int,
overwriteKey: bool,
dbOpts: DbOptionsRef,
defaultCfHandle: ColFamilyHandleRef,
): WriteBatchWIRef =
WriteBatchWIRef(
cPtr: rocksdb_writebatch_wi_create(reservedBytes.csize_t, overwriteKey.uint8),
dbOpts: dbOpts,
defaultCfHandle: defaultCfHandle,
)
proc isClosed*(batch: WriteBatchWIRef): bool {.inline.} =
## Returns `true` if the `WriteBatchWIRef` has been closed and `false` otherwise.
batch.cPtr.isNil()
proc cPtr*(batch: WriteBatchWIRef): WriteBatchWIPtr =
## Get the underlying database pointer.
doAssert not batch.isClosed()
batch.cPtr
proc clear*(batch: WriteBatchWIRef) =
## Clears the write batch.
doAssert not batch.isClosed()
rocksdb_writebatch_wi_clear(batch.cPtr)
proc count*(batch: WriteBatchWIRef): int =
## Get the number of updates in the write batch.
doAssert not batch.isClosed()
rocksdb_writebatch_wi_count(batch.cPtr).int
proc put*(
batch: WriteBatchWIRef, key, val: openArray[byte], cfHandle = batch.defaultCfHandle
): RocksDBResult[void] =
## Add a put operation to the write batch.
rocksdb_writebatch_wi_put_cf(
batch.cPtr,
cfHandle.cPtr,
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len),
)
ok()
proc delete*(
batch: WriteBatchWIRef, key: openArray[byte], cfHandle = batch.defaultCfHandle
): RocksDBResult[void] =
## Add a delete operation to the write batch.
rocksdb_writebatch_wi_delete_cf(
batch.cPtr, cfHandle.cPtr, cast[cstring](key.unsafeAddrOrNil()), csize_t(key.len)
)
ok()
proc get*(
batch: WriteBatchWIRef,
key: openArray[byte],
onData: DataProc,
cfHandle = batch.defaultCfHandle,
): RocksDBResult[bool] =
## Get the value for a given key from the batch using the provided
## `onData` callback.
var
len: csize_t
errors: cstring
let data = rocksdb_writebatch_wi_get_from_batch_cf(
batch.cPtr,
batch.dbOpts.cPtr,
cfHandle.cPtr,
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
len.addr,
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)
if data.isNil():
doAssert len == 0
ok(false)
else:
onData(toOpenArrayByte(data, 0, len.int - 1))
rocksdb_free(data)
ok(true)
proc get*(
batch: WriteBatchWIRef, key: openArray[byte], cfHandle = batch.defaultCfHandle
): RocksDBResult[seq[byte]] =
## Get the value for a given key from the batch.
var dataRes: RocksDBResult[seq[byte]]
proc onData(data: openArray[byte]) =
dataRes.ok(@data)
let res = batch.get(key, onData, cfHandle)
if res.isOk():
return dataRes
dataRes.err(res.error())
proc close*(batch: WriteBatchWIRef) =
## Close the `WriteBatchWIRef`.
if not batch.isClosed():
rocksdb_writebatch_wi_destroy(batch.cPtr)
batch.cPtr = nil

View File

@ -18,12 +18,15 @@ import
./options/test_readopts, ./options/test_readopts,
./options/test_tableopts, ./options/test_tableopts,
./options/test_writeopts, ./options/test_writeopts,
./transactions/test_otxopts,
./transactions/test_txdbopts, ./transactions/test_txdbopts,
./transactions/test_txopts, ./transactions/test_txopts,
./test_backup, ./test_backup,
./test_columnfamily, ./test_columnfamily,
./test_optimistictxdb,
./test_rocksdb, ./test_rocksdb,
./test_rocksiterator, ./test_rocksiterator,
./test_sstfilewriter, ./test_sstfilewriter,
./test_transactiondb, ./test_transactiondb,
./test_writebatch ./test_writebatch,
./test_writebatchwi

View File

@ -9,7 +9,7 @@
{.used.} {.used.}
import std/sequtils, ../rocksdb/backup, ../rocksdb/rocksdb, ../rocksdb/transactiondb import std/sequtils, ../rocksdb/[backup, rocksdb, transactiondb, optimistictxdb]
proc initReadWriteDb*( proc initReadWriteDb*(
path: string, columnFamilyNames: openArray[string] = @[] path: string, columnFamilyNames: openArray[string] = @[]
@ -57,3 +57,17 @@ proc initTransactionDb*(
echo res.error() echo res.error()
doAssert res.isOk() doAssert res.isOk()
res.value() res.value()
proc initOptimisticTxDb*(
path: string, columnFamilyNames: openArray[string] = @[]
): OptimisticTxDbRef =
let res = openOptimisticTxDb(
path,
columnFamilies = columnFamilyNames.mapIt(
initColFamilyDescriptor(it, defaultColFamilyOptions(autoClose = true))
),
)
if res.isErr():
echo res.error()
doAssert res.isOk()
res.value()

View File

@ -0,0 +1,291 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import std/os, tempfile, unittest2, ../rocksdb/optimistictxdb, ./test_helper
suite "OptimisticTxDbRef Tests":
const
CF_DEFAULT = "default"
CF_OTHER = "other"
let
key1 = @[byte(1)]
val1 = @[byte(1)]
key2 = @[byte(2)]
val2 = @[byte(2)]
key3 = @[byte(3)]
val3 = @[byte(3)]
setup:
let
dbPath = mkdtemp() / "data"
db = initOptimisticTxDb(dbPath, columnFamilyNames = @[CF_OTHER])
defaultCfHandle = db.getColFamilyHandle(CF_DEFAULT).get()
otherCfHandle = db.getColFamilyHandle(CF_OTHER).get()
teardown:
db.close()
removeDir($dbPath)
# test multiple transactions
test "Test rollback using default column family":
var tx = db.beginTransaction()
defer:
tx.close()
check not tx.isClosed()
check:
tx.put(key1, val1).isOk()
tx.put(key2, val2).isOk()
tx.put(key3, val3).isOk()
tx.delete(key2).isOk()
not tx.isClosed()
check:
tx.get(key1).get() == val1
tx.get(key2).error() == ""
tx.get(key3).get() == val3
let res = tx.rollback()
check:
res.isOk()
tx.get(key1).error() == ""
tx.get(key2).error() == ""
tx.get(key3).error() == ""
test "Test commit using default column family":
var tx = db.beginTransaction()
defer:
tx.close()
check not tx.isClosed()
check:
tx.put(key1, val1).isOk()
tx.put(key2, val2).isOk()
tx.put(key3, val3).isOk()
tx.delete(key2).isOk()
not tx.isClosed()
check:
tx.get(key1).get() == val1
tx.get(key2).error() == ""
tx.get(key3).get() == val3
let res = tx.commit()
check:
res.isOk()
tx.get(key1).get() == val1
tx.get(key2).error() == ""
tx.get(key3).get() == val3
test "Test setting column family in beginTransaction":
var tx = db.beginTransaction(cfHandle = otherCfHandle)
defer:
tx.close()
check not tx.isClosed()
check:
tx.put(key1, val1).isOk()
tx.put(key2, val2).isOk()
tx.put(key3, val3).isOk()
tx.delete(key2).isOk()
not tx.isClosed()
check:
tx.get(key1, defaultCfHandle).error() == ""
tx.get(key2, defaultCfHandle).error() == ""
tx.get(key3, defaultCfHandle).error() == ""
tx.get(key1, otherCfHandle).get() == val1
tx.get(key2, otherCfHandle).error() == ""
tx.get(key3, otherCfHandle).get() == val3
test "Test rollback and commit with multiple transactions":
var tx1 = db.beginTransaction(cfHandle = defaultCfHandle)
defer:
tx1.close()
check not tx1.isClosed()
var tx2 = db.beginTransaction(cfHandle = otherCfHandle)
defer:
tx2.close()
check not tx2.isClosed()
check:
tx1.put(key1, val1).isOk()
tx1.put(key2, val2).isOk()
tx1.put(key3, val3).isOk()
tx1.delete(key2).isOk()
not tx1.isClosed()
tx2.put(key1, val1).isOk()
tx2.put(key2, val2).isOk()
tx2.put(key3, val3).isOk()
tx2.delete(key2).isOk()
not tx2.isClosed()
check:
tx1.get(key1, defaultCfHandle).get() == val1
tx1.get(key2, defaultCfHandle).error() == ""
tx1.get(key3, defaultCfHandle).get() == val3
tx1.get(key1, otherCfHandle).error() == ""
tx1.get(key2, otherCfHandle).error() == ""
tx1.get(key3, otherCfHandle).error() == ""
tx2.get(key1, defaultCfHandle).error() == ""
tx2.get(key2, defaultCfHandle).error() == ""
tx2.get(key3, defaultCfHandle).error() == ""
tx2.get(key1, otherCfHandle).get() == val1
tx2.get(key2, otherCfHandle).error() == ""
tx2.get(key3, otherCfHandle).get() == val3
block:
let res = tx1.rollback()
check:
res.isOk()
tx1.get(key1, defaultCfHandle).error() == ""
tx1.get(key2, defaultCfHandle).error() == ""
tx1.get(key3, defaultCfHandle).error() == ""
tx1.get(key1, otherCfHandle).error() == ""
tx1.get(key2, otherCfHandle).error() == ""
tx1.get(key3, otherCfHandle).error() == ""
block:
let res = tx2.commit()
check:
res.isOk()
tx2.get(key1, defaultCfHandle).error() == ""
tx2.get(key2, defaultCfHandle).error() == ""
tx2.get(key3, defaultCfHandle).error() == ""
tx2.get(key1, otherCfHandle).get() == val1
tx2.get(key2, otherCfHandle).error() == ""
tx2.get(key3, otherCfHandle).get() == val3
test "Test close":
var tx = db.beginTransaction()
check not tx.isClosed()
tx.close()
check tx.isClosed()
tx.close()
check tx.isClosed()
check not db.isClosed()
db.close()
check db.isClosed()
db.close()
check db.isClosed()
test "Test close multiple tx":
var tx1 = db.beginTransaction()
var tx2 = db.beginTransaction()
check not db.isClosed()
check not tx1.isClosed()
tx1.close()
check tx1.isClosed()
tx1.close()
check tx1.isClosed()
check not db.isClosed()
check not tx2.isClosed()
tx2.close()
check tx2.isClosed()
tx2.close()
check tx2.isClosed()
test "Test auto close enabled":
let
dbPath = mkdtemp() / "autoclose-enabled"
dbOpts = defaultDbOptions(autoClose = true)
columnFamilies =
@[
initColFamilyDescriptor(CF_DEFAULT, defaultColFamilyOptions(autoClose = true))
]
db = openOptimisticTxDb(dbPath, dbOpts, columnFamilies).get()
check:
dbOpts.isClosed() == false
columnFamilies[0].isClosed() == false
db.isClosed() == false
db.close()
check:
dbOpts.isClosed() == true
columnFamilies[0].isClosed() == true
db.isClosed() == true
test "Test auto close enabled":
let
dbPath = mkdtemp() / "autoclose-disabled"
dbOpts = defaultDbOptions(autoClose = false)
columnFamilies =
@[
initColFamilyDescriptor(
CF_DEFAULT, defaultColFamilyOptions(autoClose = false)
)
]
db = openOptimisticTxDb(dbPath, dbOpts, columnFamilies).get()
check:
dbOpts.isClosed() == false
columnFamilies[0].isClosed() == false
db.isClosed() == false
db.close()
check:
dbOpts.isClosed() == false
columnFamilies[0].isClosed() == false
db.isClosed() == true
test "Test auto close tx enabled":
let
readOpts = defaultReadOptions(autoClose = true)
writeOpts = defaultWriteOptions(autoClose = true)
otxOpts = defaultOptimisticTxOptions(autoClose = true)
tx = db.beginTransaction(readOpts, writeOpts, otxOpts)
check:
readOpts.isClosed() == false
writeOpts.isClosed() == false
otxOpts.isClosed() == false
tx.isClosed() == false
tx.close()
check:
readOpts.isClosed() == true
writeOpts.isClosed() == true
otxOpts.isClosed() == true
tx.isClosed() == true
test "Test auto close tx disabled":
let
readOpts = defaultReadOptions(autoClose = false)
writeOpts = defaultWriteOptions(autoClose = false)
otxOpts = defaultOptimisticTxOptions(autoClose = false)
tx = db.beginTransaction(readOpts, writeOpts, otxOpts)
check:
readOpts.isClosed() == false
writeOpts.isClosed() == false
otxOpts.isClosed() == false
tx.isClosed() == false
tx.close()
check:
readOpts.isClosed() == false
writeOpts.isClosed() == false
otxOpts.isClosed() == false
tx.isClosed() == true

View File

@ -324,6 +324,38 @@ suite "RocksDbRef Tests":
v.len() == 0 v.len() == 0
db.get(key5).isErr() db.get(key5).isErr()
test "Test keyMayExist":
let
key1 = @[byte(1)] # exists with non empty value
val1 = @[byte(1)]
key2 = @[byte(2)] # exists with empty seq value
val2: seq[byte] = @[]
key3 = @[byte(3)] # exists with empty array value
val3: array[0, byte] = []
key4 = @[byte(4)] # deleted key
key5 = @[byte(5)] # key not created
check:
db.put(key1, val1).isOk()
db.put(key2, val2).isOk()
db.put(key3, val3).isOk()
db.delete(key4).isOk()
db.keyMayExist(key1).isOk()
db.keyMayExist(key2).isOk()
db.keyMayExist(key3).isOk()
db.keyMayExist(key4).get() == false
db.keyMayExist(key5).get() == false
test "Put, get and delete empty key":
let empty: seq[byte] = @[]
check:
db.put(empty, val).isOk()
db.get(empty).get() == val
db.delete(empty).isOk()
db.get(empty).isErr()
test "List column familes": test "List column familes":
let cfRes1 = listColumnFamilies(dbPath) let cfRes1 = listColumnFamilies(dbPath)
check: check:

View File

@ -165,6 +165,20 @@ suite "RocksIteratorRef Tests":
iter.key() == key2 iter.key() == key2
iter.value() == val2 iter.value() == val2
test "Seek to empty key":
let empty: seq[byte] = @[]
check db.put(empty, val1).isOk()
let iter = db.openIterator().get()
defer:
iter.close()
iter.seekToKey(empty)
check:
iter.isValid()
iter.key() == empty
iter.value() == val1
test "Empty column family": test "Empty column family":
let res = db.openIterator(emptyCfHandle) let res = db.openIterator(emptyCfHandle)
check res.isOk() check res.isOk()

View File

@ -76,6 +76,19 @@ suite "SstFileWriterRef Tests":
db.get(key2, otherCfHandle).get() == val2 db.get(key2, otherCfHandle).get() == val2
db.get(key3, otherCfHandle).get() == val3 db.get(key3, otherCfHandle).get() == val3
test "Put, get and delete empty key":
let writer = openSstFileWriter(sstFilePath).get()
defer:
writer.close()
let empty: seq[byte] = @[]
check:
writer.put(empty, val1).isOk()
writer.finish().isOk()
db.ingestExternalFile(sstFilePath).isOk()
db.keyExists(empty).get() == true
db.get(empty).get() == val1
test "Test close": test "Test close":
let res = openSstFileWriter(sstFilePath) let res = openSstFileWriter(sstFilePath)
check res.isOk() check res.isOk()

View File

@ -169,6 +169,18 @@ suite "TransactionDbRef Tests":
tx2.get(key2, otherCfHandle).error() == "" tx2.get(key2, otherCfHandle).error() == ""
tx2.get(key3, otherCfHandle).get() == val3 tx2.get(key3, otherCfHandle).get() == val3
test "Put, get and delete empty key":
let tx = db.beginTransaction()
defer:
tx.close()
let empty: seq[byte] = @[]
check:
tx.put(empty, val1).isOk()
tx.get(empty).get() == val1
tx.delete(empty).isOk()
tx.get(empty).isErr()
test "Test close": test "Test close":
var tx = db.beginTransaction() var tx = db.beginTransaction()
@ -227,7 +239,7 @@ suite "TransactionDbRef Tests":
columnFamilies[0].isClosed() == true columnFamilies[0].isClosed() == true
db.isClosed() == true db.isClosed() == true
test "Test auto close enabled": test "Test auto close disabled":
let let
dbPath = mkdtemp() / "autoclose-disabled" dbPath = mkdtemp() / "autoclose-disabled"
dbOpts = defaultDbOptions(autoClose = false) dbOpts = defaultDbOptions(autoClose = false)

View File

@ -36,7 +36,7 @@ suite "WriteBatchRef Tests":
removeDir($dbPath) removeDir($dbPath)
test "Test writing batch to the default column family": test "Test writing batch to the default column family":
var batch = db.openWriteBatch() let batch = db.openWriteBatch()
defer: defer:
batch.close() batch.close()
check not batch.isClosed() check not batch.isClosed()
@ -65,7 +65,7 @@ suite "WriteBatchRef Tests":
not batch.isClosed() not batch.isClosed()
test "Test writing batch to column family": test "Test writing batch to column family":
var batch = db.openWriteBatch() let batch = db.openWriteBatch()
defer: defer:
batch.close() batch.close()
check not batch.isClosed() check not batch.isClosed()
@ -93,7 +93,7 @@ suite "WriteBatchRef Tests":
not batch.isClosed() not batch.isClosed()
test "Test writing to multiple column families in single batch": test "Test writing to multiple column families in single batch":
var batch = db.openWriteBatch() let batch = db.openWriteBatch()
defer: defer:
batch.close() batch.close()
check not batch.isClosed() check not batch.isClosed()
@ -123,17 +123,16 @@ suite "WriteBatchRef Tests":
not batch.isClosed() not batch.isClosed()
test "Test writing to multiple column families in multiple batches": test "Test writing to multiple column families in multiple batches":
var batch1 = db.openWriteBatch() let
batch1 = db.openWriteBatch()
batch2 = db.openWriteBatch()
defer: defer:
batch1.close() batch1.close()
check not batch1.isClosed()
var batch2 = db.openWriteBatch()
defer:
batch2.close() batch2.close()
check not batch2.isClosed()
check: check:
not batch1.isClosed()
not batch2.isClosed()
batch1.put(key1, val1).isOk() batch1.put(key1, val1).isOk()
batch1.delete(key2, otherCfHandle).isOk() batch1.delete(key2, otherCfHandle).isOk()
batch1.put(key3, val3, otherCfHandle).isOk() batch1.put(key3, val3, otherCfHandle).isOk()
@ -155,8 +154,28 @@ suite "WriteBatchRef Tests":
db.keyExists(key2, otherCfHandle).get() == false db.keyExists(key2, otherCfHandle).get() == false
db.get(key3, otherCfHandle).get() == val3 db.get(key3, otherCfHandle).get() == val3
# Write batch is unchanged after write
batch1.count() == 3
batch2.count() == 3
not batch1.isClosed()
not batch2.isClosed()
test "Put, get and delete empty key":
let batch = db.openWriteBatch()
defer:
batch.close()
let empty: seq[byte] = @[]
check:
batch.put(empty, val1).isOk()
db.write(batch).isOk()
db.get(empty).get() == val1
batch.delete(empty).isOk()
db.write(batch).isOk()
db.get(empty).isErr()
test "Test write empty batch": test "Test write empty batch":
var batch = db.openWriteBatch() let batch = db.openWriteBatch()
defer: defer:
batch.close() batch.close()
check not batch.isClosed() check not batch.isClosed()
@ -166,9 +185,10 @@ suite "WriteBatchRef Tests":
check: check:
res1.isOk() res1.isOk()
batch.count() == 0 batch.count() == 0
not batch.isClosed()
test "Test close": test "Test close":
var batch = db.openWriteBatch() let batch = db.openWriteBatch()
check not batch.isClosed() check not batch.isClosed()
batch.close() batch.close()

235
tests/test_writebatchwi.nim Normal file
View File

@ -0,0 +1,235 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import std/os, tempfile, unittest2, ../rocksdb/[rocksdb, writebatchwi], ./test_helper
suite "WriteBatchWIRef Tests":
const
CF_DEFAULT = "default"
CF_OTHER = "other"
let
key1 = @[byte(1)]
val1 = @[byte(1)]
key2 = @[byte(2)]
val2 = @[byte(2)]
key3 = @[byte(3)]
val3 = @[byte(3)]
setup:
let
dbPath = mkdtemp() / "data"
db = initReadWriteDb(dbPath, columnFamilyNames = @[CF_DEFAULT, CF_OTHER])
defaultCfHandle = db.getColFamilyHandle(CF_DEFAULT).get()
otherCfHandle = db.getColFamilyHandle(CF_OTHER).get()
teardown:
db.close()
removeDir($dbPath)
test "Test writing batch to the default column family":
let batch = db.openWriteBatchWithIndex()
defer:
batch.close()
check not batch.isClosed()
check:
batch.put(key1, val1).isOk()
batch.put(key2, val2).isOk()
batch.put(key3, val3).isOk()
batch.count() == 3
batch.delete(key2).isOk()
batch.count() == 4
not batch.isClosed()
batch.get(key1).get() == val1
batch.get(key2).isErr()
batch.get(key3).get() == val3
let res = db.write(batch)
check:
res.isOk()
db.write(batch).isOk() # test that it's idempotent
db.get(key1).get() == val1
db.keyExists(key2).get() == false
db.get(key3).get() == val3
batch.get(key1).get() == val1
batch.get(key2).isErr()
batch.get(key3).get() == val3
batch.clear()
check:
batch.count() == 0
not batch.isClosed()
test "Test writing batch to column family":
let batch = db.openWriteBatchWithIndex()
defer:
batch.close()
check not batch.isClosed()
check:
batch.put(key1, val1, otherCfHandle).isOk()
batch.put(key2, val2, otherCfHandle).isOk()
batch.put(key3, val3, otherCfHandle).isOk()
batch.count() == 3
batch.delete(key2, otherCfHandle).isOk()
batch.count() == 4
not batch.isClosed()
batch.get(key1, otherCfHandle).get() == val1
batch.get(key2, otherCfHandle).isErr()
batch.get(key3, otherCfHandle).get() == val3
let res = db.write(batch)
check:
res.isOk()
db.get(key1, otherCfHandle).get() == val1
db.keyExists(key2, otherCfHandle).get() == false
db.get(key3, otherCfHandle).get() == val3
batch.get(key1, otherCfHandle).get() == val1
batch.get(key2, otherCfHandle).isErr()
batch.get(key3, otherCfHandle).get() == val3
batch.clear()
check:
batch.count() == 0
not batch.isClosed()
test "Test writing to multiple column families in single batch":
let batch = db.openWriteBatchWithIndex()
defer:
batch.close()
check not batch.isClosed()
check:
batch.put(key1, val1, defaultCfHandle).isOk()
batch.put(key1, val1, otherCfHandle).isOk()
batch.put(key2, val2, otherCfHandle).isOk()
batch.put(key3, val3, otherCfHandle).isOk()
batch.count() == 4
batch.delete(key2, otherCfHandle).isOk()
batch.count() == 5
not batch.isClosed()
let res = db.write(batch)
check:
res.isOk()
db.get(key1, defaultCfHandle).get() == val1
db.get(key1, otherCfHandle).get() == val1
db.keyExists(key2, otherCfHandle).get() == false
db.get(key3, otherCfHandle).get() == val3
batch.clear()
check:
batch.count() == 0
not batch.isClosed()
test "Test writing to multiple column families in multiple batches":
let
batch1 = db.openWriteBatchWithIndex()
batch2 = db.openWriteBatchWithIndex()
defer:
batch1.close()
batch2.close()
check:
not batch1.isClosed()
not batch2.isClosed()
batch1.put(key1, val1).isOk()
batch1.delete(key2, otherCfHandle).isOk()
batch1.put(key3, val3, otherCfHandle).isOk()
batch2.put(key1, val1, otherCfHandle).isOk()
batch2.delete(key1, otherCfHandle).isOk()
batch2.put(key3, val3).isOk()
batch1.count() == 3
batch2.count() == 3
let res1 = db.write(batch1)
let res2 = db.write(batch2)
check:
res1.isOk()
res2.isOk()
db.get(key1).get() == val1
db.keyExists(key2).get() == false
db.get(key3).get() == val3
db.keyExists(key1, otherCfHandle).get() == false
db.keyExists(key2, otherCfHandle).get() == false
db.get(key3, otherCfHandle).get() == val3
# Write batch is unchanged after write
batch1.count() == 3
batch2.count() == 3
not batch1.isClosed()
not batch2.isClosed()
test "Test write empty batch":
let batch = db.openWriteBatchWithIndex()
defer:
batch.close()
check not batch.isClosed()
check batch.count() == 0
let res1 = db.write(batch)
check:
res1.isOk()
batch.count() == 0
not batch.isClosed()
test "Test multiple writes to same key":
let
batch1 = db.openWriteBatchWithIndex(overwriteKey = false)
batch2 = db.openWriteBatchWithIndex(overwriteKey = true)
defer:
batch1.close()
batch2.close()
check:
not batch1.isClosed()
not batch2.isClosed()
check:
batch1.put(key1, val1).isOk()
batch1.delete(key1).isOk()
batch1.put(key1, val3).isOk()
batch1.count() == 3
batch1.get(key1).get() == val3
batch2.put(key1, val3).isOk()
batch2.put(key1, val2).isOk()
batch2.put(key1, val1).isOk()
batch2.count() == 3
batch2.get(key1).get() == val1
test "Put, get and delete empty key":
let batch = db.openWriteBatchWithIndex()
defer:
batch.close()
let empty: seq[byte] = @[]
check:
batch.put(empty, val1).isOk()
batch.get(empty).get() == val1
batch.delete(empty).isOk()
batch.get(empty).isErr()
test "Test close":
let batch = db.openWriteBatchWithIndex()
check not batch.isClosed()
batch.close()
check batch.isClosed()
batch.close()
check batch.isClosed()

View File

@ -0,0 +1,36 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import unittest2, ../../rocksdb/transactions/otxopts
suite "OptimisticTxOptionsRef Tests":
test "Test createOptimisticTxOptions":
let txOpts = createOptimisticTxOptions()
check not txOpts.cPtr.isNil()
txOpts.close()
test "Test defaultTransactionOptions":
let txOpts = defaultOptimisticTxOptions()
check not txOpts.cPtr.isNil()
txOpts.close()
test "Test close":
let txOpts = defaultOptimisticTxOptions()
check not txOpts.isClosed()
txOpts.close()
check txOpts.isClosed()
txOpts.close()
check txOpts.isClosed()

View File

@ -13,21 +13,21 @@ import unittest2, ../../rocksdb/transactions/txdbopts
suite "TransactionDbOptionsRef Tests": suite "TransactionDbOptionsRef Tests":
test "Test newTransactionDbOptions": test "Test newTransactionDbOptions":
var txDbOpts = createTransactionDbOptions() let txDbOpts = createTransactionDbOptions()
check not txDbOpts.cPtr.isNil() check not txDbOpts.cPtr.isNil()
txDbOpts.close() txDbOpts.close()
test "Test defaultTransactionDbOptions": test "Test defaultTransactionDbOptions":
var txDbOpts = defaultTransactionDbOptions() let txDbOpts = defaultTransactionDbOptions()
check not txDbOpts.cPtr.isNil() check not txDbOpts.cPtr.isNil()
txDbOpts.close() txDbOpts.close()
test "Test close": test "Test close":
var txDbOpts = defaultTransactionDbOptions() let txDbOpts = defaultTransactionDbOptions()
check not txDbOpts.isClosed() check not txDbOpts.isClosed()
txDbOpts.close() txDbOpts.close()

View File

@ -13,21 +13,21 @@ import unittest2, ../../rocksdb/transactions/txopts
suite "TransactionOptionsRef Tests": suite "TransactionOptionsRef Tests":
test "Test newTransactionOptions": test "Test newTransactionOptions":
var txOpts = createTransactionOptions() let txOpts = createTransactionOptions()
check not txOpts.cPtr.isNil() check not txOpts.cPtr.isNil()
txOpts.close() txOpts.close()
test "Test defaultTransactionOptions": test "Test defaultTransactionOptions":
var txOpts = defaultTransactionOptions() let txOpts = defaultTransactionOptions()
check not txOpts.cPtr.isNil() check not txOpts.cPtr.isNil()
txOpts.close() txOpts.close()
test "Test close": test "Test close":
var txOpts = defaultTransactionOptions() let txOpts = defaultTransactionOptions()
check not txOpts.isClosed() check not txOpts.isClosed()
txOpts.close() txOpts.close()