diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0e2e106..73df27f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: cache_nonce: [ 1 ] - nim_version: [ 1.6.14 ] + nim_version: [ 1.6.18 ] platform: - { icon: 🐧, diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45f..f664ec0 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -1,4 +1,5 @@ import pkg/chronos +import pkg/questionable import pkg/questionable/results import pkg/upraises @@ -12,6 +13,9 @@ push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] + Function*[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe, closure.} + Modify* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = raiseAssert("Not implemented!") @@ -42,3 +46,34 @@ method query*( proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = return (await self.has(key)) |? false + +method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = + ## Concurrently safe way of modifying the value associated with the `key`. + ## + ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## + ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` + ## and passed as the only arg to the `fn`, otherwise `none` is passed. + ## + ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, + ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). + ## + ## | curr | fn(curr) | action | + ## |---------|----------|------------------------------| + ## | none | none | no action | + ## | none | some(v) | insert v | + ## | some(u) | none | delete u | + ## | some(u) | some(v) | replace u with v (if u != v) | + ## + ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case + ## only the last auxillary value is returned. + ## + + raiseAssert("Not implemented!") \ No newline at end of file diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim new file mode 100644 index 0000000..1562263 --- /dev/null +++ b/datastore/defaultimpl.nim @@ -0,0 +1,63 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./datastore + +proc defaultModifyGetImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: ModifyGet + ): Future[?!seq[byte]] {.async.} = + # Default implementation, serializes all modify operations using provided lock + # + + await lock.acquire() + + try: + without data =? await self.get(key), err: + if not (err of DatastoreKeyNotFound): + return failure(err) + + let maybeCurrentData = + if data.len == 0: + seq[byte].none + else: + data.some + + var + maybeNewData: ?seq[byte] + aux: seq[byte] + + try: + (maybeNewData, aux) = await fn(maybeCurrentData) + except CatchableError as err: + return failure(err) + + if newData =? maybeNewData: + if err =? (await self.put(key, newData)).errorOption: + return failure(err) + elif currentData =? maybeCurrentData: + if err =? (await self.delete(key)).errorOption: + return failure(err) + + return aux.success + finally: + lock.release() + +proc defaultModifyImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: Modify + ): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption: + return failure(err) + else: + return success() diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 0be95c9..2a58a99 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -1,5 +1,6 @@ import std/os import std/options +import std/tables import std/strutils import pkg/chronos @@ -8,6 +9,7 @@ import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises +import ./defaultimpl import ./datastore export datastore @@ -19,6 +21,7 @@ type root*: string ignoreProtected: bool depth: int + locks: TableRef[Key, AsyncLock] proc validDepth*(self: FSDatastore, key: Key): bool = key.len <= self.depth @@ -217,6 +220,30 @@ method query*( iter.next = next return success iter +method modifyGet*( + self: FSDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + var lock: AsyncLock + try: + lock = self.locks.mgetOrPut(key, newAsyncLock()) + return await defaultModifyGetImpl(self, lock, key, fn) + finally: + if not lock.locked: + self.locks.del(key) + +method modify*( + self: FSDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + var lock: AsyncLock + try: + lock = self.locks.mgetOrPut(key, newAsyncLock()) + return await defaultModifyImpl(self, lock, key, fn) + finally: + if not lock.locked: + self.locks.del(key) + proc new*( T: type FSDatastore, root: string, @@ -235,4 +262,6 @@ proc new*( success T( root: root, ignoreProtected: ignoreProtected, - depth: depth) + depth: depth, + locks: newTable[Key, AsyncLock]() + ) diff --git a/datastore/mountedds.nim b/datastore/mountedds.nim index dbe74df..9102e49 100644 --- a/datastore/mountedds.nim +++ b/datastore/mountedds.nim @@ -119,6 +119,26 @@ method put*( return success() +method modifyGet*( + self: MountedDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modifyGet(mounted.relative, fn) + +method modify*( + self: MountedDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modify(mounted.relative, fn) + method close*(self: MountedDatastore): Future[?!void] {.async.} = for s in self.stores.values: discard await s.store.close() diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index aa63274..ff440e4 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -29,6 +29,106 @@ proc `readOnly=`*(self: SQLiteDatastore): bool proc timestamp*(t = epochTime()): int64 = (t * 1_000_000).int64 +const initVersion* = 0.int64 + +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = + var + retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop + aux: seq[byte] + + while retriesLeft > 0: + var + currentData: seq[byte] + currentVersion: int64 + + proc onData(s: RawStmtPtr) = + currentData = dataCol(s, GetVersionedStmtDataCol)() + currentVersion = versionCol(s, GetVersionedStmtVersionCol)() + + if err =? self.db.getVersionedStmt.query((key.id), onData).errorOption: + return failure(err) + + let maybeCurrentData = if currentData.len > 0: some(currentData) else: seq[byte].none + var maybeNewData: ?seq[byte] + + try: + (maybeNewData, aux) = await fn(maybeCurrentData) + except CatchableError as err: + return failure(err) + + if maybeCurrentData == maybeNewData: + # no need to change currently stored value + break + + if err =? self.db.beginStmt.exec().errorOption: + return failure(err) + if currentData =? maybeCurrentData and newData =? maybeNewData: + let updateParams = ( + newData, + currentVersion + 1, + timestamp(), + key.id, + currentVersion + ) + if err =? (self.db.updateVersionedStmt.exec(updateParams)).errorOption: + return failure(err) + elif currentData =? maybeCurrentData: + let deleteParams = ( + key.id, + currentVersion + ) + if err =? (self.db.deleteVersionedStmt.exec(deleteParams)).errorOption: + return failure(err) + elif newData =? maybeNewData: + let insertParams = ( + key.id, + newData, + initVersion, + timestamp() + ) + if err =? (self.db.insertVersionedStmt.exec(insertParams)).errorOption: + return failure(err) + + var changes = 0.int64 + proc onChangesResult(s: RawStmtPtr) = + changes = changesCol(s, 0)() + + if err =? self.db.getChangesStmt.query((), onChangesResult).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + return failure(err) + + if changes == 1: + if err =? self.db.endStmt.exec().errorOption: + return failure(err) + break + elif changes == 0: + # race condition detected + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + retriesLeft.dec + else: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + return failure("Unexpected number of changes, expected either 0 or 1, was " & $changes) + + if retriesLeft == 0: + return failure("Retry limit exceeded") + + return success(aux) + + +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.modifyGet(key, wrappedFn)).errorOption: + return failure(err) + else: + return success() + method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = var exists = false @@ -81,14 +181,14 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = return success bytes method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - return self.db.putStmt.exec((key.id, data, timestamp())) + return self.db.putStmt.exec((key.id, data, initVersion, timestamp())) method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = if err =? self.db.beginStmt.exec().errorOption: return failure err for entry in batch: - if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: + if err =? self.db.putStmt.exec((entry.key.id, entry.data, initVersion, timestamp())).errorOption: if err =? self.db.rollbackStmt.exec().errorOption: return failure err diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index 503dea4..f35e087 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -1,4 +1,5 @@ import std/os +import std/strformat import pkg/questionable import pkg/questionable/results @@ -10,6 +11,7 @@ export sqliteutils type BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].} + BoundVersionCol* = proc (): int64 {.closure, gcsafe, upraises: [].} BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].} BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].} @@ -19,8 +21,13 @@ type ContainsStmt* = SQLiteStmt[(string), void] DeleteStmt* = SQLiteStmt[(string), void] GetStmt* = SQLiteStmt[(string), void] - PutStmt* = SQLiteStmt[(string, seq[byte], int64), void] + PutStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void] QueryStmt* = SQLiteStmt[(string), void] + GetVersionedStmt* = SQLiteStmt[(string), void] + InsertVersionedStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void] + UpdateVersionedStmt* = SQLiteStmt[(seq[byte], int64, int64, string, int64), void] + DeleteVersionedStmt* = SQLiteStmt[(string, int64), void] + GetChangesStmt* = NoParamsStmt BeginStmt* = NoParamsStmt EndStmt* = NoParamsStmt RollbackStmt* = NoParamsStmt @@ -34,6 +41,11 @@ type getDataCol*: BoundDataCol getStmt*: GetStmt putStmt*: PutStmt + getVersionedStmt*: GetVersionedStmt + updateVersionedStmt*: UpdateVersionedStmt + insertVersionedStmt*: InsertVersionedStmt + deleteVersionedStmt*: DeleteVersionedStmt + getChangesStmt*: GetChangesStmt beginStmt*: BeginStmt endStmt*: EndStmt rollbackStmt*: RollbackStmt @@ -44,10 +56,12 @@ const IdColName* = "id" DataColName* = "data" + VersionColName* = "version" TimestampColName* = "timestamp" IdColType = "TEXT" DataColType = "BLOB" + VersionColType = "INTEGER" TimestampColType = "INTEGER" Memory* = ":memory:" @@ -69,6 +83,7 @@ const CREATE TABLE IF NOT EXISTS """ & TableName & """ ( """ & IdColName & """ """ & IdColType & """ NOT NULL PRIMARY KEY, """ & DataColName & """ """ & DataColType & """, + """ & VersionColName & """ """ & VersionColType & """ NOT NULL, """ & TimestampColName & """ """ & TimestampColType & """ NOT NULL ) WITHOUT ROWID; """ @@ -89,8 +104,9 @@ const REPLACE INTO """ & TableName & """ ( """ & IdColName & """, """ & DataColName & """, + """ & VersionColName & """, """ & TimestampColName & """ - ) VALUES (?, ?, ?) + ) VALUES (?, ?, ?, ?) """ QueryStmtIdStr* = """ @@ -119,6 +135,43 @@ const ORDER BY """ & IdColName & """ DESC """ + GetVersionedStmtStr* = fmt""" + SELECT {DataColName}, {VersionColName} FROM {TableName} + WHERE {IdColName} = ? + """ + + GetVersionedStmtDataCol* = 0 + GetVersionedStmtVersionCol* = 1 + + InsertVersionedStmtStr* = fmt""" + INSERT INTO {TableName} + ( + {IdColName}, + {DataColName}, + {VersionColName}, + {TimestampColName} + ) + VALUES (?, ?, ?, ?) + """ + + UpdateVersionedStmtStr* = fmt""" + UPDATE {TableName} + SET + {DataColName} = ?, + {VersionColName} = ?, + {TimestampColName} = ? + WHERE {IdColName} = ? AND {VersionColName} = ? + """ + + DeleteVersionedStmtStr* = fmt""" + DELETE FROM {TableName} + WHERE {IdColName} = ? AND {VersionColName} = ? + """ + + GetChangesStmtStr* = fmt""" + SELECT changes() + """ + BeginTransactionStr* = """ BEGIN; """ @@ -197,6 +250,21 @@ proc timestampCol*( return proc (): int64 = sqlite3_column_int64(s, index.cint) +proc versionCol*( + s: RawStmtPtr, + index: int): BoundVersionCol = + + checkColMetadata(s, index, VersionColName) + + return proc (): int64 = + sqlite3_column_int64(s, index.cint) + +proc changesCol*( + s: RawStmtPtr, + index: int): BoundVersionCol = + return proc (): int64 = + sqlite3_column_int64(s, index.cint) + proc getDBFilePath*(path: string): ?!string = try: let @@ -217,6 +285,11 @@ proc close*(self: SQLiteDsDb) = self.beginStmt.dispose self.endStmt.dispose self.rollbackStmt.dispose + self.getVersionedStmt.dispose + self.updateVersionedStmt.dispose + self.insertVersionedStmt.dispose + self.deleteVersionedStmt.dispose + self.getChangesStmt.dispose if not RawStmtPtr(self.deleteStmt).isNil: self.deleteStmt.dispose @@ -266,6 +339,11 @@ proc open*( deleteStmt: DeleteStmt getStmt: GetStmt putStmt: PutStmt + getVersionedStmt: GetVersionedStmt + updateVersionedStmt: UpdateVersionedStmt + insertVersionedStmt: InsertVersionedStmt + deleteVersionedStmt: DeleteVersionedStmt + getChangesStmt: GetChangesStmt beginStmt: BeginStmt endStmt: EndStmt rollbackStmt: RollbackStmt @@ -279,6 +357,18 @@ proc open*( putStmt = ? PutStmt.prepare( env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT) + insertVersionedStmt = ? InsertVersionedStmt.prepare( + env.val, InsertVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + updateVersionedStmt = ? UpdateVersionedStmt.prepare( + env.val, UpdateVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + deleteVersionedStmt = ? DeleteVersionedStmt.prepare( + env.val, DeleteVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + getChangesStmt = ? GetChangesStmt.prepare( + env.val, GetChangesStmtStr, SQLITE_PREPARE_PERSISTENT) + beginStmt = ? BeginStmt.prepare( env.val, BeginTransactionStr, SQLITE_PREPARE_PERSISTENT) @@ -294,6 +384,9 @@ proc open*( getStmt = ? GetStmt.prepare( env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT) + getVersionedStmt = ? GetVersionedStmt.prepare( + env.val, GetVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + # if a readOnly/existing database does not satisfy the expected schema # `pepare()` will fail and `new` will return an error with message # "SQL logic error" @@ -310,6 +403,11 @@ proc open*( getStmt: getStmt, getDataCol: getDataCol, putStmt: putStmt, + getVersionedStmt: getVersionedStmt, + updateVersionedStmt: updateVersionedStmt, + insertVersionedStmt: insertVersionedStmt, + deleteVersionedStmt: deleteVersionedStmt, + getChangesStmt: getChangesStmt, beginStmt: beginStmt, endStmt: endStmt, rollbackStmt: rollbackStmt) diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 4eca23b..0bc0044 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -116,6 +116,37 @@ method put*( return success() +method modifyGet*( + self: TieredDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn))) + + var aux = newSeq[byte]() + + for fut in pending: + if fut.read().isErr: + return fut.read() + else: + aux.add(fut.read().get) + + return success(aux) + +method modify*( + self: TieredDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modify(key, fn))) + + for fut in pending: + if fut.read().isErr: return fut.read() + + return success() + # method query*( # self: TieredDatastore, # query: ...): Future[?!(?...)] {.async.} = diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim new file mode 100644 index 0000000..b7f4871 --- /dev/null +++ b/tests/datastore/modifycommontests.nim @@ -0,0 +1,130 @@ +import std/options +import std/sugar +import std/random +import std/sequtils +import std/strutils + +import pkg/asynctest +import pkg/chronos +import pkg/stew/endians2 +import pkg/questionable +import pkg/questionable/results + +import pkg/datastore + +proc modifyTests*( + ds: Datastore, + key: Key, + multiAux: bool = false) = + + randomize() + + let + processCount = 100 + timeout = (1 + processCount div 10).seconds + + proc withRandDelay(op: Future[?!void]): Future[void] {.async: (raises: [Exception]).} = + await sleepAsync(rand(processCount).millis) + + let errMsg = (await op).errorOption.map((err) => err.msg) + + require none(string) == errMsg + + proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = + await sleepAsync(2.millis) # allows interleaving + if bytes =? maybeBytes: + let value = uint64.fromBytes(bytes) + return some(@((value + 1).toBytes)) + else: + return seq[byte].none + + test "unsafe increment - demo": + # this test demonstrates non synchronized read-modify-write sequence + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + proc getIncAndPut(): Future[?!void] {.async.} = + without bytes =? (await ds.get(key)), err: + return failure(err) + + let value = uint64.fromBytes(bytes) + await sleepAsync(2.millis) # allows interleaving + + if err =? (await ds.put(key, @((value + 1).toBytes))).errorOption: + return failure(err) + else: + return success() + + let futs = newSeqWith(processCount, withRandDelay(getIncAndPut())) + await allFutures(futs).wait(timeout) + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int < processCount + + test "safe increment": + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + let futs = newSeqWith(processCount, withRandDelay(ds.modify(key, incAsyncFn))) + await allFutures(futs).wait(timeout) + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == processCount + + test "should update value": + (await ds.put(key, @((0.uint64).toBytes))).tryGet + + (await ds.modify(key, incAsyncFn)).tryGet + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == 1 + + test "should put value": + (await ds.delete(key)).tryGet() + + proc returningSomeValue(_: ?seq[byte]): Future[?seq[byte]] {.async.} = + return @(123.uint64.toBytes).some + + (await ds.modify(key, returningSomeValue)).tryGet + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == 123 + + test "should delete value": + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + proc returningNone(_: ?seq[byte]): Future[?seq[byte]] {.async.} = + return seq[byte].none + + (await ds.modify(key, returningNone)).tryGet + + let hasKey = (await ds.has(key)).tryGet + + check not hasKey + + test "should return correct auxillary value": + proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + return (seq[byte].none, @[byte 123]) + + let res = await ds.modifyGet(key, returningAux) + + if multiAux: + check: + res.errorOption.map((err) => err.msg) == none(string) + for b in res |? @[]: + check: + b == 123.byte + else: + check: + res == success(@[byte 123]) + + test "should propagate exception as failure": + proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} = + raise newException(CatchableError, "some error msg") + + let res = await ds.modify(key, throwing) + + check: + res.errorOption.map((err) => err.msg) == some("some error msg") diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c629eb0..778d7f1 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -11,6 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/sql/sqliteds import ../dscommontests +import ../modifycommontests import ../querycommontests suite "Test Basic SQLiteDatastore": @@ -24,6 +25,7 @@ suite "Test Basic SQLiteDatastore": (await ds.close()).tryGet() basicStoreTests(ds, key, bytes, otherBytes) + modifyTests(ds, key) suite "Test Read Only SQLiteDatastore": let diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index d104933..b6ff105 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -106,9 +106,9 @@ suite "Test SQLite Datastore DB operations": test "Should insert key": check: - readOnlyDb.putStmt.exec((key.id, data, timestamp())).isErr() + readOnlyDb.putStmt.exec((key.id, data, initVersion, timestamp())).isErr() - dsDb.putStmt.exec((key.id, data, timestamp())).tryGet() + dsDb.putStmt.exec((key.id, data, initVersion, timestamp())).tryGet() test "Should select key": let @@ -124,9 +124,9 @@ suite "Test SQLite Datastore DB operations": test "Should update key": check: - readOnlyDb.putStmt.exec((key.id, otherData, timestamp())).isErr() + readOnlyDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).isErr() - dsDb.putStmt.exec((key.id, otherData, timestamp())).tryGet() + dsDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).tryGet() test "Should select updated key": let diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index 2633677..af9adfc 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -11,6 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests import ./querycommontests suite "Test Basic FSDatastore": @@ -37,6 +38,7 @@ suite "Test Basic FSDatastore": require(not dirExists(basePathAbs)) basicStoreTests(fsStore, key, bytes, otherBytes) + modifyTests(fsStore, key) suite "Test Misc FSDatastore": let diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim index fbe0dfd..15f91f3 100644 --- a/tests/datastore/testmountedds.nim +++ b/tests/datastore/testmountedds.nim @@ -12,6 +12,7 @@ import pkg/datastore/sql import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests suite "Test Basic Mounted Datastore": let @@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore": suite "Mounted sql": let namespace = Key.init(sqlKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Mounted fs": let namespace = Key.init(fsKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Test Mounted Datastore": diff --git a/tests/datastore/testtieredds.nim b/tests/datastore/testtieredds.nim index 4ea76a7..272c852 100644 --- a/tests/datastore/testtieredds.nim +++ b/tests/datastore/testtieredds.nim @@ -10,6 +10,7 @@ import pkg/datastore/fsds import pkg/datastore/sql import pkg/datastore/tieredds +import ./modifycommontests import ./dscommontests suite "Test Basic Tired Datastore": @@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore": require(not dirExists(rootAbs)) basicStoreTests(tiredDs, key, bytes, otherBytes) + modifyTests(tiredDs, key, multiAux = true) suite "TieredDatastore": # assumes tests/test_all is run from project root, e.g. with `nimble test`