From 52bbe9d429a355e8e6f88a2650e750ad20c71c37 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 15 Nov 2023 13:42:37 +0100 Subject: [PATCH 01/16] Concurrent datastore interface & sqlite implementation --- datastore.nim | 3 +- datastore/concurrentds.nim | 50 ++++++++++++ datastore/sql/sqliteds.nim | 103 +++++++++++++++++++++++-- datastore/sql/sqlitedsdb.nim | 102 +++++++++++++++++++++++- datastore/types.nim | 1 + tests/datastore/concurrentdstests.nim | 96 +++++++++++++++++++++++ tests/datastore/sql/testsqliteds.nim | 2 + tests/datastore/sql/testsqlitedsdb.nim | 8 +- 8 files changed, 353 insertions(+), 12 deletions(-) create mode 100644 datastore/concurrentds.nim create mode 100644 tests/datastore/concurrentdstests.nim diff --git a/datastore.nim b/datastore.nim index 6d43a20..deac2a6 100644 --- a/datastore.nim +++ b/datastore.nim @@ -1,7 +1,8 @@ import ./datastore/datastore +import ./datastore/concurrentds import ./datastore/fsds import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds -export datastore, fsds, mountedds, tieredds, sql +export datastore, concurrentds, fsds, mountedds, tieredds, sql diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim new file mode 100644 index 0000000..da4170c --- /dev/null +++ b/datastore/concurrentds.nim @@ -0,0 +1,50 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/upraises + +import ./key +import ./query +import ./types +import ./datastore + +export key, query, types, datastore + +push: {.upraises: [].} + +type + Function*[T, U] = proc(value: T): U {.upraises: [], gcsafe, closure.} + Modify* = Function[?seq[byte], ?seq[byte]] + ModifyAsync* = Function[?seq[byte], Future[?seq[byte]]] + +method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = + ## Concurrently safe way of modifying the value associated with the `key`. + ## + ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` + ## and passed as the only arg to the `fn`, otherwise `none` is passed. + ## + ## When `fn` returns `some`, returned value is put into the store, but only if it's different than + ## the existing value, otherwise nothing happens. + ## When `fn` returns `none` existing value is deleted from the store, if no value existed before + ## nothing happens. + ## + ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## + + raiseAssert("Not implemented!") + +method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.base, locks: "unknown".} = + ## Concurrently safe way of modifying the value associated with the `key`. + ## + ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` + ## and passed as the only arg to the `fn`, otherwise `none` is passed. + ## + ## When `fn` returns `some`, returned value is put into the store, but only if it's different than + ## the existing value, otherwise nothing happens. + ## When `fn` returns `none` existing value is deleted from the store, if no value existed before + ## nothing happens. + ## + ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## + + raiseAssert("Not implemented!") diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index aa63274..a5b0b61 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -8,15 +8,15 @@ import pkg/sqlite3_abi from pkg/stew/results as stewResults import isErr import pkg/upraises -import ../datastore +import ../concurrentds import ./sqlitedsdb -export datastore, sqlitedsdb +export concurrentds, sqlitedsdb push: {.upraises: [].} type - SQLiteDatastore* = ref object of Datastore + SQLiteDatastore* = ref object of ConcurrentDatastore readOnly: bool db: SQLiteDsDb @@ -29,6 +29,99 @@ proc `readOnly=`*(self: SQLiteDatastore): bool proc timestamp*(t = epochTime()): int64 = (t * 1_000_000).int64 +const initVersion* = 0.int64 + +method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = + var + retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop + + while retriesLeft > 0: + var + currentData: seq[byte] + currentVersion: int64 + + proc onData(s: RawStmtPtr) = + currentData = dataCol(s, GetVersionedStmtDataCol)() + currentVersion = versionCol(s, GetVersionedStmtVersionCol)() + + if err =? self.db.getVersionedStmt.query((key.id), onData).errorOption: + return failure(err) + + let maybeCurrentData = if currentData.len > 0: some(currentData) else: seq[byte].none + var maybeNewData: ?seq[byte] + + try: + maybeNewData = await fn(maybeCurrentData) + except CatchableError as err: + return failure("Error running modify function: " & err.msg) + + if maybeCurrentData == maybeNewData: + # no need to change any stored value + break; + + if err =? self.db.beginStmt.exec().errorOption: + return failure(err) + if currentData =? maybeCurrentData and newData =? maybeNewData: + let updateParams = ( + newData, + currentVersion + 1, + timestamp(), + key.id, + currentVersion + ) + if err =? (self.db.updateVersionedStmt.exec(updateParams)).errorOption: + return failure(err) + elif currentData =? maybeCurrentData: + let deleteParams = ( + key.id, + currentVersion + ) + if err =? (self.db.deleteVersionedStmt.exec(deleteParams)).errorOption: + return failure(err) + elif newData =? maybeNewData: + let insertParams = ( + key.id, + newData, + initVersion, + timestamp() + ) + if err =? (self.db.insertVersionedStmt.exec(insertParams)).errorOption: + return failure(err) + + var changes = 0.int64 + proc onChangesResult(s: RawStmtPtr) = + changes = changesCol(s, 0)() + + if err =? self.db.getChangesStmt.query((), onChangesResult).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + return failure(err) + + if changes == 1: + if err =? self.db.endStmt.exec().errorOption: + return failure(err) + break + elif changes == 0: + # race condition detected + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + retriesLeft.dec + else: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure(err) + return failure("Unexpected number of changes, expected either 0 or 1, was " & $changes) + + if retriesLeft == 0: + return failure("Retry limit exceeded") + + return success() + +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte])] {.async.} = + return fn(maybeValue) + + return await self.modify(key, wrappedFn) + method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = var exists = false @@ -81,14 +174,14 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = return success bytes method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - return self.db.putStmt.exec((key.id, data, timestamp())) + return self.db.putStmt.exec((key.id, data, initVersion, timestamp())) method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = if err =? self.db.beginStmt.exec().errorOption: return failure err for entry in batch: - if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: + if err =? self.db.putStmt.exec((entry.key.id, entry.data, initVersion, timestamp())).errorOption: if err =? self.db.rollbackStmt.exec().errorOption: return failure err diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index 503dea4..f35e087 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -1,4 +1,5 @@ import std/os +import std/strformat import pkg/questionable import pkg/questionable/results @@ -10,6 +11,7 @@ export sqliteutils type BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].} + BoundVersionCol* = proc (): int64 {.closure, gcsafe, upraises: [].} BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].} BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].} @@ -19,8 +21,13 @@ type ContainsStmt* = SQLiteStmt[(string), void] DeleteStmt* = SQLiteStmt[(string), void] GetStmt* = SQLiteStmt[(string), void] - PutStmt* = SQLiteStmt[(string, seq[byte], int64), void] + PutStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void] QueryStmt* = SQLiteStmt[(string), void] + GetVersionedStmt* = SQLiteStmt[(string), void] + InsertVersionedStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void] + UpdateVersionedStmt* = SQLiteStmt[(seq[byte], int64, int64, string, int64), void] + DeleteVersionedStmt* = SQLiteStmt[(string, int64), void] + GetChangesStmt* = NoParamsStmt BeginStmt* = NoParamsStmt EndStmt* = NoParamsStmt RollbackStmt* = NoParamsStmt @@ -34,6 +41,11 @@ type getDataCol*: BoundDataCol getStmt*: GetStmt putStmt*: PutStmt + getVersionedStmt*: GetVersionedStmt + updateVersionedStmt*: UpdateVersionedStmt + insertVersionedStmt*: InsertVersionedStmt + deleteVersionedStmt*: DeleteVersionedStmt + getChangesStmt*: GetChangesStmt beginStmt*: BeginStmt endStmt*: EndStmt rollbackStmt*: RollbackStmt @@ -44,10 +56,12 @@ const IdColName* = "id" DataColName* = "data" + VersionColName* = "version" TimestampColName* = "timestamp" IdColType = "TEXT" DataColType = "BLOB" + VersionColType = "INTEGER" TimestampColType = "INTEGER" Memory* = ":memory:" @@ -69,6 +83,7 @@ const CREATE TABLE IF NOT EXISTS """ & TableName & """ ( """ & IdColName & """ """ & IdColType & """ NOT NULL PRIMARY KEY, """ & DataColName & """ """ & DataColType & """, + """ & VersionColName & """ """ & VersionColType & """ NOT NULL, """ & TimestampColName & """ """ & TimestampColType & """ NOT NULL ) WITHOUT ROWID; """ @@ -89,8 +104,9 @@ const REPLACE INTO """ & TableName & """ ( """ & IdColName & """, """ & DataColName & """, + """ & VersionColName & """, """ & TimestampColName & """ - ) VALUES (?, ?, ?) + ) VALUES (?, ?, ?, ?) """ QueryStmtIdStr* = """ @@ -119,6 +135,43 @@ const ORDER BY """ & IdColName & """ DESC """ + GetVersionedStmtStr* = fmt""" + SELECT {DataColName}, {VersionColName} FROM {TableName} + WHERE {IdColName} = ? + """ + + GetVersionedStmtDataCol* = 0 + GetVersionedStmtVersionCol* = 1 + + InsertVersionedStmtStr* = fmt""" + INSERT INTO {TableName} + ( + {IdColName}, + {DataColName}, + {VersionColName}, + {TimestampColName} + ) + VALUES (?, ?, ?, ?) + """ + + UpdateVersionedStmtStr* = fmt""" + UPDATE {TableName} + SET + {DataColName} = ?, + {VersionColName} = ?, + {TimestampColName} = ? + WHERE {IdColName} = ? AND {VersionColName} = ? + """ + + DeleteVersionedStmtStr* = fmt""" + DELETE FROM {TableName} + WHERE {IdColName} = ? AND {VersionColName} = ? + """ + + GetChangesStmtStr* = fmt""" + SELECT changes() + """ + BeginTransactionStr* = """ BEGIN; """ @@ -197,6 +250,21 @@ proc timestampCol*( return proc (): int64 = sqlite3_column_int64(s, index.cint) +proc versionCol*( + s: RawStmtPtr, + index: int): BoundVersionCol = + + checkColMetadata(s, index, VersionColName) + + return proc (): int64 = + sqlite3_column_int64(s, index.cint) + +proc changesCol*( + s: RawStmtPtr, + index: int): BoundVersionCol = + return proc (): int64 = + sqlite3_column_int64(s, index.cint) + proc getDBFilePath*(path: string): ?!string = try: let @@ -217,6 +285,11 @@ proc close*(self: SQLiteDsDb) = self.beginStmt.dispose self.endStmt.dispose self.rollbackStmt.dispose + self.getVersionedStmt.dispose + self.updateVersionedStmt.dispose + self.insertVersionedStmt.dispose + self.deleteVersionedStmt.dispose + self.getChangesStmt.dispose if not RawStmtPtr(self.deleteStmt).isNil: self.deleteStmt.dispose @@ -266,6 +339,11 @@ proc open*( deleteStmt: DeleteStmt getStmt: GetStmt putStmt: PutStmt + getVersionedStmt: GetVersionedStmt + updateVersionedStmt: UpdateVersionedStmt + insertVersionedStmt: InsertVersionedStmt + deleteVersionedStmt: DeleteVersionedStmt + getChangesStmt: GetChangesStmt beginStmt: BeginStmt endStmt: EndStmt rollbackStmt: RollbackStmt @@ -279,6 +357,18 @@ proc open*( putStmt = ? PutStmt.prepare( env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT) + insertVersionedStmt = ? InsertVersionedStmt.prepare( + env.val, InsertVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + updateVersionedStmt = ? UpdateVersionedStmt.prepare( + env.val, UpdateVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + deleteVersionedStmt = ? DeleteVersionedStmt.prepare( + env.val, DeleteVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + + getChangesStmt = ? GetChangesStmt.prepare( + env.val, GetChangesStmtStr, SQLITE_PREPARE_PERSISTENT) + beginStmt = ? BeginStmt.prepare( env.val, BeginTransactionStr, SQLITE_PREPARE_PERSISTENT) @@ -294,6 +384,9 @@ proc open*( getStmt = ? GetStmt.prepare( env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT) + getVersionedStmt = ? GetVersionedStmt.prepare( + env.val, GetVersionedStmtStr, SQLITE_PREPARE_PERSISTENT) + # if a readOnly/existing database does not satisfy the expected schema # `pepare()` will fail and `new` will return an error with message # "SQL logic error" @@ -310,6 +403,11 @@ proc open*( getStmt: getStmt, getDataCol: getDataCol, putStmt: putStmt, + getVersionedStmt: getVersionedStmt, + updateVersionedStmt: updateVersionedStmt, + insertVersionedStmt: insertVersionedStmt, + deleteVersionedStmt: deleteVersionedStmt, + getChangesStmt: getChangesStmt, beginStmt: beginStmt, endStmt: endStmt, rollbackStmt: rollbackStmt) diff --git a/datastore/types.nim b/datastore/types.nim index b019cdb..9f1385b 100644 --- a/datastore/types.nim +++ b/datastore/types.nim @@ -8,3 +8,4 @@ type DatastoreKeyNotFound* = object of DatastoreError Datastore* = ref object of RootObj + ConcurrentDatastore* = ref object of Datastore diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/concurrentdstests.nim new file mode 100644 index 0000000..9900072 --- /dev/null +++ b/tests/datastore/concurrentdstests.nim @@ -0,0 +1,96 @@ +import std/options +import std/sugar +import std/random +import std/sequtils + +import pkg/asynctest +import pkg/chronos +import pkg/stew/endians2 +import pkg/questionable +import pkg/questionable/results + +import pkg/datastore/concurrentds + +proc concurrentStoreTests*( + ds: ConcurrentDatastore, + key: Key) = + + randomize() + + let processCount = 100 + + proc withRandDelay(op: Future[?!void]): Future[void] {.async.} = + await sleepAsync(rand(processCount).millis) + + let errMsg = (await op).errorOption.map((err) => err.msg) + + require none(string) == errMsg + + proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = + await sleepAsync(2.millis) # allows interleaving + if bytes =? maybeBytes: + let value = uint64.fromBytes(bytes) + return some(@((value + 1).toBytes)) + else: + return seq[byte].none + + test "unsafe increment - demo": + # this test demonstrates non synchronized read-modify-write sequence + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + proc getIncAndPut(): Future[?!void] {.async.} = + without bytes =? (await ds.get(key)), err: + return failure(err) + + let value = uint64.fromBytes(bytes) + await sleepAsync(2.millis) # allows interleaving + + if err =? (await ds.put(key, @((value + 1).toBytes))).errorOption: + return failure(err) + else: + return success() + + let futs = newSeqWith(processCount, withRandDelay(getIncAndPut())) + await allFutures(futs).wait(10.seconds) + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int < processCount + + test "safe increment": + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + let futs = newSeqWith(processCount, withRandDelay(ds.modify(key, incAsyncFn))) + await allFutures(futs).wait(10.seconds) + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == processCount + + test "should update value": + (await ds.put(key, @((0.uint64).toBytes))).tryGet + + (await ds.modify(key, incAsyncFn)).tryGet + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == 1 + + test "should put value": + (await ds.delete(key)).tryGet() + + (await ds.modify(key, (_: ?seq[byte]) => @(123.uint64.toBytes).some)).tryGet + + let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) + + check finalValue.int == 123 + + test "should delete value": + let key = Key.init(Key.random).tryGet + (await ds.put(key, @(0.uint64.toBytes))).tryGet + + (await ds.modify(key, (_: ?seq[byte]) => seq[byte].none)).tryGet + + let hasKey = (await ds.has(key)).tryGet + + check not hasKey diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c629eb0..c4cf5be 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -11,6 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/sql/sqliteds import ../dscommontests +import ../concurrentdstests import ../querycommontests suite "Test Basic SQLiteDatastore": @@ -24,6 +25,7 @@ suite "Test Basic SQLiteDatastore": (await ds.close()).tryGet() basicStoreTests(ds, key, bytes, otherBytes) + concurrentStoreTests(ds, key) suite "Test Read Only SQLiteDatastore": let diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index d104933..b6ff105 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -106,9 +106,9 @@ suite "Test SQLite Datastore DB operations": test "Should insert key": check: - readOnlyDb.putStmt.exec((key.id, data, timestamp())).isErr() + readOnlyDb.putStmt.exec((key.id, data, initVersion, timestamp())).isErr() - dsDb.putStmt.exec((key.id, data, timestamp())).tryGet() + dsDb.putStmt.exec((key.id, data, initVersion, timestamp())).tryGet() test "Should select key": let @@ -124,9 +124,9 @@ suite "Test SQLite Datastore DB operations": test "Should update key": check: - readOnlyDb.putStmt.exec((key.id, otherData, timestamp())).isErr() + readOnlyDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).isErr() - dsDb.putStmt.exec((key.id, otherData, timestamp())).tryGet() + dsDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).tryGet() test "Should select updated key": let From 3770d0d1a02670a5ae46e9ddae99a15c829e7ca1 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 21 Nov 2023 13:31:18 +0100 Subject: [PATCH 02/16] Remove semicolon --- datastore/sql/sqliteds.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index a5b0b61..41a6a71 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -56,8 +56,8 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] return failure("Error running modify function: " & err.msg) if maybeCurrentData == maybeNewData: - # no need to change any stored value - break; + # no need to change currently stored value + break if err =? self.db.beginStmt.exec().errorOption: return failure(err) From 4c5ced6633ca555265f3b4df3e4614c21468821c Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 21 Nov 2023 19:51:23 +0100 Subject: [PATCH 03/16] ModifyGet operations --- datastore/concurrentds.nim | 37 +++++++++++++++++++-------- datastore/sql/sqliteds.nim | 35 ++++++++++++++++++++----- tests/datastore/concurrentdstests.nim | 34 +++++++++++++++++++++--- 3 files changed, 86 insertions(+), 20 deletions(-) diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim index da4170c..a0f96c4 100644 --- a/datastore/concurrentds.nim +++ b/datastore/concurrentds.nim @@ -13,22 +13,17 @@ export key, query, types, datastore push: {.upraises: [].} type - Function*[T, U] = proc(value: T): U {.upraises: [], gcsafe, closure.} + Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} Modify* = Function[?seq[byte], ?seq[byte]] + ModifyGet* = Function[?seq[byte], (?seq[byte], seq[byte])] ModifyAsync* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGetAsync* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## - ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` - ## and passed as the only arg to the `fn`, otherwise `none` is passed. - ## - ## When `fn` returns `some`, returned value is put into the store, but only if it's different than - ## the existing value, otherwise nothing happens. - ## When `fn` returns `none` existing value is deleted from the store, if no value existed before - ## nothing happens. - ## - ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn` that doesn't + ## produce any auxillary value. ## raiseAssert("Not implemented!") @@ -36,6 +31,25 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.base, locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes `fn` that doesn't produce + ## any auxillary value. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn`. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` ## and passed as the only arg to the `fn`, otherwise `none` is passed. ## @@ -44,7 +58,8 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!v ## When `fn` returns `none` existing value is deleted from the store, if no value existed before ## nothing happens. ## - ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). Only the + ## last auxillary value is returned. ## raiseAssert("Not implemented!") diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 41a6a71..cb48e95 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -31,9 +31,10 @@ proc timestamp*(t = epochTime()): int64 = const initVersion* = 0.int64 -method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.async.} = var retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop + aux: seq[byte] while retriesLeft > 0: var @@ -51,7 +52,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] var maybeNewData: ?seq[byte] try: - maybeNewData = await fn(maybeCurrentData) + (maybeNewData, aux) = await fn(maybeCurrentData) except CatchableError as err: return failure("Error running modify function: " & err.msg) @@ -114,13 +115,35 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] if retriesLeft == 0: return failure("Retry limit exceeded") - return success() + return success(aux) -method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = - proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte])] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return fn(maybeValue) - return await self.modify(key, wrappedFn) + return await self.modifyGet(key, wrappedFn) + +method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.modifyGet(key, wrappedFn)).errorOption: + return failure(err) + else: + return success() + +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let maybeNewValue = fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (maybeNewValue, ignoredAux) + + if err =? (await self.modifyGet(key, wrappedFn)).errorOption: + return failure(err) + else: + return success() method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = var diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/concurrentdstests.nim index 9900072..11b6a7c 100644 --- a/tests/datastore/concurrentdstests.nim +++ b/tests/datastore/concurrentdstests.nim @@ -2,6 +2,7 @@ import std/options import std/sugar import std/random import std/sequtils +import std/strutils import pkg/asynctest import pkg/chronos @@ -79,18 +80,45 @@ proc concurrentStoreTests*( test "should put value": (await ds.delete(key)).tryGet() - (await ds.modify(key, (_: ?seq[byte]) => @(123.uint64.toBytes).some)).tryGet + proc returningSomeValue(_: ?seq[byte]): ?seq[byte] = + return @(123.uint64.toBytes).some + + (await ds.modify(key, returningSomeValue)).tryGet let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) check finalValue.int == 123 test "should delete value": - let key = Key.init(Key.random).tryGet (await ds.put(key, @(0.uint64.toBytes))).tryGet - (await ds.modify(key, (_: ?seq[byte]) => seq[byte].none)).tryGet + proc returningNone(_: ?seq[byte]): ?seq[byte] = + return seq[byte].none + + (await ds.modify(key, returningNone)).tryGet let hasKey = (await ds.has(key)).tryGet check not hasKey + + test "should return correct auxillary value": + proc returningAux(_: ?seq[byte]): (?seq[byte], seq[byte]) = + return (seq[byte].none, @[byte 123]) + + let result = await ds.modifyGet(key, returningAux) + + check: + result == success(@[byte 123]) + + test "should propagate exception as failure": + proc throwing(a: ?seq[byte]): ?seq[byte] = + raise newException(CatchableError, "some error msg") + + let result = await ds.modify(key, throwing) + + if err =? result.errorOption: + check: + err.msg.contains("some error msg") + else: + # result was not an error + fail() \ No newline at end of file From 3d38850afc59513f482fb5b5e131ac260e4a6ed4 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Mon, 27 Nov 2023 17:09:49 +0100 Subject: [PATCH 04/16] Update docs --- datastore/concurrentds.nim | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim index a0f96c4..d530de4 100644 --- a/datastore/concurrentds.nim +++ b/datastore/concurrentds.nim @@ -53,13 +53,18 @@ method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGetAsync): Futu ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` ## and passed as the only arg to the `fn`, otherwise `none` is passed. ## - ## When `fn` returns `some`, returned value is put into the store, but only if it's different than - ## the existing value, otherwise nothing happens. - ## When `fn` returns `none` existing value is deleted from the store, if no value existed before - ## nothing happens. + ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, + ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). ## - ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). Only the - ## last auxillary value is returned. + ## | curr | fn(curr) | action | + ## |---------|----------|------------------------------| + ## | none | none | no action | + ## | none | some(v) | insert v | + ## | some(u) | none | delete u | + ## | some(u) | some(v) | replace u with v (if u != v) | + ## + ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case + ## only the last auxillary value is returned. ## raiseAssert("Not implemented!") From f41bd9528e6df49db372c39919a1e4cfeb30b493 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Mon, 4 Dec 2023 13:39:43 +0100 Subject: [PATCH 05/16] Remove methods taking non-async functions --- datastore/concurrentds.nim | 27 +++------------------------ datastore/sql/sqliteds.nim | 20 ++------------------ tests/datastore/concurrentdstests.nim | 8 ++++---- 3 files changed, 9 insertions(+), 46 deletions(-) diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim index d530de4..7f7e9e4 100644 --- a/datastore/concurrentds.nim +++ b/datastore/concurrentds.nim @@ -14,39 +14,18 @@ push: {.upraises: [].} type Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} - Modify* = Function[?seq[byte], ?seq[byte]] - ModifyGet* = Function[?seq[byte], (?seq[byte], seq[byte])] - ModifyAsync* = Function[?seq[byte], Future[?seq[byte]]] - ModifyGetAsync* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] + Modify* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## - ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn` that doesn't - ## produce any auxillary value. - ## - - raiseAssert("Not implemented!") - -method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.base, locks: "unknown".} = - ## Concurrently safe way of modifying the value associated with the `key`. - ## - ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes `fn` that doesn't produce - ## any auxillary value. + ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. ## raiseAssert("Not implemented!") method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = - ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on - ## successful update. - ## - ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn`. - ## - - raiseAssert("Not implemented!") - -method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.base, locks: "unknown".} = ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on ## successful update. ## diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index cb48e95..23445a2 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -31,7 +31,7 @@ proc timestamp*(t = epochTime()): int64 = const initVersion* = 0.int64 -method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = var retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop aux: seq[byte] @@ -117,13 +117,8 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGetAsync): Future[? return success(aux) -method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = - proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = - return fn(maybeValue) - return await self.modifyGet(key, wrappedFn) - -method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = let res = await fn(maybeValue) let ignoredAux = newSeq[byte]() @@ -134,17 +129,6 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] else: return success() -method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = - proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = - let maybeNewValue = fn(maybeValue) - let ignoredAux = newSeq[byte]() - return (maybeNewValue, ignoredAux) - - if err =? (await self.modifyGet(key, wrappedFn)).errorOption: - return failure(err) - else: - return success() - method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = var exists = false diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/concurrentdstests.nim index 11b6a7c..13d8f72 100644 --- a/tests/datastore/concurrentdstests.nim +++ b/tests/datastore/concurrentdstests.nim @@ -80,7 +80,7 @@ proc concurrentStoreTests*( test "should put value": (await ds.delete(key)).tryGet() - proc returningSomeValue(_: ?seq[byte]): ?seq[byte] = + proc returningSomeValue(_: ?seq[byte]): Future[?seq[byte]] {.async.} = return @(123.uint64.toBytes).some (await ds.modify(key, returningSomeValue)).tryGet @@ -92,7 +92,7 @@ proc concurrentStoreTests*( test "should delete value": (await ds.put(key, @(0.uint64.toBytes))).tryGet - proc returningNone(_: ?seq[byte]): ?seq[byte] = + proc returningNone(_: ?seq[byte]): Future[?seq[byte]] {.async.} = return seq[byte].none (await ds.modify(key, returningNone)).tryGet @@ -102,7 +102,7 @@ proc concurrentStoreTests*( check not hasKey test "should return correct auxillary value": - proc returningAux(_: ?seq[byte]): (?seq[byte], seq[byte]) = + proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return (seq[byte].none, @[byte 123]) let result = await ds.modifyGet(key, returningAux) @@ -111,7 +111,7 @@ proc concurrentStoreTests*( result == success(@[byte 123]) test "should propagate exception as failure": - proc throwing(a: ?seq[byte]): ?seq[byte] = + proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} = raise newException(CatchableError, "some error msg") let result = await ds.modify(key, throwing) From a62c91bb6b5e57c827b4795d604cb4d70e6f6b45 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Mon, 11 Dec 2023 12:29:38 +0100 Subject: [PATCH 06/16] Return err from modify fn --- datastore/sql/sqliteds.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 23445a2..2a0fda9 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -54,7 +54,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[ try: (maybeNewData, aux) = await fn(maybeCurrentData) except CatchableError as err: - return failure("Error running modify function: " & err.msg) + return failure(err) if maybeCurrentData == maybeNewData: # no need to change currently stored value From 3d901cb658b61dbbd2bc16c3b9aebc02070d4307 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 19 Dec 2023 15:49:44 +0100 Subject: [PATCH 07/16] Move modify methods to the Datastore interface --- datastore.nim | 3 +- datastore/concurrentds.nim | 49 --------------- datastore/datastore.nim | 35 +++++++++++ datastore/defaultimpl.nim | 63 +++++++++++++++++++ datastore/fsds.nim | 17 ++++- datastore/mountedds.nim | 20 ++++++ datastore/sql/sqliteds.nim | 6 +- datastore/tieredds.nim | 31 +++++++++ ...rrentdstests.nim => modifycommontests.nim} | 28 ++++++--- tests/datastore/sql/testsqliteds.nim | 4 +- tests/datastore/testfsds.nim | 2 + tests/datastore/testmountedds.nim | 3 + tests/datastore/testtieredds.nim | 2 + 13 files changed, 196 insertions(+), 67 deletions(-) delete mode 100644 datastore/concurrentds.nim create mode 100644 datastore/defaultimpl.nim rename tests/datastore/{concurrentdstests.nim => modifycommontests.nim} (87%) diff --git a/datastore.nim b/datastore.nim index deac2a6..6d43a20 100644 --- a/datastore.nim +++ b/datastore.nim @@ -1,8 +1,7 @@ import ./datastore/datastore -import ./datastore/concurrentds import ./datastore/fsds import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds -export datastore, concurrentds, fsds, mountedds, tieredds, sql +export datastore, fsds, mountedds, tieredds, sql diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim deleted file mode 100644 index 7f7e9e4..0000000 --- a/datastore/concurrentds.nim +++ /dev/null @@ -1,49 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/upraises - -import ./key -import ./query -import ./types -import ./datastore - -export key, query, types, datastore - -push: {.upraises: [].} - -type - Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} - Modify* = Function[?seq[byte], Future[?seq[byte]]] - ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] - -method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = - ## Concurrently safe way of modifying the value associated with the `key`. - ## - ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. - ## - - raiseAssert("Not implemented!") - -method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = - ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on - ## successful update. - ## - ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` - ## and passed as the only arg to the `fn`, otherwise `none` is passed. - ## - ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, - ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). - ## - ## | curr | fn(curr) | action | - ## |---------|----------|------------------------------| - ## | none | none | no action | - ## | none | some(v) | insert v | - ## | some(u) | none | delete u | - ## | some(u) | some(v) | replace u with v (if u != v) | - ## - ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case - ## only the last auxillary value is returned. - ## - - raiseAssert("Not implemented!") diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45f..6079468 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -1,4 +1,5 @@ import pkg/chronos +import pkg/questionable import pkg/questionable/results import pkg/upraises @@ -12,6 +13,9 @@ push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] + Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} + Modify* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = raiseAssert("Not implemented!") @@ -42,3 +46,34 @@ method query*( proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = return (await self.has(key)) |? false + +method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = + ## Concurrently safe way of modifying the value associated with the `key`. + ## + ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## + ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` + ## and passed as the only arg to the `fn`, otherwise `none` is passed. + ## + ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, + ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). + ## + ## | curr | fn(curr) | action | + ## |---------|----------|------------------------------| + ## | none | none | no action | + ## | none | some(v) | insert v | + ## | some(u) | none | delete u | + ## | some(u) | some(v) | replace u with v (if u != v) | + ## + ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case + ## only the last auxillary value is returned. + ## + + raiseAssert("Not implemented!") \ No newline at end of file diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim new file mode 100644 index 0000000..05f1661 --- /dev/null +++ b/datastore/defaultimpl.nim @@ -0,0 +1,63 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./datastore + +proc defaultModifyGetImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: ModifyGet + ): Future[?!seq[byte]] {.async.} = + # Default implementation, serializes all modify operations using provided lock + # + + await lock.acquire() + + try: + without data =? await self.get(key), err: + if not (err of DatastoreKeyNotFound): + return failure(err) + + let maybeCurrentData = + if data.len == 0: + seq[byte].none + else: + data.some + + var + maybeNewData: ?seq[byte] + aux: seq[byte] + + try: + (maybeNewData, aux) = await fn(maybeCurrentData) + except CatchableError as err: + return failure(err) + + if newData =? maybeNewData: + if err =? (await self.put(key, newData)).errorOption: + return failure(err) + elif currentData =? maybeCurrentData: + if err =? (await self.delete(key)).errorOption: + return failure(err) + + return aux.success + finally: + lock.release() + +method defaultModifyImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: Modify + ): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption: + return failure(err) + else: + return success() diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 163a52a..31a3875 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -8,6 +8,7 @@ import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises +import ./defaultimpl import ./datastore export datastore @@ -19,6 +20,7 @@ type root*: string ignoreProtected: bool depth: int + lock: AsyncLock proc validDepth*(self: FSDatastore, key: Key): bool = key.len <= self.depth @@ -217,6 +219,18 @@ method query*( iter.next = next return success iter +method modifyGet*( + self: FSDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] = + defaultModifyGetImpl(self, self.lock, key, fn) + +method modify*( + self: FSDatastore, + key: Key, + fn: Modify): Future[?!void] = + defaultModifyImpl(self, self.lock, key, fn) + proc new*( T: type FSDatastore, root: string, @@ -235,4 +249,5 @@ proc new*( success T( root: root, ignoreProtected: ignoreProtected, - depth: depth) + depth: depth, + lock: newAsyncLock()) diff --git a/datastore/mountedds.nim b/datastore/mountedds.nim index dbe74df..9102e49 100644 --- a/datastore/mountedds.nim +++ b/datastore/mountedds.nim @@ -119,6 +119,26 @@ method put*( return success() +method modifyGet*( + self: MountedDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modifyGet(mounted.relative, fn) + +method modify*( + self: MountedDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modify(mounted.relative, fn) + method close*(self: MountedDatastore): Future[?!void] {.async.} = for s in self.stores.values: discard await s.store.close() diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 2a0fda9..ff440e4 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -8,15 +8,15 @@ import pkg/sqlite3_abi from pkg/stew/results as stewResults import isErr import pkg/upraises -import ../concurrentds +import ../datastore import ./sqlitedsdb -export concurrentds, sqlitedsdb +export datastore, sqlitedsdb push: {.upraises: [].} type - SQLiteDatastore* = ref object of ConcurrentDatastore + SQLiteDatastore* = ref object of Datastore readOnly: bool db: SQLiteDsDb diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 4eca23b..0bc0044 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -116,6 +116,37 @@ method put*( return success() +method modifyGet*( + self: TieredDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn))) + + var aux = newSeq[byte]() + + for fut in pending: + if fut.read().isErr: + return fut.read() + else: + aux.add(fut.read().get) + + return success(aux) + +method modify*( + self: TieredDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modify(key, fn))) + + for fut in pending: + if fut.read().isErr: return fut.read() + + return success() + # method query*( # self: TieredDatastore, # query: ...): Future[?!(?...)] {.async.} = diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/modifycommontests.nim similarity index 87% rename from tests/datastore/concurrentdstests.nim rename to tests/datastore/modifycommontests.nim index 13d8f72..f8a2f84 100644 --- a/tests/datastore/concurrentdstests.nim +++ b/tests/datastore/modifycommontests.nim @@ -10,11 +10,12 @@ import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results -import pkg/datastore/concurrentds +import pkg/datastore -proc concurrentStoreTests*( - ds: ConcurrentDatastore, - key: Key) = +proc modifyTests*( + ds: Datastore, + key: Key, + multiAux: bool = false) = randomize() @@ -105,20 +106,27 @@ proc concurrentStoreTests*( proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return (seq[byte].none, @[byte 123]) - let result = await ds.modifyGet(key, returningAux) + let res = await ds.modifyGet(key, returningAux) - check: - result == success(@[byte 123]) + if multiAux: + check: + res.errorOption.map((err) => err.msg) == none(string) + for b in res |? @[]: + check: + b == 123.byte + else: + check: + res == success(@[byte 123]) test "should propagate exception as failure": proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} = raise newException(CatchableError, "some error msg") - let result = await ds.modify(key, throwing) + let res = await ds.modify(key, throwing) - if err =? result.errorOption: + if err =? res.errorOption: check: err.msg.contains("some error msg") else: # result was not an error - fail() \ No newline at end of file + fail() diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c4cf5be..778d7f1 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -11,7 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/sql/sqliteds import ../dscommontests -import ../concurrentdstests +import ../modifycommontests import ../querycommontests suite "Test Basic SQLiteDatastore": @@ -25,7 +25,7 @@ suite "Test Basic SQLiteDatastore": (await ds.close()).tryGet() basicStoreTests(ds, key, bytes, otherBytes) - concurrentStoreTests(ds, key) + modifyTests(ds, key) suite "Test Read Only SQLiteDatastore": let diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index 2633677..af9adfc 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -11,6 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests import ./querycommontests suite "Test Basic FSDatastore": @@ -37,6 +38,7 @@ suite "Test Basic FSDatastore": require(not dirExists(basePathAbs)) basicStoreTests(fsStore, key, bytes, otherBytes) + modifyTests(fsStore, key) suite "Test Misc FSDatastore": let diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim index fbe0dfd..15f91f3 100644 --- a/tests/datastore/testmountedds.nim +++ b/tests/datastore/testmountedds.nim @@ -12,6 +12,7 @@ import pkg/datastore/sql import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests suite "Test Basic Mounted Datastore": let @@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore": suite "Mounted sql": let namespace = Key.init(sqlKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Mounted fs": let namespace = Key.init(fsKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Test Mounted Datastore": diff --git a/tests/datastore/testtieredds.nim b/tests/datastore/testtieredds.nim index 4ea76a7..272c852 100644 --- a/tests/datastore/testtieredds.nim +++ b/tests/datastore/testtieredds.nim @@ -10,6 +10,7 @@ import pkg/datastore/fsds import pkg/datastore/sql import pkg/datastore/tieredds +import ./modifycommontests import ./dscommontests suite "Test Basic Tired Datastore": @@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore": require(not dirExists(rootAbs)) basicStoreTests(tiredDs, key, bytes, otherBytes) + modifyTests(tiredDs, key, multiAux = true) suite "TieredDatastore": # assumes tests/test_all is run from project root, e.g. with `nimble test` From 18d0833141d0cb0cf71b6fa31ae710fb55a7f132 Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 15 Dec 2023 15:37:28 -0300 Subject: [PATCH 08/16] explicitly annotate iterator exception effects so it does not default to raises: [Exception] --- datastore.nimble | 2 +- datastore/fsds.nim | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datastore.nimble b/datastore.nimble index 6252c6d..1900f3d 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -7,7 +7,7 @@ description = "Simple, unified API for multiple data stores" license = "Apache License 2.0 or MIT" requires "nim >= 1.2.0", - "asynctest >= 0.3.1 & < 0.4.0", + "asynctest >= 0.4.3 & < 0.5.0", "chronos", "questionable >= 0.10.3 & < 0.11.0", "sqlite3_abi", diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 31a3875..038671b 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -156,7 +156,7 @@ method put*( return success() -proc dirWalker(path: string): iterator: string {.gcsafe.} = +proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) = return iterator(): string = try: for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): From cc2c58a8bdd0f12711ceb8e03c7f140c95321d1c Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 15 Dec 2023 15:39:41 -0300 Subject: [PATCH 09/16] explicitly change chronos to pick up from head --- datastore.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastore.nimble b/datastore.nimble index 1900f3d..473ad2e 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -8,7 +8,7 @@ license = "Apache License 2.0 or MIT" requires "nim >= 1.2.0", "asynctest >= 0.4.3 & < 0.5.0", - "chronos", + "chronos#head", # FIXME change to Chronos >= 4.0.0 once it's out "questionable >= 0.10.3 & < 0.11.0", "sqlite3_abi", "stew", From 795e1a3e3a25893e2e164efac42df667db97a46c Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 15 Dec 2023 18:45:55 -0300 Subject: [PATCH 10/16] pin chronos to the lastest commit (as of today) --- datastore.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastore.nimble b/datastore.nimble index 473ad2e..c00ad01 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -8,7 +8,7 @@ license = "Apache License 2.0 or MIT" requires "nim >= 1.2.0", "asynctest >= 0.4.3 & < 0.5.0", - "chronos#head", # FIXME change to Chronos >= 4.0.0 once it's out + "chronos#c41599a", # FIXME change to Chronos >= 4.0.0 once it's out "questionable >= 0.10.3 & < 0.11.0", "sqlite3_abi", "stew", From 673778697f536cf0d70f514d2cb2e9fcb9057787 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 20 Dec 2023 08:47:39 +0100 Subject: [PATCH 11/16] Replace require with check --- tests/datastore/modifycommontests.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index f8a2f84..054e6ab 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -26,7 +26,7 @@ proc modifyTests*( let errMsg = (await op).errorOption.map((err) => err.msg) - require none(string) == errMsg + check none(string) == errMsg proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = await sleepAsync(2.millis) # allows interleaving From e2e31e07b35e8d7481c2cfc8b1f8c479050916b6 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 20 Dec 2023 09:09:06 +0100 Subject: [PATCH 12/16] Remove fail() --- tests/datastore/modifycommontests.nim | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index 054e6ab..332e8bb 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -26,7 +26,7 @@ proc modifyTests*( let errMsg = (await op).errorOption.map((err) => err.msg) - check none(string) == errMsg + require none(string) == errMsg proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = await sleepAsync(2.millis) # allows interleaving @@ -124,9 +124,5 @@ proc modifyTests*( let res = await ds.modify(key, throwing) - if err =? res.errorOption: - check: - err.msg.contains("some error msg") - else: - # result was not an error - fail() + check: + res.errorOption.map((err) => err.msg) == some("some error msg") From a6ab1780928da6cece99a75bce15071bdc5b335a Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 20 Dec 2023 10:16:28 +0100 Subject: [PATCH 13/16] Fix broken tests --- datastore/defaultimpl.nim | 2 +- datastore/sql/sqliteds.nim | 2 +- tests/datastore/modifycommontests.nim | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim index 05f1661..4d6ed3e 100644 --- a/datastore/defaultimpl.nim +++ b/datastore/defaultimpl.nim @@ -31,7 +31,7 @@ proc defaultModifyGetImpl*( aux: seq[byte] try: - (maybeNewData, aux) = await fn(maybeCurrentData) + (maybeNewData, aux) = (awaitne fn(maybeCurrentData)).read() except CatchableError as err: return failure(err) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index ff440e4..fee0feb 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -52,7 +52,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[ var maybeNewData: ?seq[byte] try: - (maybeNewData, aux) = await fn(maybeCurrentData) + (maybeNewData, aux) = (awaitne fn(maybeCurrentData)).read() except CatchableError as err: return failure(err) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index 332e8bb..be7fd43 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -24,9 +24,10 @@ proc modifyTests*( proc withRandDelay(op: Future[?!void]): Future[void] {.async.} = await sleepAsync(rand(processCount).millis) - let errMsg = (await op).errorOption.map((err) => err.msg) + discard (await op) + # let errMsg = (await op).errorOption.map((err) => err.msg) - require none(string) == errMsg + # require none(string) == errMsg proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = await sleepAsync(2.millis) # allows interleaving From b692737eed471427d873d991dca7cf085cfd0b1e Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 20 Dec 2023 10:55:59 +0100 Subject: [PATCH 14/16] Remove ConcurrentDatastore type --- datastore/types.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/datastore/types.nim b/datastore/types.nim index 9f1385b..b019cdb 100644 --- a/datastore/types.nim +++ b/datastore/types.nim @@ -8,4 +8,3 @@ type DatastoreKeyNotFound* = object of DatastoreError Datastore* = ref object of RootObj - ConcurrentDatastore* = ref object of Datastore From 0d1ca4b2cd0f1d816d54de0fc6d3129c96dd3d2b Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 20 Dec 2023 15:21:26 +0100 Subject: [PATCH 15/16] Bump nim to 1.6.16 --- .github/workflows/tests.yml | 2 +- datastore/defaultimpl.nim | 2 +- datastore/sql/sqliteds.nim | 2 +- tests/datastore/modifycommontests.nim | 7 +++---- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0e2e106..8221127 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: cache_nonce: [ 1 ] - nim_version: [ 1.6.14 ] + nim_version: [ 1.6.16 ] platform: - { icon: 🐧, diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim index 4d6ed3e..05f1661 100644 --- a/datastore/defaultimpl.nim +++ b/datastore/defaultimpl.nim @@ -31,7 +31,7 @@ proc defaultModifyGetImpl*( aux: seq[byte] try: - (maybeNewData, aux) = (awaitne fn(maybeCurrentData)).read() + (maybeNewData, aux) = await fn(maybeCurrentData) except CatchableError as err: return failure(err) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index fee0feb..ff440e4 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -52,7 +52,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[ var maybeNewData: ?seq[byte] try: - (maybeNewData, aux) = (awaitne fn(maybeCurrentData)).read() + (maybeNewData, aux) = await fn(maybeCurrentData) except CatchableError as err: return failure(err) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index be7fd43..1a38514 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -21,13 +21,12 @@ proc modifyTests*( let processCount = 100 - proc withRandDelay(op: Future[?!void]): Future[void] {.async.} = + proc withRandDelay(op: Future[?!void]): Future[void] {.async: (raises: [Exception]).} = await sleepAsync(rand(processCount).millis) - discard (await op) - # let errMsg = (await op).errorOption.map((err) => err.msg) + let errMsg = (await op).errorOption.map((err) => err.msg) - # require none(string) == errMsg + require none(string) == errMsg proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = await sleepAsync(2.millis) # allows interleaving From 3c4daf41983747800f15d24f9cc076cfb4db6116 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 10 Jan 2024 16:54:14 +0100 Subject: [PATCH 16/16] Fix review comments --- .github/workflows/tests.yml | 2 +- datastore/datastore.nim | 2 +- datastore/defaultimpl.nim | 2 +- datastore/fsds.nim | 26 ++++++++++++++++++++------ tests/datastore/modifycommontests.nim | 8 +++++--- 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8221127..73df27f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: cache_nonce: [ 1 ] - nim_version: [ 1.6.16 ] + nim_version: [ 1.6.18 ] platform: - { icon: 🐧, diff --git a/datastore/datastore.nim b/datastore/datastore.nim index 6079468..f664ec0 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -13,7 +13,7 @@ push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] - Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} + Function*[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe, closure.} Modify* = Function[?seq[byte], Future[?seq[byte]]] ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim index 05f1661..1562263 100644 --- a/datastore/defaultimpl.nim +++ b/datastore/defaultimpl.nim @@ -46,7 +46,7 @@ proc defaultModifyGetImpl*( finally: lock.release() -method defaultModifyImpl*( +proc defaultModifyImpl*( self: Datastore, lock: AsyncLock, key: Key, diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 038671b..2a58a99 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -1,5 +1,6 @@ import std/os import std/options +import std/tables import std/strutils import pkg/chronos @@ -20,7 +21,7 @@ type root*: string ignoreProtected: bool depth: int - lock: AsyncLock + locks: TableRef[Key, AsyncLock] proc validDepth*(self: FSDatastore, key: Key): bool = key.len <= self.depth @@ -222,14 +223,26 @@ method query*( method modifyGet*( self: FSDatastore, key: Key, - fn: ModifyGet): Future[?!seq[byte]] = - defaultModifyGetImpl(self, self.lock, key, fn) + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + var lock: AsyncLock + try: + lock = self.locks.mgetOrPut(key, newAsyncLock()) + return await defaultModifyGetImpl(self, lock, key, fn) + finally: + if not lock.locked: + self.locks.del(key) method modify*( self: FSDatastore, key: Key, - fn: Modify): Future[?!void] = - defaultModifyImpl(self, self.lock, key, fn) + fn: Modify): Future[?!void] {.async.} = + var lock: AsyncLock + try: + lock = self.locks.mgetOrPut(key, newAsyncLock()) + return await defaultModifyImpl(self, lock, key, fn) + finally: + if not lock.locked: + self.locks.del(key) proc new*( T: type FSDatastore, @@ -250,4 +263,5 @@ proc new*( root: root, ignoreProtected: ignoreProtected, depth: depth, - lock: newAsyncLock()) + locks: newTable[Key, AsyncLock]() + ) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index 1a38514..b7f4871 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -19,7 +19,9 @@ proc modifyTests*( randomize() - let processCount = 100 + let + processCount = 100 + timeout = (1 + processCount div 10).seconds proc withRandDelay(op: Future[?!void]): Future[void] {.async: (raises: [Exception]).} = await sleepAsync(rand(processCount).millis) @@ -53,7 +55,7 @@ proc modifyTests*( return success() let futs = newSeqWith(processCount, withRandDelay(getIncAndPut())) - await allFutures(futs).wait(10.seconds) + await allFutures(futs).wait(timeout) let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) @@ -63,7 +65,7 @@ proc modifyTests*( (await ds.put(key, @(0.uint64.toBytes))).tryGet let futs = newSeqWith(processCount, withRandDelay(ds.modify(key, incAsyncFn))) - await allFutures(futs).wait(10.seconds) + await allFutures(futs).wait(timeout) let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)