Move modify methods to the Datastore interface

This commit is contained in:
Tomasz Bekas 2023-12-19 15:49:44 +01:00
parent a62c91bb6b
commit 3d901cb658
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
13 changed files with 196 additions and 67 deletions

View File

@ -1,8 +1,7 @@
import ./datastore/datastore
import ./datastore/concurrentds
import ./datastore/fsds
import ./datastore/sql
import ./datastore/mountedds
import ./datastore/tieredds
export datastore, concurrentds, fsds, mountedds, tieredds, sql
export datastore, fsds, mountedds, tieredds, sql

View File

@ -1,49 +0,0 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ./key
import ./query
import ./types
import ./datastore
export key, query, types, datastore
push: {.upraises: [].}
type
Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.}
Modify* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
##
raiseAssert("Not implemented!")
method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
## and passed as the only arg to the `fn`, otherwise `none` is passed.
##
## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`,
## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity).
##
## | curr | fn(curr) | action |
## |---------|----------|------------------------------|
## | none | none | no action |
## | none | some(v) | insert v |
## | some(u) | none | delete u |
## | some(u) | some(v) | replace u with v (if u != v) |
##
## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case
## only the last auxillary value is returned.
##
raiseAssert("Not implemented!")

View File

@ -1,4 +1,5 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
@ -12,6 +13,9 @@ push: {.upraises: [].}
type
BatchEntry* = tuple[key: Key, data: seq[byte]]
Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.}
Modify* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
raiseAssert("Not implemented!")
@ -42,3 +46,34 @@ method query*(
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
return (await self.has(key)) |? false
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
##
raiseAssert("Not implemented!")
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
## and passed as the only arg to the `fn`, otherwise `none` is passed.
##
## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`,
## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity).
##
## | curr | fn(curr) | action |
## |---------|----------|------------------------------|
## | none | none | no action |
## | none | some(v) | insert v |
## | some(u) | none | delete u |
## | some(u) | some(v) | replace u with v (if u != v) |
##
## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case
## only the last auxillary value is returned.
##
raiseAssert("Not implemented!")

63
datastore/defaultimpl.nim Normal file
View File

@ -0,0 +1,63 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./datastore
proc defaultModifyGetImpl*(
self: Datastore,
lock: AsyncLock,
key: Key,
fn: ModifyGet
): Future[?!seq[byte]] {.async.} =
# Default implementation, serializes all modify operations using provided lock
#
await lock.acquire()
try:
without data =? await self.get(key), err:
if not (err of DatastoreKeyNotFound):
return failure(err)
let maybeCurrentData =
if data.len == 0:
seq[byte].none
else:
data.some
var
maybeNewData: ?seq[byte]
aux: seq[byte]
try:
(maybeNewData, aux) = await fn(maybeCurrentData)
except CatchableError as err:
return failure(err)
if newData =? maybeNewData:
if err =? (await self.put(key, newData)).errorOption:
return failure(err)
elif currentData =? maybeCurrentData:
if err =? (await self.delete(key)).errorOption:
return failure(err)
return aux.success
finally:
lock.release()
method defaultModifyImpl*(
self: Datastore,
lock: AsyncLock,
key: Key,
fn: Modify
): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()
return (res, ignoredAux)
if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption:
return failure(err)
else:
return success()

View File

@ -8,6 +8,7 @@ import pkg/questionable/results
from pkg/stew/results as stewResults import get, isErr
import pkg/upraises
import ./defaultimpl
import ./datastore
export datastore
@ -19,6 +20,7 @@ type
root*: string
ignoreProtected: bool
depth: int
lock: AsyncLock
proc validDepth*(self: FSDatastore, key: Key): bool =
key.len <= self.depth
@ -217,6 +219,18 @@ method query*(
iter.next = next
return success iter
method modifyGet*(
self: FSDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] =
defaultModifyGetImpl(self, self.lock, key, fn)
method modify*(
self: FSDatastore,
key: Key,
fn: Modify): Future[?!void] =
defaultModifyImpl(self, self.lock, key, fn)
proc new*(
T: type FSDatastore,
root: string,
@ -235,4 +249,5 @@ proc new*(
success T(
root: root,
ignoreProtected: ignoreProtected,
depth: depth)
depth: depth,
lock: newAsyncLock())

View File

@ -119,6 +119,26 @@ method put*(
return success()
method modifyGet*(
self: MountedDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
return await mounted.store.store.modifyGet(mounted.relative, fn)
method modify*(
self: MountedDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
return await mounted.store.store.modify(mounted.relative, fn)
method close*(self: MountedDatastore): Future[?!void] {.async.} =
for s in self.stores.values:
discard await s.store.close()

View File

@ -8,15 +8,15 @@ import pkg/sqlite3_abi
from pkg/stew/results as stewResults import isErr
import pkg/upraises
import ../concurrentds
import ../datastore
import ./sqlitedsdb
export concurrentds, sqlitedsdb
export datastore, sqlitedsdb
push: {.upraises: [].}
type
SQLiteDatastore* = ref object of ConcurrentDatastore
SQLiteDatastore* = ref object of Datastore
readOnly: bool
db: SQLiteDsDb

View File

@ -116,6 +116,37 @@ method put*(
return success()
method modifyGet*(
self: TieredDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
var aux = newSeq[byte]()
for fut in pending:
if fut.read().isErr:
return fut.read()
else:
aux.add(fut.read().get)
return success(aux)
method modify*(
self: TieredDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
for fut in pending:
if fut.read().isErr: return fut.read()
return success()
# method query*(
# self: TieredDatastore,
# query: ...): Future[?!(?...)] {.async.} =

View File

@ -10,11 +10,12 @@ import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import pkg/datastore/concurrentds
import pkg/datastore
proc concurrentStoreTests*(
ds: ConcurrentDatastore,
key: Key) =
proc modifyTests*(
ds: Datastore,
key: Key,
multiAux: bool = false) =
randomize()
@ -105,20 +106,27 @@ proc concurrentStoreTests*(
proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
return (seq[byte].none, @[byte 123])
let result = await ds.modifyGet(key, returningAux)
let res = await ds.modifyGet(key, returningAux)
check:
result == success(@[byte 123])
if multiAux:
check:
res.errorOption.map((err) => err.msg) == none(string)
for b in res |? @[]:
check:
b == 123.byte
else:
check:
res == success(@[byte 123])
test "should propagate exception as failure":
proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} =
raise newException(CatchableError, "some error msg")
let result = await ds.modify(key, throwing)
let res = await ds.modify(key, throwing)
if err =? result.errorOption:
if err =? res.errorOption:
check:
err.msg.contains("some error msg")
else:
# result was not an error
fail()
fail()

View File

@ -11,7 +11,7 @@ import pkg/stew/byteutils
import pkg/datastore/sql/sqliteds
import ../dscommontests
import ../concurrentdstests
import ../modifycommontests
import ../querycommontests
suite "Test Basic SQLiteDatastore":
@ -25,7 +25,7 @@ suite "Test Basic SQLiteDatastore":
(await ds.close()).tryGet()
basicStoreTests(ds, key, bytes, otherBytes)
concurrentStoreTests(ds, key)
modifyTests(ds, key)
suite "Test Read Only SQLiteDatastore":
let

View File

@ -11,6 +11,7 @@ import pkg/stew/byteutils
import pkg/datastore/fsds
import ./dscommontests
import ./modifycommontests
import ./querycommontests
suite "Test Basic FSDatastore":
@ -37,6 +38,7 @@ suite "Test Basic FSDatastore":
require(not dirExists(basePathAbs))
basicStoreTests(fsStore, key, bytes, otherBytes)
modifyTests(fsStore, key)
suite "Test Misc FSDatastore":
let

View File

@ -12,6 +12,7 @@ import pkg/datastore/sql
import pkg/datastore/fsds
import ./dscommontests
import ./modifycommontests
suite "Test Basic Mounted Datastore":
let
@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore":
suite "Mounted sql":
let namespace = Key.init(sqlKey, key).tryGet
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
modifyTests(mountedDs, namespace)
suite "Mounted fs":
let namespace = Key.init(fsKey, key).tryGet
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
modifyTests(mountedDs, namespace)
suite "Test Mounted Datastore":

View File

@ -10,6 +10,7 @@ import pkg/datastore/fsds
import pkg/datastore/sql
import pkg/datastore/tieredds
import ./modifycommontests
import ./dscommontests
suite "Test Basic Tired Datastore":
@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore":
require(not dirExists(rootAbs))
basicStoreTests(tiredDs, key, bytes, otherBytes)
modifyTests(tiredDs, key, multiAux = true)
suite "TieredDatastore":
# assumes tests/test_all is run from project root, e.g. with `nimble test`