Compare commits

..

No commits in common. "master" and "0.2.0" have entirely different histories.

11 changed files with 99 additions and 115 deletions

View File

@ -23,22 +23,16 @@ jobs:
os: windows,
shell: msys2
}
nim: [binary:2.2.4]
nim: [2.0.14]
name: ${{ matrix.platform.icon }} ${{ matrix.platform.label }} - Nim v${{ matrix.nim }}
runs-on: ${{ matrix.platform.os }}-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- uses: iffy/install-nim@v5
- uses: iffy/install-nim@v4
with:
version: ${{ matrix.nim }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Setup CMake
uses: jwlawson/actions-setup-cmake@v2
with:
cmake-version: '3.x'
- name: Build
run: nimble install -y
- name: Test

View File

@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "datastore"
version = "0.2.1"
version = "0.2.0"
author = "Status Research & Development GmbH"
description = "Simple, unified API for multiple data stores"
license = "Apache License 2.0 or MIT"
@ -12,7 +12,7 @@ requires "nim >= 2.0.14",
"questionable >= 0.10.15 & < 0.11.0",
"sqlite3_abi >= 3.47.0.0 & < 4.0.0.0",
"leveldbstatic >= 0.2.0 & < 0.3.0",
"stew >= 0.4.0 & < 0.5.0",
"stew >= 0.2.0 & < 0.3.0",
"unittest2 >= 0.2.3 & < 0.3.0"
task coverage, "generates code coverage report":

View File

@ -16,37 +16,37 @@ type
Modify* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
raiseAssert("Not implemented!")
method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} =
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
raiseAssert("Not implemented!")
method query*(
self: Datastore,
query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} =
query: Query): Future[?!QueryIter] {.base, gcsafe.} =
raiseAssert("Not implemented!")
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
return (await self.has(key)) |? false
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
raiseAssert("Not implemented!")
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##

View File

