mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-02 13:43:11 +00:00
Do not propagate AsyncLockError
This commit is contained in:
parent
42e4f530bf
commit
9586f95ccd
@ -46,7 +46,7 @@ method query*(
|
||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
return (await self.has(key)) |? false
|
||||
|
||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} =
|
||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||
## Concurrently safe way of modifying the value associated with the `key`.
|
||||
##
|
||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} =
|
||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
|
||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||
## successful update.
|
||||
##
|
||||
|
||||
@ -11,7 +11,7 @@ proc defaultModifyGetImpl*(
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: ModifyGet
|
||||
): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
# Default implementation, serializes all modify operations using provided lock
|
||||
#
|
||||
await lock.acquire()
|
||||
@ -45,14 +45,17 @@ proc defaultModifyGetImpl*(
|
||||
|
||||
return aux.success
|
||||
finally:
|
||||
lock.release()
|
||||
try:
|
||||
lock.release()
|
||||
except AsyncLockError as err:
|
||||
return failure(err)
|
||||
|
||||
proc defaultModifyImpl*(
|
||||
self: Datastore,
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: Modify
|
||||
): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
let res = await fn(maybeValue)
|
||||
let ignoredAux = newSeq[byte]()
|
||||
|
||||
@ -221,7 +221,7 @@ method query*(
|
||||
method modifyGet*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
@ -233,7 +233,7 @@ method modifyGet*(
|
||||
method modify*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
var lock: AsyncLock
|
||||
|
||||
try:
|
||||
|
||||
@ -125,7 +125,7 @@ method query*(
|
||||
method modifyGet*(
|
||||
self: LevelDbDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
@ -137,7 +137,7 @@ method modifyGet*(
|
||||
method modify*(
|
||||
self: LevelDbDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
|
||||
@ -121,7 +121,7 @@ method put*(
|
||||
method modifyGet*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
@ -131,7 +131,7 @@ method modifyGet*(
|
||||
method modify*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
|
||||
@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
|
||||
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
|
||||
return newRollbackError(rbErr, opErr)
|
||||
|
||||
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
var
|
||||
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
||||
aux: seq[byte]
|
||||
@ -135,7 +135,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
|
||||
return success(aux)
|
||||
|
||||
|
||||
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
let res = await fn(maybeValue)
|
||||
let ignoredAux = newSeq[byte]()
|
||||
|
||||
@ -133,7 +133,7 @@ method put*(
|
||||
method modifyGet*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
||||
@ -154,7 +154,7 @@ method modifyGet*(
|
||||
method modify*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
||||
|
||||
@ -102,7 +102,7 @@ proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [Can
|
||||
return failure(error)
|
||||
return T.decode(bytes)
|
||||
|
||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
requireDecoder(T)
|
||||
requireEncoder(T)
|
||||
|
||||
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
|
||||
|
||||
await self.ds.modify(key, wrappedFn)
|
||||
|
||||
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError, AsyncLockError]).} =
|
||||
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError]).} =
|
||||
requireDecoder(T)
|
||||
requireEncoder(T)
|
||||
requireEncoder(U)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user