ModifyGet operations

This commit is contained in:
Tomasz Bekas 2023-11-21 19:51:23 +01:00
parent 3770d0d1a0
commit 4c5ced6633
3 changed files with 86 additions and 20 deletions

View File

@ -13,22 +13,17 @@ export key, query, types, datastore
push: {.upraises: [].}
type
Function*[T, U] = proc(value: T): U {.upraises: [], gcsafe, closure.}
Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.}
Modify* = Function[?seq[byte], ?seq[byte]]
ModifyGet* = Function[?seq[byte], (?seq[byte], seq[byte])]
ModifyAsync* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGetAsync* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
## and passed as the only arg to the `fn`, otherwise `none` is passed.
##
## When `fn` returns `some`, returned value is put into the store, but only if it's different than
## the existing value, otherwise nothing happens.
## When `fn` returns `none` existing value is deleted from the store, if no value existed before
## nothing happens.
##
## Note that `fn` can be called multiple times (when concurrent modify of the value was detected).
## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn` that doesn't
## produce any auxillary value.
##
raiseAssert("Not implemented!")
@ -36,6 +31,25 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void]
method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.base, locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes `fn` that doesn't produce
## any auxillary value.
##
raiseAssert("Not implemented!")
method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##
## Same as `modifyGet` with `fn: ModifyGetAsync` argument, but this takes non-async `fn`.
##
raiseAssert("Not implemented!")
method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.base, locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
## and passed as the only arg to the `fn`, otherwise `none` is passed.
##
@ -44,7 +58,8 @@ method modify*(self: ConcurrentDatastore, key: Key, fn: ModifyAsync): Future[?!v
## When `fn` returns `none` existing value is deleted from the store, if no value existed before
## nothing happens.
##
## Note that `fn` can be called multiple times (when concurrent modify of the value was detected).
## Note that `fn` can be called multiple times (when concurrent modify of the value was detected). Only the
## last auxillary value is returned.
##
raiseAssert("Not implemented!")

View File

@ -31,9 +31,10 @@ proc timestamp*(t = epochTime()): int64 =
const initVersion* = 0.int64
method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} =
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGetAsync): Future[?!seq[byte]] {.async.} =
var
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
aux: seq[byte]
while retriesLeft > 0:
var
@ -51,7 +52,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void]
var maybeNewData: ?seq[byte]
try:
maybeNewData = await fn(maybeCurrentData)
(maybeNewData, aux) = await fn(maybeCurrentData)
except CatchableError as err:
return failure("Error running modify function: " & err.msg)
@ -114,13 +115,35 @@ method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void]
if retriesLeft == 0:
return failure("Retry limit exceeded")
return success()
return success(aux)
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte])] {.async.} =
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
return fn(maybeValue)
return await self.modify(key, wrappedFn)
return await self.modifyGet(key, wrappedFn)
method modify*(self: SQLiteDatastore, key: Key, fn: ModifyAsync): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()
return (res, ignoredAux)
if err =? (await self.modifyGet(key, wrappedFn)).errorOption:
return failure(err)
else:
return success()
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let maybeNewValue = fn(maybeValue)
let ignoredAux = newSeq[byte]()
return (maybeNewValue, ignoredAux)
if err =? (await self.modifyGet(key, wrappedFn)).errorOption:
return failure(err)
else:
return success()
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
var

View File

@ -2,6 +2,7 @@ import std/options
import std/sugar
import std/random
import std/sequtils
import std/strutils
import pkg/asynctest
import pkg/chronos
@ -79,18 +80,45 @@ proc concurrentStoreTests*(
test "should put value":
(await ds.delete(key)).tryGet()
(await ds.modify(key, (_: ?seq[byte]) => @(123.uint64.toBytes).some)).tryGet
proc returningSomeValue(_: ?seq[byte]): ?seq[byte] =
return @(123.uint64.toBytes).some
(await ds.modify(key, returningSomeValue)).tryGet
let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)
check finalValue.int == 123
test "should delete value":
let key = Key.init(Key.random).tryGet
(await ds.put(key, @(0.uint64.toBytes))).tryGet
(await ds.modify(key, (_: ?seq[byte]) => seq[byte].none)).tryGet
proc returningNone(_: ?seq[byte]): ?seq[byte] =
return seq[byte].none
(await ds.modify(key, returningNone)).tryGet
let hasKey = (await ds.has(key)).tryGet
check not hasKey
test "should return correct auxillary value":
proc returningAux(_: ?seq[byte]): (?seq[byte], seq[byte]) =
return (seq[byte].none, @[byte 123])
let result = await ds.modifyGet(key, returningAux)
check:
result == success(@[byte 123])
test "should propagate exception as failure":
proc throwing(a: ?seq[byte]): ?seq[byte] =
raise newException(CatchableError, "some error msg")
let result = await ds.modify(key, throwing)
if err =? result.errorOption:
check:
err.msg.contains("some error msg")
else:
# result was not an error
fail()