@ -11,9 +11,10 @@ proc defaultModifyGetImpl*(
lock: AsyncLock,
key: Key,
fn: ModifyGet
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
): Future[?!seq[byte]] {.async.} =
# Default implementation, serializes all modify operations using provided lock
#
await lock.acquire()
try:
@ -33,8 +34,6 @@ proc defaultModifyGetImpl*(
try:
(maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err:
return failure(err)
@ -47,17 +46,14 @@ proc defaultModifyGetImpl*(
return aux.success
finally:
try:
lock.release()
except AsyncLockError as err:
return failure(err)
lock.release()
proc defaultModifyImpl*(
self: Datastore,
lock: AsyncLock,
key: Key,
fn: Modify
): Future[?!void] {.async: (raises: [CancelledError]).} =
): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()

View File

@ -65,10 +65,10 @@ proc path*(self: FSDatastore, key: Key): ?!string =
return success fullname
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
return self.path(key).?fileExists()
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
without path =? self.path(key), error:
return failure error
@ -82,7 +82,7 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async: (raises: [C
return success()
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
@ -119,7 +119,7 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
except CatchableError as e:
return failure e
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
without path =? self.path(key), error:
return failure error
@ -132,7 +132,7 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async: (raises:
method put*(
self: FSDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
data: seq[byte]): Future[?!void] {.async.} =
without path =? self.path(key), error:
return failure error
@ -147,7 +147,7 @@ method put*(
method put*(
self: FSDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) =
except CatchableError as exc:
raise newException(Defect, exc.msg)
method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
method close*(self: FSDatastore): Future[?!void] {.async.} =
return success()
method query*(
self: FSDatastore,
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
query: Query): Future[?!QueryIter] {.async.} =
without path =? self.path(query.key), error:
return failure error
@ -189,7 +189,7 @@ method query*(
var
iter = QueryIter.new()
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
proc next(): Future[?!QueryResponse] {.async.} =
let
path = walker()
@ -208,13 +208,8 @@ method query*(
key = Key.init(keyPath).expect("should not fail")
data =
if query.value:
try:
self.readFile((basePath / path).absolutePath)
.expect("Should read file")
except ValueError as err:
return failure err
except OSError as err:
return failure err
self.readFile((basePath / path).absolutePath)
.expect("Should read file")
else:
@[]
@ -226,7 +221,7 @@ method query*(
method modifyGet*(
self: FSDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -238,9 +233,8 @@ method modifyGet*(
method modify*(
self: FSDatastore,
key: Key,
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
fn: Modify): Future[?!void] {.async.} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
return await defaultModifyImpl(self, lock, key, fn)

View File

@ -20,27 +20,27 @@ type
db: LevelDb
locks: TableRef[Key, AsyncLock]
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async.} =
try:
let str = self.db.get($key)
return success(str.isSome)
except LevelDbException as e:
return failure("LevelDbDatastore.has exception: " & e.msg)
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async.} =
try:
self.db.delete($key, sync = true)
return success()
except LevelDbException as e:
return failure("LevelDbDatastore.delete exception: " & e.msg)
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure(err.msg)
return success()
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
try:
let str = self.db.get($key)
if not str.isSome:
@ -50,7 +50,7 @@ method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async: (rai
except LevelDbException as e:
return failure("LevelDbDatastore.get exception: " & $e.msg)
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
try:
let str = string.fromBytes(data)
self.db.put($key, str)
@ -58,7 +58,7 @@ method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {
except LevelDbException as e:
return failure("LevelDbDatastore.put exception: " & $e.msg)
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
try:
let b = newBatch()
for entry in batch:
@ -68,7 +68,7 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
except LevelDbException as e:
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
method close*(self: LevelDbDatastore): Future[?!void] {.async.} =
try:
self.db.close()
return success()
@ -84,7 +84,7 @@ proc getQueryString(query: Query): string =
method query*(
self: LevelDbDatastore,
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} =
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
if not (query.sort == SortOrder.Assending):
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
@ -98,7 +98,7 @@ method query*(
limit = query.limit
)
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
proc next(): Future[?!QueryResponse] {.async.} =
if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -114,7 +114,7 @@ method query*(
except LevelDbException as e:
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
proc dispose(): Future[?!void] {.async.} =
dbIter.dispose()
return success()
@ -125,7 +125,7 @@ method query*(
method modifyGet*(
self: LevelDbDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -137,7 +137,7 @@ method modifyGet*(
method modify*(
self: LevelDbDatastore,
key: Key,
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
fn: Modify): Future[?!void] {.async.} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())

View File

@ -63,7 +63,7 @@ proc dispatch(
method has*(
self: MountedDatastore,
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!bool] {.async.} =
without mounted =? self.dispatch(key):
return failure "No mounted datastore found"
@ -72,7 +72,7 @@ method has*(
method delete*(
self: MountedDatastore,
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!void] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -81,7 +81,7 @@ method delete*(
method delete*(
self: MountedDatastore,
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
@ -91,7 +91,7 @@ method delete*(
method get*(
self: MountedDatastore,
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!seq[byte]] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -101,7 +101,7 @@ method get*(
method put*(
self: MountedDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
data: seq[byte]): Future[?!void] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -110,7 +110,7 @@ method put*(
method put*(
self: MountedDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -121,7 +121,7 @@ method put*(
method modifyGet*(
self: MountedDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -131,14 +131,14 @@ method modifyGet*(
method modify*(
self: MountedDatastore,
key: Key,
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
fn: Modify): Future[?!void] {.async.} =
without mounted =? self.dispatch(key), error:
return failure(error)
return await mounted.store.store.modify(mounted.relative, fn)
method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
method close*(self: MountedDatastore): Future[?!void] {.async.} =
for s in self.stores.values:
discard await s.store.close()

View File

@ -23,8 +23,8 @@ type
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
QueryEndedError* = object of DatastoreError
GetNext* = proc(): Future[?!QueryResponse] {.async: (raises: [CancelledError]), gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.}
GetNext* = proc(): Future[?!QueryResponse] {.raises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.raises: [], gcsafe.}
QueryIter* = ref object
finished*: bool
next*: GetNext
@ -34,7 +34,7 @@ iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished:
yield q.next()
proc defaultDispose(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
proc defaultDispose(): Future[?!void] {.gcsafe, async.} =
return success()
proc new*(T: type QueryIter, dispose = defaultDispose): T =

View File

@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
return newRollbackError(rbErr, opErr)
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
var
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
aux: seq[byte]
@ -62,8 +62,6 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
try:
(maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err:
return failure(err)
@ -137,7 +135,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
return success(aux)
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()
@ -148,7 +146,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.as
else:
return success()
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
var
exists = false
@ -160,10 +158,10 @@ method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async: (raises: [
return success exists
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
return self.db.deleteStmt.exec((key.id))
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
if err =? self.db.beginStmt.exec().errorOption:
return failure(err)
@ -179,7 +177,7 @@ method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async: (
return success()
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
# see comment in ./filesystem_datastore re: finer control of memory
# allocation in `method get`, could apply here as well if bytes were read
# incrementally with `sqlite3_blob_read`
@ -199,10 +197,10 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async: (rais
return success bytes
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
if err =? self.db.beginStmt.exec().errorOption:
return failure err
@ -218,14 +216,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
return success()
method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
return success()
method query*(
self: SQLiteDatastore,
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
query: Query): Future[?!QueryIter] {.async.} =
var
iter = QueryIter()
@ -269,7 +267,7 @@ method query*(
if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v))
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
proc next(): Future[?!QueryResponse] {.async.} =
if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -323,7 +321,7 @@ method query*(
iter.finished = true
return failure newException(DatastoreError, $sqlite3_errstr(v))
iter.dispose = proc(): Future[?!void] {.async: (raises: [CancelledError]).} =
iter.dispose = proc(): Future[?!void] {.async.} =
discard sqlite3_reset(s)
discard sqlite3_clear_bindings(s)
iter.next = nil

View File

@ -28,7 +28,7 @@ proc stores*(self: TieredDatastore): seq[Datastore] =
method has*(
self: TieredDatastore,
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!bool] {.async.} =
for store in self.stores:
without res =? (await store.has(key)), err:
@ -41,32 +41,32 @@ method has*(
method delete*(
self: TieredDatastore,
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!void] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending:
? await fut
if fut.read().isErr: return fut.read()
return success()
method delete*(
self: TieredDatastore,
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
let
pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending:
? await fut
if fut.read().isErr: return fut.read()
return success()
method get*(
self: TieredDatastore,
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
key: Key): Future[?!seq[byte]] {.async.} =
var
bytes: seq[byte]
@ -91,33 +91,33 @@ method get*(
method put*(
self: TieredDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
data: seq[byte]): Future[?!void] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
for fut in pending:
? await fut
if fut.read().isErr: return fut.read()
return success()
method put*(
self: TieredDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
let
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
for fut in pending:
? await fut
if fut.read().isErr: return fut.read()
return success()
method modifyGet*(
self: TieredDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
@ -125,21 +125,23 @@ method modifyGet*(
var aux = newSeq[byte]()
for fut in pending:
let res = ? await fut
aux.add(res)
if fut.read().isErr:
return fut.read()
else:
aux.add(fut.read().get)
return success(aux)
method modify*(
self: TieredDatastore,
key: Key,
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
fn: Modify): Future[?!void] {.async.} =
let
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
for fut in pending:
? await fut
if fut.read().isErr: return fut.read()
return success()

View File

@ -47,11 +47,11 @@ type
TypedDatastore* = ref object of RootObj
ds*: Datastore
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CancelledError], gcsafe, closure.}
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CancelledError], gcsafe, closure.}
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.}
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.}
QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]), gcsafe, closure.}
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.}
QueryIter*[T] = ref object
finished*: bool
next*: GetNext[T]
@ -71,16 +71,16 @@ template requireEncoder*(T: typedesc): untyped =
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
# Original Datastore API
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} =
await self.ds.has(key)
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} =
return (await self.ds.has(key)) |? false
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} =
await self.ds.delete(key)
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} =
await self.ds.delete(keys)
proc close*(self: TypedDatastore): Future[?!void] {.async.} =
@ -90,19 +90,19 @@ proc close*(self: TypedDatastore): Future[?!void] {.async.} =
proc init*(T: type TypedDatastore, ds: Datastore): T =
TypedDatastore(ds: ds)
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async: (raises: [CancelledError]).} =
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} =
requireEncoder(T)
await self.ds.put(key, t.encode)
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [CancelledError]).} =
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
requireDecoder(T)
without bytes =? await self.ds.get(key), error:
return failure(error)
return T.decode(bytes)
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError]).} =
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
requireDecoder(T)
requireEncoder(T)
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
await self.ds.modify(key, wrappedFn)
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError]).} =
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} =
requireDecoder(T)
requireEncoder(T)
requireEncoder(U)
@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
return U.decode(auxBytes)
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} =
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
requireDecoder(T)
without dsIter =? await self.ds.query(q), error:
@ -161,14 +161,14 @@ proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async:
return failure(childErr)
var iter = QueryIter[T]()
iter.dispose = proc (): Future[?!void] {.async: (raises: [CancelledError]).} =
iter.dispose = proc (): Future[?!void] {.async.} =
await dsIter.dispose()
if dsIter.finished:
iter.finished = true
return success(iter)
proc getNext: Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
proc getNext: Future[?!QueryResponse[T]] {.async.} =
without pair =? await dsIter.next(), error:
return failure(error)