diff --git a/datastore/concurrentds.nim b/datastore/concurrentds.nim index da4170c..a0f96c4 100644 --- a/datastore/concurrentds.nim +++ b/datastore/concurrentds.nim @@ -13,22 +13,17 @@ export key, query, types, datastore push: {.upraises: [].} type - Function*[T, U] = proc(value: T): U {.upraises: [], gcsafe, closure.} + Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.} Modify* = Function[?seq[byte], ?seq[byte]] + ModifyGet* = Function[?seq[byte], (?seq[byte], seq[byte])] ModifyAsync* = Function[?seq[byte], Future[?seq[byte]]] + ModifyGetAsync* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## - ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` - ## and passed as the only arg to the `fn`, otherwise `none` is passed. - ## - ## When `fn` returns `some`, returned value is put into the store, but only if it's different than - ## the existing value, otherwise nothing happens. - ## When `fn` returns `none` existing value is deleted from the store, if no value existed before - ## nothing happens. - ## - ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn` that doesn't + ## produce any auxillary value. ## raiseAssert("Not implemented!") @@ -36,6 +31,25 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.base, locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes `fn` that doesn't produce + ## any auxillary value. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## + ## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn`. + ## + + raiseAssert("Not implemented!") + +method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.base, locks: "unknown".} = + ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on + ## successful update. + ## ## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some` ## and passed as the only arg to the `fn`, otherwise `none` is passed. ## @@ -44,7 +58,8 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!v ## When `fn` returns `none` existing value is deleted from the store, if no value existed before ## nothing happens. ## - ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). + ## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). Only the + ## last auxillary value is returned. ## raiseAssert("Not implemented!") diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 41a6a71..cb48e95 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -31,9 +31,10 @@ proc timestamp*(t = epochTime()): int64 = const initVersion* = 0.int64 -method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.async.} = var retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop + aux: seq[byte] while retriesLeft > 0: var @@ -51,7 +52,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] var maybeNewData: ?seq[byte] try: - maybeNewData = await fn(maybeCurrentData) + (maybeNewData, aux) = await fn(maybeCurrentData) except CatchableError as err: return failure("Error running modify function: " & err.msg) @@ -114,13 +115,35 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] if retriesLeft == 0: return failure("Retry limit exceeded") - return success() + return success(aux) -method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = - proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte])] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return fn(maybeValue) - return await self.modify(key, wrappedFn) + return await self.modifyGet(key, wrappedFn) + +method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let res = await fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (res, ignoredAux) + + if err =? (await self.modifyGet(key, wrappedFn)).errorOption: + return failure(err) + else: + return success() + +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = + proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = + let maybeNewValue = fn(maybeValue) + let ignoredAux = newSeq[byte]() + return (maybeNewValue, ignoredAux) + + if err =? (await self.modifyGet(key, wrappedFn)).errorOption: + return failure(err) + else: + return success() method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = var diff --git a/tests/datastore/concurrentdstests.nim b/tests/datastore/concurrentdstests.nim index 9900072..11b6a7c 100644 --- a/tests/datastore/concurrentdstests.nim +++ b/tests/datastore/concurrentdstests.nim @@ -2,6 +2,7 @@ import std/options import std/sugar import std/random import std/sequtils +import std/strutils import pkg/asynctest import pkg/chronos @@ -79,18 +80,45 @@ proc concurrentStoreTests*( test "should put value": (await ds.delete(key)).tryGet() - (await ds.modify(key, (_: ?seq[byte]) => @(123.uint64.toBytes).some)).tryGet + proc returningSomeValue(_: ?seq[byte]): ?seq[byte] = + return @(123.uint64.toBytes).some + + (await ds.modify(key, returningSomeValue)).tryGet let finalValue = uint64.fromBytes((await ds.get(key)).tryGet) check finalValue.int == 123 test "should delete value": - let key = Key.init(Key.random).tryGet (await ds.put(key, @(0.uint64.toBytes))).tryGet - (await ds.modify(key, (_: ?seq[byte]) => seq[byte].none)).tryGet + proc returningNone(_: ?seq[byte]): ?seq[byte] = + return seq[byte].none + + (await ds.modify(key, returningNone)).tryGet let hasKey = (await ds.has(key)).tryGet check not hasKey + + test "should return correct auxillary value": + proc returningAux(_: ?seq[byte]): (?seq[byte], seq[byte]) = + return (seq[byte].none, @[byte 123]) + + let result = await ds.modifyGet(key, returningAux) + + check: + result == success(@[byte 123]) + + test "should propagate exception as failure": + proc throwing(a: ?seq[byte]): ?seq[byte] = + raise newException(CatchableError, "some error msg") + + let result = await ds.modify(key, throwing) + + if err =? result.errorOption: + check: + err.msg.contains("some error msg") + else: + # result was not an error + fail() \ No newline at end of file