mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-02 13:43:11 +00:00
Move modify methods to the Datastore interface
This commit is contained in:
parent
f8bc104ecb
commit
7594f9e769
@ -1,8 +1,7 @@
|
||||
import ./datastore/datastore
|
||||
import ./datastore/concurrentds
|
||||
import ./datastore/fsds
|
||||
import ./datastore/sql
|
||||
import ./datastore/mountedds
|
||||
import ./datastore/tieredds
|
||||
|
||||
export datastore, concurrentds, fsds, mountedds, tieredds, sql
|
||||
export datastore, fsds, mountedds, tieredds, sql
|
||||
|
||||
@ -1,49 +0,0 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
|
||||
import ./key
|
||||
import ./query
|
||||
import ./types
|
||||
import ./datastore
|
||||
|
||||
export key, query, types, datastore
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.}
|
||||
Modify* = Function[?seq[byte], Future[?seq[byte]]]
|
||||
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
|
||||
|
||||
method modify*(self: ConcurrentDatastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of modifying the value associated with the `key`.
|
||||
##
|
||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method modifyGet*(self: ConcurrentDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||
## successful update.
|
||||
##
|
||||
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
|
||||
## and passed as the only arg to the `fn`, otherwise `none` is passed.
|
||||
##
|
||||
## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`,
|
||||
## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity).
|
||||
##
|
||||
## | curr | fn(curr) | action |
|
||||
## |---------|----------|------------------------------|
|
||||
## | none | none | no action |
|
||||
## | none | some(v) | insert v |
|
||||
## | some(u) | none | delete u |
|
||||
## | some(u) | some(v) | replace u with v (if u != v) |
|
||||
##
|
||||
## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case
|
||||
## only the last auxillary value is returned.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
@ -1,4 +1,5 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
|
||||
@ -12,6 +13,9 @@ push: {.upraises: [].}
|
||||
|
||||
type
|
||||
BatchEntry* = tuple[key: Key, data: seq[byte]]
|
||||
Function*[T, U] = proc(value: T): U {.upraises: [CatchableError], gcsafe, closure.}
|
||||
Modify* = Function[?seq[byte], Future[?seq[byte]]]
|
||||
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
|
||||
|
||||
method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
|
||||
raiseAssert("Not implemented!")
|
||||
@ -42,3 +46,34 @@ method query*(
|
||||
|
||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
|
||||
return (await self.has(key)) |? false
|
||||
|
||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of modifying the value associated with the `key`.
|
||||
##
|
||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||
## successful update.
|
||||
##
|
||||
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
|
||||
## and passed as the only arg to the `fn`, otherwise `none` is passed.
|
||||
##
|
||||
## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`,
|
||||
## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity).
|
||||
##
|
||||
## | curr | fn(curr) | action |
|
||||
## |---------|----------|------------------------------|
|
||||
## | none | none | no action |
|
||||
## | none | some(v) | insert v |
|
||||
## | some(u) | none | delete u |
|
||||
## | some(u) | some(v) | replace u with v (if u != v) |
|
||||
##
|
||||
## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case
|
||||
## only the last auxillary value is returned.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
63
datastore/defaultimpl.nim
Normal file
63
datastore/defaultimpl.nim
Normal file
@ -0,0 +1,63 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./datastore
|
||||
|
||||
proc defaultModifyGetImpl*(
|
||||
self: Datastore,
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: ModifyGet
|
||||
): Future[?!seq[byte]] {.async.} =
|
||||
# Default implementation, serializes all modify operations using provided lock
|
||||
#
|
||||
|
||||
await lock.acquire()
|
||||
|
||||
try:
|
||||
without data =? await self.get(key), err:
|
||||
if not (err of DatastoreKeyNotFound):
|
||||
return failure(err)
|
||||
|
||||
let maybeCurrentData =
|
||||
if data.len == 0:
|
||||
seq[byte].none
|
||||
else:
|
||||
data.some
|
||||
|
||||
var
|
||||
maybeNewData: ?seq[byte]
|
||||
aux: seq[byte]
|
||||
|
||||
try:
|
||||
(maybeNewData, aux) = await fn(maybeCurrentData)
|
||||
except CatchableError as err:
|
||||
return failure(err)
|
||||
|
||||
if newData =? maybeNewData:
|
||||
if err =? (await self.put(key, newData)).errorOption:
|
||||
return failure(err)
|
||||
elif currentData =? maybeCurrentData:
|
||||
if err =? (await self.delete(key)).errorOption:
|
||||
return failure(err)
|
||||
|
||||
return aux.success
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
method defaultModifyImpl*(
|
||||
self: Datastore,
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: Modify
|
||||
): Future[?!void] {.async.} =
|
||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
let res = await fn(maybeValue)
|
||||
let ignoredAux = newSeq[byte]()
|
||||
return (res, ignoredAux)
|
||||
|
||||
if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption:
|
||||
return failure(err)
|
||||
else:
|
||||
return success()
|
||||
@ -8,6 +8,7 @@ import pkg/questionable/results
|
||||
from pkg/stew/results as stewResults import get, isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ./defaultimpl
|
||||
import ./datastore
|
||||
|
||||
export datastore
|
||||
@ -19,6 +20,7 @@ type
|
||||
root*: string
|
||||
ignoreProtected: bool
|
||||
depth: int
|
||||
lock: AsyncLock
|
||||
|
||||
proc validDepth*(self: FSDatastore, key: Key): bool =
|
||||
key.len <= self.depth
|
||||
@ -217,6 +219,18 @@ method query*(
|
||||
iter.next = next
|
||||
return success iter
|
||||
|
||||
method modifyGet*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] =
|
||||
defaultModifyGetImpl(self, self.lock, key, fn)
|
||||
|
||||
method modify*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] =
|
||||
defaultModifyImpl(self, self.lock, key, fn)
|
||||
|
||||
proc new*(
|
||||
T: type FSDatastore,
|
||||
root: string,
|
||||
@ -235,4 +249,5 @@ proc new*(
|
||||
success T(
|
||||
root: root,
|
||||
ignoreProtected: ignoreProtected,
|
||||
depth: depth)
|
||||
depth: depth,
|
||||
lock: newAsyncLock())
|
||||
|
||||
@ -119,6 +119,26 @@ method put*(
|
||||
|
||||
return success()
|
||||
|
||||
method modifyGet*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
|
||||
return await mounted.store.store.modifyGet(mounted.relative, fn)
|
||||
|
||||
method modify*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
|
||||
return await mounted.store.store.modify(mounted.relative, fn)
|
||||
|
||||
method close*(self: MountedDatastore): Future[?!void] {.async.} =
|
||||
for s in self.stores.values:
|
||||
discard await s.store.close()
|
||||
|
||||
@ -8,15 +8,15 @@ import pkg/sqlite3_abi
|
||||
from pkg/stew/results as stewResults import isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ../concurrentds
|
||||
import ../datastore
|
||||
import ./sqlitedsdb
|
||||
|
||||
export concurrentds, sqlitedsdb
|
||||
export datastore, sqlitedsdb
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
SQLiteDatastore* = ref object of ConcurrentDatastore
|
||||
SQLiteDatastore* = ref object of Datastore
|
||||
readOnly: bool
|
||||
db: SQLiteDsDb
|
||||
|
||||
|
||||
@ -116,6 +116,37 @@ method put*(
|
||||
|
||||
return success()
|
||||
|
||||
method modifyGet*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
||||
|
||||
var aux = newSeq[byte]()
|
||||
|
||||
for fut in pending:
|
||||
if fut.read().isErr:
|
||||
return fut.read()
|
||||
else:
|
||||
aux.add(fut.read().get)
|
||||
|
||||
return success(aux)
|
||||
|
||||
method modify*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
||||
|
||||
for fut in pending:
|
||||
if fut.read().isErr: return fut.read()
|
||||
|
||||
return success()
|
||||
|
||||
# method query*(
|
||||
# self: TieredDatastore,
|
||||
# query: ...): Future[?!(?...)] {.async.} =
|
||||
|
||||
@ -10,11 +10,12 @@ import pkg/stew/endians2
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import pkg/datastore/concurrentds
|
||||
import pkg/datastore
|
||||
|
||||
proc concurrentStoreTests*(
|
||||
ds: ConcurrentDatastore,
|
||||
key: Key) =
|
||||
proc modifyTests*(
|
||||
ds: Datastore,
|
||||
key: Key,
|
||||
multiAux: bool = false) =
|
||||
|
||||
randomize()
|
||||
|
||||
@ -105,20 +106,27 @@ proc concurrentStoreTests*(
|
||||
proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
return (seq[byte].none, @[byte 123])
|
||||
|
||||
let result = await ds.modifyGet(key, returningAux)
|
||||
let res = await ds.modifyGet(key, returningAux)
|
||||
|
||||
check:
|
||||
result == success(@[byte 123])
|
||||
if multiAux:
|
||||
check:
|
||||
res.errorOption.map((err) => err.msg) == none(string)
|
||||
for b in res |? @[]:
|
||||
check:
|
||||
b == 123.byte
|
||||
else:
|
||||
check:
|
||||
res == success(@[byte 123])
|
||||
|
||||
test "should propagate exception as failure":
|
||||
proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
raise newException(CatchableError, "some error msg")
|
||||
|
||||
let result = await ds.modify(key, throwing)
|
||||
let res = await ds.modify(key, throwing)
|
||||
|
||||
if err =? result.errorOption:
|
||||
if err =? res.errorOption:
|
||||
check:
|
||||
err.msg.contains("some error msg")
|
||||
else:
|
||||
# result was not an error
|
||||
fail()
|
||||
fail()
|
||||
@ -11,7 +11,7 @@ import pkg/stew/byteutils
|
||||
import pkg/datastore/sql/sqliteds
|
||||
|
||||
import ../dscommontests
|
||||
import ../concurrentdstests
|
||||
import ../modifycommontests
|
||||
import ../querycommontests
|
||||
|
||||
suite "Test Basic SQLiteDatastore":
|
||||
@ -25,7 +25,7 @@ suite "Test Basic SQLiteDatastore":
|
||||
(await ds.close()).tryGet()
|
||||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
concurrentStoreTests(ds, key)
|
||||
modifyTests(ds, key)
|
||||
|
||||
suite "Test Read Only SQLiteDatastore":
|
||||
let
|
||||
|
||||
@ -11,6 +11,7 @@ import pkg/stew/byteutils
|
||||
import pkg/datastore/fsds
|
||||
|
||||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
import ./querycommontests
|
||||
|
||||
suite "Test Basic FSDatastore":
|
||||
@ -37,6 +38,7 @@ suite "Test Basic FSDatastore":
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
basicStoreTests(fsStore, key, bytes, otherBytes)
|
||||
modifyTests(fsStore, key)
|
||||
|
||||
suite "Test Misc FSDatastore":
|
||||
let
|
||||
|
||||
@ -12,6 +12,7 @@ import pkg/datastore/sql
|
||||
import pkg/datastore/fsds
|
||||
|
||||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
|
||||
suite "Test Basic Mounted Datastore":
|
||||
let
|
||||
@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore":
|
||||
suite "Mounted sql":
|
||||
let namespace = Key.init(sqlKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
|
||||
suite "Mounted fs":
|
||||
let namespace = Key.init(fsKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
|
||||
suite "Test Mounted Datastore":
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ import pkg/datastore/fsds
|
||||
import pkg/datastore/sql
|
||||
import pkg/datastore/tieredds
|
||||
|
||||
import ./modifycommontests
|
||||
import ./dscommontests
|
||||
|
||||
suite "Test Basic Tired Datastore":
|
||||
@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore":
|
||||
require(not dirExists(rootAbs))
|
||||
|
||||
basicStoreTests(tiredDs, key, bytes, otherBytes)
|
||||
modifyTests(tiredDs, key, multiAux = true)
|
||||
|
||||
suite "TieredDatastore":
|
||||
# assumes tests/test_all is run from project root, e.g. with `nimble test`
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user