mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 23:23:10 +00:00
Add errors to raises async pragam
This commit is contained in:
parent
86044f819b
commit
0092a796ea
@ -34,19 +34,19 @@ method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base,
|
|||||||
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
|
method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: Datastore,
|
self: Datastore,
|
||||||
query: Query): Future[?!QueryIter] {.base, gcsafe.} =
|
query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
|
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||||
return (await self.has(key)) |? false
|
return (await self.has(key)) |? false
|
||||||
|
|
||||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} =
|
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} =
|
||||||
## Concurrently safe way of modifying the value associated with the `key`.
|
## Concurrently safe way of modifying the value associated with the `key`.
|
||||||
##
|
##
|
||||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||||
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
|
|||||||
|
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
|
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} =
|
||||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||||
## successful update.
|
## successful update.
|
||||||
##
|
##
|
||||||
|
|||||||
@ -11,10 +11,9 @@ proc defaultModifyGetImpl*(
|
|||||||
lock: AsyncLock,
|
lock: AsyncLock,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet
|
fn: ModifyGet
|
||||||
): Future[?!seq[byte]] {.async.} =
|
): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
# Default implementation, serializes all modify operations using provided lock
|
# Default implementation, serializes all modify operations using provided lock
|
||||||
#
|
#
|
||||||
|
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -53,7 +52,7 @@ proc defaultModifyImpl*(
|
|||||||
lock: AsyncLock,
|
lock: AsyncLock,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify
|
fn: Modify
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||||
let res = await fn(maybeValue)
|
let res = await fn(maybeValue)
|
||||||
let ignoredAux = newSeq[byte]()
|
let ignoredAux = newSeq[byte]()
|
||||||
|
|||||||
@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) =
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
raise newException(Defect, exc.msg)
|
raise newException(Defect, exc.msg)
|
||||||
|
|
||||||
method close*(self: FSDatastore): Future[?!void] {.async.} =
|
method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
without path =? self.path(query.key), error:
|
without path =? self.path(query.key), error:
|
||||||
return failure error
|
return failure error
|
||||||
@ -221,7 +221,7 @@ method query*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
@ -233,8 +233,9 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: FSDatastore,
|
self: FSDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
return await defaultModifyImpl(self, lock, key, fn)
|
return await defaultModifyImpl(self, lock, key, fn)
|
||||||
|
|||||||
@ -68,7 +68,7 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
|
|||||||
except LevelDbException as e:
|
except LevelDbException as e:
|
||||||
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
|
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
|
||||||
|
|
||||||
method close*(self: LevelDbDatastore): Future[?!void] {.async.} =
|
method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
self.db.close()
|
self.db.close()
|
||||||
return success()
|
return success()
|
||||||
@ -84,7 +84,7 @@ proc getQueryString(query: Query): string =
|
|||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} =
|
||||||
|
|
||||||
if not (query.sort == SortOrder.Assending):
|
if not (query.sort == SortOrder.Assending):
|
||||||
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
||||||
@ -125,7 +125,7 @@ method query*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
@ -137,7 +137,7 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: LevelDbDatastore,
|
self: LevelDbDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
var lock: AsyncLock
|
var lock: AsyncLock
|
||||||
try:
|
try:
|
||||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||||
|
|||||||
@ -121,7 +121,7 @@ method put*(
|
|||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
@ -131,14 +131,14 @@ method modifyGet*(
|
|||||||
method modify*(
|
method modify*(
|
||||||
self: MountedDatastore,
|
self: MountedDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
|
|
||||||
without mounted =? self.dispatch(key), error:
|
without mounted =? self.dispatch(key), error:
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
return await mounted.store.store.modify(mounted.relative, fn)
|
return await mounted.store.store.modify(mounted.relative, fn)
|
||||||
|
|
||||||
method close*(self: MountedDatastore): Future[?!void] {.async.} =
|
method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
for s in self.stores.values:
|
for s in self.stores.values:
|
||||||
discard await s.store.close()
|
discard await s.store.close()
|
||||||
|
|
||||||
|
|||||||
@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
|
|||||||
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
|
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
|
||||||
return newRollbackError(rbErr, opErr)
|
return newRollbackError(rbErr, opErr)
|
||||||
|
|
||||||
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
var
|
var
|
||||||
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
||||||
aux: seq[byte]
|
aux: seq[byte]
|
||||||
@ -135,7 +135,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
|
|||||||
return success(aux)
|
return success(aux)
|
||||||
|
|
||||||
|
|
||||||
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
|
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||||
let res = await fn(maybeValue)
|
let res = await fn(maybeValue)
|
||||||
let ignoredAux = newSeq[byte]()
|
let ignoredAux = newSeq[byte]()
|
||||||
@ -216,14 +216,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
|
|||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
|
method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||||
self.db.close()
|
self.db.close()
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
self: SQLiteDatastore,
|
self: SQLiteDatastore,
|
||||||
query: Query): Future[?!QueryIter] {.async.} =
|
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
|
||||||
|
|
||||||
var
|
var
|
||||||
iter = QueryIter()
|
iter = QueryIter()
|
||||||
|
|||||||
@ -47,8 +47,11 @@ method delete*(
|
|||||||
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
without res =? fut.read().catch, error:
|
try:
|
||||||
return failure error
|
if fut.read().isErr:
|
||||||
|
return fut.read()
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -61,8 +64,11 @@ method delete*(
|
|||||||
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
pending = await allFinished(self.stores.mapIt(it.delete(key)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
without res =? fut.read().catch, error:
|
try:
|
||||||
return failure error
|
if fut.read().isErr:
|
||||||
|
return fut.read()
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -99,8 +105,11 @@ method put*(
|
|||||||
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
|
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
without res =? fut.read().catch, error:
|
try:
|
||||||
return failure error
|
if fut.read().isErr:
|
||||||
|
return fut.read()
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -113,15 +122,18 @@ method put*(
|
|||||||
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
|
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
without res =? fut.read().catch, error:
|
try:
|
||||||
return failure error
|
if fut.read().isErr:
|
||||||
|
return fut.read()
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method modifyGet*(
|
method modifyGet*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
||||||
@ -129,23 +141,30 @@ method modifyGet*(
|
|||||||
var aux = newSeq[byte]()
|
var aux = newSeq[byte]()
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr:
|
try:
|
||||||
return fut.read()
|
if fut.read().isErr:
|
||||||
else:
|
return fut.read()
|
||||||
aux.add(fut.read().get)
|
else:
|
||||||
|
aux.add(fut.read().get)
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success(aux)
|
return success(aux)
|
||||||
|
|
||||||
method modify*(
|
method modify*(
|
||||||
self: TieredDatastore,
|
self: TieredDatastore,
|
||||||
key: Key,
|
key: Key,
|
||||||
fn: Modify): Future[?!void] {.async.} =
|
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
||||||
|
|
||||||
for fut in pending:
|
for fut in pending:
|
||||||
if fut.read().isErr: return fut.read()
|
try:
|
||||||
|
if fut.read().isErr:
|
||||||
|
return fut.read()
|
||||||
|
except FuturePendingError as err:
|
||||||
|
return failure err
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|||||||
@ -102,7 +102,7 @@ proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [Can
|
|||||||
return failure(error)
|
return failure(error)
|
||||||
return T.decode(bytes)
|
return T.decode(bytes)
|
||||||
|
|
||||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
|
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
requireEncoder(T)
|
requireEncoder(T)
|
||||||
|
|
||||||
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
|
|||||||
|
|
||||||
await self.ds.modify(key, wrappedFn)
|
await self.ds.modify(key, wrappedFn)
|
||||||
|
|
||||||
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} =
|
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
requireEncoder(T)
|
requireEncoder(T)
|
||||||
requireEncoder(U)
|
requireEncoder(U)
|
||||||
@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
|
|||||||
|
|
||||||
return U.decode(auxBytes)
|
return U.decode(auxBytes)
|
||||||
|
|
||||||
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
|
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
|
|
||||||
without dsIter =? await self.ds.query(q), error:
|
without dsIter =? await self.ds.query(q), error:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user