mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 14:13:09 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a7ee4b170a | ||
|
|
950990720a | ||
|
|
d583647c5b | ||
|
|
92f9533c7d | ||
|
|
9586f95ccd | ||
|
|
42e4f530bf | ||
|
|
13c0a0cdb1 |
@ -16,37 +16,37 @@ type
|
|||||||
Modify* = Function[?seq[byte], Future[?seq[byte]]]
|
Modify* = Function[?seq[byte], Future[?seq[byte]]]
|
||||||
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
|
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
|
||||||
|
|
||||||
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, locks: "unknown".} =
|
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
|
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
|
method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: Datastore,
|
self: Datastore,
|
||||||
query: Query): Future[?!QueryIter] {.base, gcsafe.} =
|
query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
|
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||||
return (await self.has(key)) |? false
|
return (await self.has(key)) |? false
|
||||||
|
|
||||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
## Concurrently safe way of modifying the value associated with the `key`.
|
## Concurrently safe way of modifying the value associated with the `key`.
|
||||||
##
|
##
|
||||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||||
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
|
|||||||
|
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
|
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||||
## successful update.
|
## successful update.
|
||||||
##
|
##
|
||||||
|
|||||||
@ -11,10 +11,9 @@ proc defaultModifyGetImpl*(
|
|||||||
lock: AsyncLock,
|
lock: AsyncLock,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet
|
fn: ModifyGet
|
||||||
): Future[?!seq[byte]] {.async.} =
|
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
# Default implementation, serializes all modify operations using provided lock
|
# Default implementation, serializes all modify operations using provided lock
|
||||||
#
|
#
|
||||||
|
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -34,6 +33,8 @@ proc defaultModifyGetImpl*(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
(maybeNewData, aux) = await fn(maybeCurrentData)
|
(maybeNewData, aux) = await fn(maybeCurrentData)
|
||||||
|
except CancelledError as err:
|
||||||
|
raise err
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
@ -46,14 +47,17 @@ proc defaultModifyGetImpl*(
|
|||||||
|
|
||||||
return aux.success
|
return aux.success
|
||||||
finally:
|
finally:
|
||||||
lock.release()
|
try:
|
||||||
|
lock.release()
|
||||||
|
except AsyncLockError as err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
proc defaultModifyImpl*(
|
proc defaultModifyImpl*(
|
||||||
self: Datastore,
|
self: Datastore,
|
||||||
lock: AsyncLock,
|
lock: AsyncLock,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify
|
fn: Modify
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||||
let res = await fn(maybeValue)
|
let res = await fn(maybeValue)
|
||||||
let ignoredAux = newSeq[byte]()
|
let ignoredAux = newSeq[byte]()
|
||||||
|
|||||||
@ -65,10 +65,10 @@ proc path*(self: FSDatastore, key: Key): ?!string =
|
|||||||
|
|
||||||
return success fullname
|
return success fullname
|
||||||
|
|
||||||
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
|
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
return self.path(key).?fileExists()
|
return self.path(key).?fileExists()
|
||||||
|
|
||||||
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
|
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
without path =? self.path(key), error:
|
without path =? self.path(key), error:
|
||||||
return failure error
|
return failure error
|
||||||
|
|
||||||
@ -82,7 +82,7 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
|
|||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
for key in keys:
|
for key in keys:
|
||||||
if err =? (await self.delete(key)).errorOption:
|
if err =? (await self.delete(key)).errorOption:
|
||||||
return failure err
|
return failure err
|
||||||
@ -119,7 +119,7 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
|
|||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return failure e
|
return failure e
|
||||||
|
|
||||||
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
without path =? self.path(key), error:
|
without path =? self.path(key), error:
|
||||||
return failure error
|
return failure error
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
|||||||
method put*(
|
method put*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
data: seq[byte]): Future[?!void] {.async.} =
|
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without path =? self.path(key), error:
|
without path =? self.path(key), error:
|
||||||
return failure error
|
return failure error
|
||||||
@ -147,7 +147,7 @@ method put*(
|
|||||||
|
|
||||||
method put*(
|
method put*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for entry in batch:
|
for entry in batch:
|
||||||
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
||||||
@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) =
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
raise newException(Defect, exc.msg)
|
raise newException(Defect, exc.msg)
|
||||||
|
|
||||||
method close*(self: FSDatastore): Future[?!void] {.async.} =
|
method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without path =? self.path(query.key), error:
|
without path =? self.path(query.key), error:
|
||||||
return failure error
|
return failure error
|
||||||
@ -189,7 +189,7 @@ method query*(
|
|||||||
var
|
var
|
||||||
iter = QueryIter.new()
|
iter = QueryIter.new()
|
||||||
|
|
||||||
proc next(): Future[?!QueryResponse] {.async.} =
|
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
|
||||||
let
|
let
|
||||||
path = walker()
|
path = walker()
|
||||||
|
|
||||||
@ -208,8 +208,13 @@ method query*(
|
|||||||
key = Key.init(keyPath).expect("should not fail")
|
key = Key.init(keyPath).expect("should not fail")
|
||||||
data =
|
data =
|
||||||
if query.value:
|
if query.value:
|
||||||
self.readFile((basePath / path).absolutePath)
|
try:
|
||||||
.expect("Should read file")
|
self.readFile((basePath / path).absolutePath)
|
||||||
|
.expect("Should read file")
|
||||||
|
except ValueError as err:
|
||||||
|
return failure err
|
||||||
|
except OSError as err:
|
||||||
|
return failure err
|
||||||
else:
|
else:
|
||||||
@[]
|
@[]
|
||||||
|
|
||||||
@ -221,7 +226,7 @@ method query*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
@ -233,8 +238,9 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
return await defaultModifyImpl(self, lock, key, fn)
|
return await defaultModifyImpl(self, lock, key, fn)
|
||||||
|
|||||||
@ -20,27 +20,27 @@ type
|
|||||||
db: LevelDb
|
db: LevelDb
|
||||||
locks: TableRef[Key, AsyncLock]
|
locks: TableRef[Key, AsyncLock]
|
||||||
|
|
||||||
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async.} =
|
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
let str = self.db.get($key)
|
let str = self.db.get($key)
|
||||||
return success(str.isSome)
|
return success(str.isSome)
|
||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.has exception: " & e.msg)
|
return failure("LevelDbDatastore.has exception: " & e.msg)
|
||||||
|
|
||||||
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async.} =
|
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
self.db.delete($key, sync = true)
|
self.db.delete($key, sync = true)
|
||||||
return success()
|
return success()
|
||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.delete exception: " & e.msg)
|
return failure("LevelDbDatastore.delete exception: " & e.msg)
|
||||||
|
|
||||||
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
for key in keys:
|
for key in keys:
|
||||||
if err =? (await self.delete(key)).errorOption:
|
if err =? (await self.delete(key)).errorOption:
|
||||||
return failure(err.msg)
|
return failure(err.msg)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
let str = self.db.get($key)
|
let str = self.db.get($key)
|
||||||
if not str.isSome:
|
if not str.isSome:
|
||||||
@ -50,7 +50,7 @@ method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
|||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.get exception: " & $e.msg)
|
return failure("LevelDbDatastore.get exception: " & $e.msg)
|
||||||
|
|
||||||
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
|
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
let str = string.fromBytes(data)
|
let str = string.fromBytes(data)
|
||||||
self.db.put($key, str)
|
self.db.put($key, str)
|
||||||
@ -58,7 +58,7 @@ method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {
|
|||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.put exception: " & $e.msg)
|
return failure("LevelDbDatastore.put exception: " & $e.msg)
|
||||||
|
|
||||||
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
let b = newBatch()
|
let b = newBatch()
|
||||||
for entry in batch:
|
for entry in batch:
|
||||||
@ -68,7 +68,7 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
|
|||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
|
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
|
||||||
|
|
||||||
method close*(self: LevelDbDatastore): Future[?!void] {.async.} =
|
method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
self.db.close()
|
self.db.close()
|
||||||
return success()
|
return success()
|
||||||
@ -84,7 +84,7 @@ proc getQueryString(query: Query): string =
|
|||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} =
|
||||||
|
|
||||||
if not (query.sort == SortOrder.Assending):
|
if not (query.sort == SortOrder.Assending):
|
||||||
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
||||||
@ -98,7 +98,7 @@ method query*(
|
|||||||
limit = query.limit
|
limit = query.limit
|
||||||
)
|
)
|
||||||
|
|
||||||
proc next(): Future[?!QueryResponse] {.async.} =
|
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
|
||||||
if iter.finished:
|
if iter.finished:
|
||||||
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
||||||
|
|
||||||
@ -114,7 +114,7 @@ method query*(
|
|||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
|
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
|
||||||
|
|
||||||
proc dispose(): Future[?!void] {.async.} =
|
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
dbIter.dispose()
|
dbIter.dispose()
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -125,7 +125,7 @@ method query*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
@ -137,7 +137,7 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
|
|||||||
@ -63,7 +63,7 @@ proc dispatch(
|
|||||||
|
|
||||||
method has*(
|
method has*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key): Future[?!bool] {.async.} =
|
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key):
|
without mounted =? self.dispatch(key):
|
||||||
return failure "No mounted datastore found"
|
return failure "No mounted datastore found"
|
||||||
@ -72,7 +72,7 @@ method has*(
|
|||||||
|
|
||||||
method delete*(
|
method delete*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key): Future[?!void] {.async.} =
|
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
@ -81,7 +81,7 @@ method delete*(
|
|||||||
|
|
||||||
method delete*(
|
method delete*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
keys: seq[Key]): Future[?!void] {.async.} =
|
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
if err =? (await self.delete(key)).errorOption:
|
if err =? (await self.delete(key)).errorOption:
|
||||||
@ -91,7 +91,7 @@ method delete*(
|
|||||||
|
|
||||||
method get*(
|
method get*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key): Future[?!seq[byte]] {.async.} =
|
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
@ -101,7 +101,7 @@ method get*(
|
|||||||
method put*(
|
method put*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
data: seq[byte]): Future[?!void] {.async.} =
|
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
@ -110,7 +110,7 @@ method put*(
|
|||||||
|
|
||||||
method put*(
|
method put*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for entry in batch:
|
for entry in batch:
|
||||||
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
||||||
@ -121,7 +121,7 @@ method put*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
@ -131,14 +131,14 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
return await mounted.store.store.modify(mounted.relative, fn)
|
return await mounted.store.store.modify(mounted.relative, fn)
|
||||||
|
|
||||||
method close*(self: MountedDatastore): Future[?!void] {.async.} =
|
method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
for s in self.stores.values:
|
for s in self.stores.values:
|
||||||
discard await s.store.close()
|
discard await s.store.close()
|
||||||
|
|
||||||
|
|||||||
@ -23,8 +23,8 @@ type
|
|||||||
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
|
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
|
||||||
QueryEndedError* = object of DatastoreError
|
QueryEndedError* = object of DatastoreError
|
||||||
|
|
||||||
GetNext* = proc(): Future[?!QueryResponse] {.raises: [], gcsafe, closure.}
|
GetNext* = proc(): Future[?!QueryResponse] {.async: (raises: [CancelledError]), gcsafe, closure.}
|
||||||
IterDispose* = proc(): Future[?!void] {.raises: [], gcsafe.}
|
IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.}
|
||||||
QueryIter* = ref object
|
QueryIter* = ref object
|
||||||
finished*: bool
|
finished*: bool
|
||||||
next*: GetNext
|
next*: GetNext
|
||||||
@ -34,7 +34,7 @@ iterator items*(q: QueryIter): Future[?!QueryResponse] =
|
|||||||
while not q.finished:
|
while not q.finished:
|
||||||
yield q.next()
|
yield q.next()
|
||||||
|
|
||||||
proc defaultDispose(): Future[?!void] {.gcsafe, async.} =
|
proc defaultDispose(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc new*(T: type QueryIter, dispose = defaultDispose): T =
|
proc new*(T: type QueryIter, dispose = defaultDispose): T =
|
||||||
|
|||||||
@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
|
|||||||
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
|
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
|
||||||
return newRollbackError(rbErr, opErr)
|
return newRollbackError(rbErr, opErr)
|
||||||
|
|
||||||
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
var
|
var
|
||||||
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
||||||
aux: seq[byte]
|
aux: seq[byte]
|
||||||
@ -62,6 +62,8 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
(maybeNewData, aux) = await fn(maybeCurrentData)
|
(maybeNewData, aux) = await fn(maybeCurrentData)
|
||||||
|
except CancelledError as err:
|
||||||
|
raise err
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
@ -135,7 +137,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
|
|||||||
return success(aux)
|
return success(aux)
|
||||||
|
|
||||||
|
|
||||||
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
|
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||||
let res = await fn(maybeValue)
|
let res = await fn(maybeValue)
|
||||||
let ignoredAux = newSeq[byte]()
|
let ignoredAux = newSeq[byte]()
|
||||||
@ -146,7 +148,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.as
|
|||||||
else:
|
else:
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
|
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
var
|
var
|
||||||
exists = false
|
exists = false
|
||||||
|
|
||||||
@ -158,10 +160,10 @@ method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
|
|||||||
|
|
||||||
return success exists
|
return success exists
|
||||||
|
|
||||||
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
|
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
return self.db.deleteStmt.exec((key.id))
|
return self.db.deleteStmt.exec((key.id))
|
||||||
|
|
||||||
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
if err =? self.db.beginStmt.exec().errorOption:
|
if err =? self.db.beginStmt.exec().errorOption:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
@ -177,7 +179,7 @@ method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.}
|
|||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
# see comment in ./filesystem_datastore re: finer control of memory
|
# see comment in ./filesystem_datastore re: finer control of memory
|
||||||
# allocation in `method get`, could apply here as well if bytes were read
|
# allocation in `method get`, could apply here as well if bytes were read
|
||||||
# incrementally with `sqlite3_blob_read`
|
# incrementally with `sqlite3_blob_read`
|
||||||
@ -197,10 +199,10 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
|||||||
|
|
||||||
return success bytes
|
return success bytes
|
||||||
|
|
||||||
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
|
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
|
return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
|
||||||
|
|
||||||
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
if err =? self.db.beginStmt.exec().errorOption:
|
if err =? self.db.beginStmt.exec().errorOption:
|
||||||
return failure err
|
return failure err
|
||||||
|
|
||||||
@ -216,14 +218,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
|
|||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
|
method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
self.db.close()
|
self.db.close()
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: SQLiteDatastore,
|
self: SQLiteDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
var
|
var
|
||||||
iter = QueryIter()
|
iter = QueryIter()
|
||||||
@ -267,7 +269,7 @@ method query*(
|
|||||||
if not (v == SQLITE_OK):
|
if not (v == SQLITE_OK):
|
||||||
return failure newException(DatastoreError, $sqlite3_errstr(v))
|
return failure newException(DatastoreError, $sqlite3_errstr(v))
|
||||||
|
|
||||||
proc next(): Future[?!QueryResponse] {.async.} =
|
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
|
||||||
if iter.finished:
|
if iter.finished:
|
||||||
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
||||||
|
|
||||||
@ -321,7 +323,7 @@ method query*(
|
|||||||
iter.finished = true
|
iter.finished = true
|
||||||
return failure newException(DatastoreError, $sqlite3_errstr(v))
|
return failure newException(DatastoreError, $sqlite3_errstr(v))
|
||||||
|
|
||||||
iter.dispose = proc(): Future[?!void] {.async.} =
|
iter.dispose = proc(): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
discard sqlite3_reset(s)
|
discard sqlite3_reset(s)
|
||||||
discard sqlite3_clear_bindings(s)
|
discard sqlite3_clear_bindings(s)
|
||||||
iter.next = nil
|
iter.next = nil
|
||||||
|
|||||||
@ -28,7 +28,7 @@ proc stores*(self: TieredDatastore): seq[Datastore] =
|
|||||||
|
|
||||||
method has*(
|
method has*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key): Future[?!bool] {.async.} =
|
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for store in self.stores:
|
for store in self.stores:
|
||||||
without res =? (await store.has(key)), err:
|
without res =? (await store.has(key)), err:
|
||||||
@ -41,32 +41,32 @@ method has*(
|
|||||||
|
|
||||||
method delete*(
|
method delete*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key): Future[?!void] {.async.} =
|
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
? await fut
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method delete*(
|
method delete*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
keys: seq[Key]): Future[?!void] {.async.} =
|
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
? await fut
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method get*(
|
method get*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key): Future[?!seq[byte]] {.async.} =
|
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
var
|
var
|
||||||
bytes: seq[byte]
|
bytes: seq[byte]
|
||||||
@ -91,33 +91,33 @@ method get*(
|
|||||||
method put*(
|
method put*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
data: seq[byte]): Future[?!void] {.async.} =
|
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
|
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
? await fut
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method put*(
|
method put*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
for entry in batch:
|
for entry in batch:
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
|
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
? await fut
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
||||||
@ -125,23 +125,21 @@ method modifyGet*(
|
|||||||
var aux = newSeq[byte]()
|
var aux = newSeq[byte]()
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr:
|
let res = ? await fut
|
||||||
return fut.read()
|
aux.add(res)
|
||||||
else:
|
|
||||||
aux.add(fut.read().get)
|
|
||||||
|
|
||||||
return success(aux)
|
return success(aux)
|
||||||
|
|
||||||
method modify*(
|
method modify*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
? await fut
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|||||||
@ -47,11 +47,11 @@ type
|
|||||||
TypedDatastore* = ref object of RootObj
|
TypedDatastore* = ref object of RootObj
|
||||||
ds*: Datastore
|
ds*: Datastore
|
||||||
|
|
||||||
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.}
|
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CancelledError], gcsafe, closure.}
|
||||||
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.}
|
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CancelledError], gcsafe, closure.}
|
||||||
|
|
||||||
QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
|
QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
|
||||||
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.}
|
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]), gcsafe, closure.}
|
||||||
QueryIter*[T] = ref object
|
QueryIter*[T] = ref object
|
||||||
finished*: bool
|
finished*: bool
|
||||||
next*: GetNext[T]
|
next*: GetNext[T]
|
||||||
@ -71,16 +71,16 @@ template requireEncoder*(T: typedesc): untyped =
|
|||||||
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
|
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
|
||||||
|
|
||||||
# Original Datastore API
|
# Original Datastore API
|
||||||
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} =
|
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||||
await self.ds.has(key)
|
await self.ds.has(key)
|
||||||
|
|
||||||
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} =
|
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||||
return (await self.ds.has(key)) |? false
|
return (await self.ds.has(key)) |? false
|
||||||
|
|
||||||
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} =
|
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
await self.ds.delete(key)
|
await self.ds.delete(key)
|
||||||
|
|
||||||
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
await self.ds.delete(keys)
|
await self.ds.delete(keys)
|
||||||
|
|
||||||
proc close*(self: TypedDatastore): Future[?!void] {.async.} =
|
proc close*(self: TypedDatastore): Future[?!void] {.async.} =
|
||||||
@ -90,19 +90,19 @@ proc close*(self: TypedDatastore): Future[?!void] {.async.} =
|
|||||||
proc init*(T: type TypedDatastore, ds: Datastore): T =
|
proc init*(T: type TypedDatastore, ds: Datastore): T =
|
||||||
TypedDatastore(ds: ds)
|
TypedDatastore(ds: ds)
|
||||||
|
|
||||||
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} =
|
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
requireEncoder(T)
|
requireEncoder(T)
|
||||||
|
|
||||||
await self.ds.put(key, t.encode)
|
await self.ds.put(key, t.encode)
|
||||||
|
|
||||||
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
|
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [CancelledError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
|
|
||||||
without bytes =? await self.ds.get(key), error:
|
without bytes =? await self.ds.get(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
return T.decode(bytes)
|
return T.decode(bytes)
|
||||||
|
|
||||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
|
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
requireEncoder(T)
|
requireEncoder(T)
|
||||||
|
|
||||||
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
|
|||||||
|
|
||||||
await self.ds.modify(key, wrappedFn)
|
await self.ds.modify(key, wrappedFn)
|
||||||
|
|
||||||
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} =
|
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
requireEncoder(T)
|
requireEncoder(T)
|
||||||
requireEncoder(U)
|
requireEncoder(U)
|
||||||
@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
|
|||||||
|
|
||||||
return U.decode(auxBytes)
|
return U.decode(auxBytes)
|
||||||
|
|
||||||
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
|
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
|
|
||||||
without dsIter =? await self.ds.query(q), error:
|
without dsIter =? await self.ds.query(q), error:
|
||||||
@ -161,14 +161,14 @@ proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.}
|
|||||||
return failure(childErr)
|
return failure(childErr)
|
||||||
|
|
||||||
var iter = QueryIter[T]()
|
var iter = QueryIter[T]()
|
||||||
iter.dispose = proc (): Future[?!void] {.async.} =
|
iter.dispose = proc (): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
await dsIter.dispose()
|
await dsIter.dispose()
|
||||||
|
|
||||||
if dsIter.finished:
|
if dsIter.finished:
|
||||||
iter.finished = true
|
iter.finished = true
|
||||||
return success(iter)
|
return success(iter)
|
||||||
|
|
||||||
proc getNext: Future[?!QueryResponse[T]] {.async.} =
|
proc getNext: Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
|
||||||
without pair =? await dsIter.next(), error:
|
without pair =? await dsIter.next(), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user