From 7594f9e769f609daab35f7906afdfcc3ddf37cbe Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 19 Dec 2023 15:49:44 +0100 Subject: [PATCH] Move modify methods to the Datastore interface --- datastore.nim | 3 +- datastore/concurrentds.nim | 49 --------------- datastore/datastore.nim | 35 +++++++++++ datastore/defaultimpl.nim | 63 +++++++++++++++++++ datastore/fsds.nim | 17 ++++- datastore/mountedds.nim | 20 ++++++ datastore/sql/sqliteds.nim | 6 +- datastore/tieredds.nim | 31 +++++++++ ...rrentdstests.nim => modifycommontests.nim} | 28 ++++++--- tests/datastore/sql/testsqliteds.nim | 4 +- tests/datastore/testfsds.nim | 2 + tests/datastore/testmountedds.nim | 3 + tests/datastore/testtieredds.nim | 2 + 13 files changed, 196 insertions(+), 67 deletions(-) delete mode 100644 datastore/concurrentds.nim create mode 100644 datastore/defaultimpl.nim rename tests/datastore/{concurrentdstests.nim => modifycommontests.nim} (87%) diff --git a/datastore.nim b/datastore.nim index deac2a6..6d43a20 100644 --- a/datastore.nim +++ b/datastore.nim @@ -1,8 +1,7 @@ import ./datastore/datastore -import ./datastore/concurrentds import ./datastore/fsds import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds -export datastore, concurrentds, fsds, mountedds, tieredds, sql +export datastore, fsds, mountedds, tieredds, sql diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim deleted file mode 100644 index 7f7e9e4..0000000 --- a/datastore/concurrentds.nim +++ /dev/null @@ -1,49 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/upraises - -import ./key -import ./query -import ./types -import ./datastore - -export key, query, types, datastore - -push: {.upraises: [].} - -type - Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} - Modify* = Function[?seq[byte], Future[?seq[byte]]] - ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] - -method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = - ## Concurrently safe way of modifying the value associated with the `key`. - ## - ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. - ## - - raiseAssert("Not implemented!") - -method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = - ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on - ## successful update. - ## - ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` - ## and passed as the only arg to the `fn`, otherwise `none` is passed. - ## - ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, - ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). - ## - ## | curr | fn(curr) | action | - ## |---------|----------|------------------------------| - ## | none | none | no action | - ## | none | some(v) | insert v | - ## | some(u) | none | delete u | - ## | some(u) | some(v) | replace u with v (if u != v) | - ## - ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case - ## only the last auxillary value is returned. - ## - - raiseAssert("Not implemented!") diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45f..6079468 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -1,4 +1,5 @@ import pkg/chronos +import pkg/questionable import pkg/questionable/results import pkg/upraises @@ -12,6 +13,9 @@ push: {.upraises: [].} type BatchEntry* = tuple[key: Key, data: seq[byte]] + Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} + Modify* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = raiseAssert("Not implemented!") @@ -42,3 +46,34 @@ method query*( proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = return (await self.has(key)) |? false + +method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = + ## Concurrently safe way of modifying the value associated with the `key`. + ## + ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## + ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` + ## and passed as the only arg to the `fn`, otherwise `none` is passed. + ## + ## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`, + ## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity). + ## + ## | curr | fn(curr) | action | + ## |---------|----------|------------------------------| + ## | none | none | no action | + ## | none | some(v) | insert v | + ## | some(u) | none | delete u | + ## | some(u) | some(v) | replace u with v (if u != v) | + ## + ## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case + ## only the last auxillary value is returned. + ## + + raiseAssert("Not implemented!") \ No newline at end of file diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim new file mode 100644 index 0000000..05f1661 --- /dev/null +++ b/datastore/defaultimpl.nim @@ -0,0 +1,63 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./datastore + +proc defaultModifyGetImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: ModifyGet + ): Future[?!seq[byte]] {.async.} = + # Default implementation, serializes all modify operations using provided lock + # + + await lock.acquire() + + try: + without data =? await self.get(key), err: + if not (err of DatastoreKeyNotFound): + return failure(err) + + let maybeCurrentData = + if data.len == 0: + seq[byte].none + else: + data.some + + var + maybeNewData: ?seq[byte] + aux: seq[byte] + + try: + (maybeNewData, aux) = await fn(maybeCurrentData) + except CatchableError as err: + return failure(err) + + if newData =? maybeNewData: + if err =? (await self.put(key, newData)).errorOption: + return failure(err) + elif currentData =? maybeCurrentData: + if err =? (await self.delete(key)).errorOption: + return failure(err) + + return aux.success + finally: + lock.release() + +method defaultModifyImpl*( + self: Datastore, + lock: AsyncLock, + key: Key, + fn: Modify + ): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption: + return failure(err) + else: + return success() diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 0be95c9..038671b 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -8,6 +8,7 @@ import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises +import ./defaultimpl import ./datastore export datastore @@ -19,6 +20,7 @@ type root*: string ignoreProtected: bool depth: int + lock: AsyncLock proc validDepth*(self: FSDatastore, key: Key): bool = key.len <= self.depth @@ -217,6 +219,18 @@ method query*( iter.next = next return success iter +method modifyGet*( + self: FSDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] = + defaultModifyGetImpl(self, self.lock, key, fn) + +method modify*( + self: FSDatastore, + key: Key, + fn: Modify): Future[?!void] = + defaultModifyImpl(self, self.lock, key, fn) + proc new*( T: type FSDatastore, root: string, @@ -235,4 +249,5 @@ proc new*( success T( root: root, ignoreProtected: ignoreProtected, - depth: depth) + depth: depth, + lock: newAsyncLock()) diff --git a/datastore/mountedds.nim b/datastore/mountedds.nim index dbe74df..9102e49 100644 --- a/datastore/mountedds.nim +++ b/datastore/mountedds.nim @@ -119,6 +119,26 @@ method put*( return success() +method modifyGet*( + self: MountedDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modifyGet(mounted.relative, fn) + +method modify*( + self: MountedDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + without mounted =? self.dispatch(key), error: + return failure(error) + + return await mounted.store.store.modify(mounted.relative, fn) + method close*(self: MountedDatastore): Future[?!void] {.async.} = for s in self.stores.values: discard await s.store.close() diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 2a0fda9..ff440e4 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -8,15 +8,15 @@ import pkg/sqlite3_abi from pkg/stew/results as stewResults import isErr import pkg/upraises -import ../concurrentds +import ../datastore import ./sqlitedsdb -export concurrentds, sqlitedsdb +export datastore, sqlitedsdb push: {.upraises: [].} type - SQLiteDatastore* = ref object of ConcurrentDatastore + SQLiteDatastore* = ref object of Datastore readOnly: bool db: SQLiteDsDb diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 4eca23b..0bc0044 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -116,6 +116,37 @@ method put*( return success() +method modifyGet*( + self: TieredDatastore, + key: Key, + fn: ModifyGet): Future[?!seq[byte]] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn))) + + var aux = newSeq[byte]() + + for fut in pending: + if fut.read().isErr: + return fut.read() + else: + aux.add(fut.read().get) + + return success(aux) + +method modify*( + self: TieredDatastore, + key: Key, + fn: Modify): Future[?!void] {.async.} = + + let + pending = await allFinished(self.stores.mapIt(it.modify(key, fn))) + + for fut in pending: + if fut.read().isErr: return fut.read() + + return success() + # method query*( # self: TieredDatastore, # query: ...): Future[?!(?...)] {.async.} = diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/modifycommontests.nim similarity index 87% rename from tests/datastore/concurrentdstests.nim rename to tests/datastore/modifycommontests.nim index 13d8f72..f8a2f84 100644 --- a/tests/datastore/concurrentdstests.nim +++ b/tests/datastore/modifycommontests.nim @@ -10,11 +10,12 @@ import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results -import pkg/datastore/concurrentds +import pkg/datastore -proc concurrentStoreTests*( - ds: ConcurrentDatastore, - key: Key) = +proc modifyTests*( + ds: Datastore, + key: Key, + multiAux: bool = false) = randomize() @@ -105,20 +106,27 @@ proc concurrentStoreTests*( proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return (seq[byte].none, @[byte 123]) - let result = await ds.modifyGet(key, returningAux) + let res = await ds.modifyGet(key, returningAux) - check: - result == success(@[byte 123]) + if multiAux: + check: + res.errorOption.map((err) => err.msg) == none(string) + for b in res |? @[]: + check: + b == 123.byte + else: + check: + res == success(@[byte 123]) test "should propagate exception as failure": proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} = raise newException(CatchableError, "some error msg") - let result = await ds.modify(key, throwing) + let res = await ds.modify(key, throwing) - if err =? result.errorOption: + if err =? res.errorOption: check: err.msg.contains("some error msg") else: # result was not an error - fail() \ No newline at end of file + fail() diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c4cf5be..778d7f1 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -11,7 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/sql/sqliteds import ../dscommontests -import ../concurrentdstests +import ../modifycommontests import ../querycommontests suite "Test Basic SQLiteDatastore": @@ -25,7 +25,7 @@ suite "Test Basic SQLiteDatastore": (await ds.close()).tryGet() basicStoreTests(ds, key, bytes, otherBytes) - concurrentStoreTests(ds, key) + modifyTests(ds, key) suite "Test Read Only SQLiteDatastore": let diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index 2633677..af9adfc 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -11,6 +11,7 @@ import pkg/stew/byteutils import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests import ./querycommontests suite "Test Basic FSDatastore": @@ -37,6 +38,7 @@ suite "Test Basic FSDatastore": require(not dirExists(basePathAbs)) basicStoreTests(fsStore, key, bytes, otherBytes) + modifyTests(fsStore, key) suite "Test Misc FSDatastore": let diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim index fbe0dfd..15f91f3 100644 --- a/tests/datastore/testmountedds.nim +++ b/tests/datastore/testmountedds.nim @@ -12,6 +12,7 @@ import pkg/datastore/sql import pkg/datastore/fsds import ./dscommontests +import ./modifycommontests suite "Test Basic Mounted Datastore": let @@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore": suite "Mounted sql": let namespace = Key.init(sqlKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Mounted fs": let namespace = Key.init(fsKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) + modifyTests(mountedDs, namespace) suite "Test Mounted Datastore": diff --git a/tests/datastore/testtieredds.nim b/tests/datastore/testtieredds.nim index 4ea76a7..272c852 100644 --- a/tests/datastore/testtieredds.nim +++ b/tests/datastore/testtieredds.nim @@ -10,6 +10,7 @@ import pkg/datastore/fsds import pkg/datastore/sql import pkg/datastore/tieredds +import ./modifycommontests import ./dscommontests suite "Test Basic Tired Datastore": @@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore": require(not dirExists(rootAbs)) basicStoreTests(tiredDs, key, bytes, otherBytes) + modifyTests(tiredDs, key, multiAux = true) suite "TieredDatastore": # assumes tests/test_all is run from project root, e.g. with `nimble test`