From 42e4f530bff0162765c649adf1d5481c58b53cdb Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 25 Feb 2025 11:01:10 +0100 Subject: [PATCH] Add errors to raises async pragam --- datastore/datastore.nim | 8 +++--- datastore/defaultimpl.nim | 5 ++-- datastore/fsds.nim | 9 +++--- datastore/leveldb/leveldbds.nim | 8 +++--- datastore/mountedds.nim | 6 ++-- datastore/sql/sqliteds.nim | 8 +++--- datastore/tieredds.nim | 49 +++++++++++++++++++++++---------- datastore/typedds.nim | 6 ++-- 8 files changed, 59 insertions(+), 40 deletions(-) diff --git a/datastore/datastore.nim b/datastore/datastore.nim index 31abb1f..0bab3ad 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -34,19 +34,19 @@ method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} = raiseAssert("Not implemented!") -method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = +method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} = raiseAssert("Not implemented!") method query*( self: Datastore, - query: Query): Future[?!QueryIter] {.base, gcsafe.} = + query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} = raiseAssert("Not implemented!") proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} = return (await self.has(key)) |? false -method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} = +method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} = ## Concurrently safe way of modifying the value associated with the `key`. ## ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. @@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc raiseAssert("Not implemented!") -method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} = +method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError, AsyncLockError]), locks: "unknown".} = ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on ## successful update. ## diff --git a/datastore/defaultimpl.nim b/datastore/defaultimpl.nim index be7178a..c4ae77b 100644 --- a/datastore/defaultimpl.nim +++ b/datastore/defaultimpl.nim @@ -11,10 +11,9 @@ proc defaultModifyGetImpl*( lock: AsyncLock, key: Key, fn: ModifyGet - ): Future[?!seq[byte]] {.async.} = + ): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = # Default implementation, serializes all modify operations using provided lock # - await lock.acquire() try: @@ -53,7 +52,7 @@ proc defaultModifyImpl*( lock: AsyncLock, key: Key, fn: Modify - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = let res = await fn(maybeValue) let ignoredAux = newSeq[byte]() diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 407c2c3..a7fdb83 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) = except CatchableError as exc: raise newException(Defect, exc.msg) -method close*(self: FSDatastore): Future[?!void] {.async.} = +method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = return success() method query*( self: FSDatastore, - query: Query): Future[?!QueryIter] {.async.} = + query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} = without path =? self.path(query.key), error: return failure error @@ -221,7 +221,7 @@ method query*( method modifyGet*( self: FSDatastore, key: Key, - fn: ModifyGet): Future[?!seq[byte]] {.async.} = + fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = var lock: AsyncLock try: lock = self.locks.mgetOrPut(key, newAsyncLock()) @@ -233,8 +233,9 @@ method modifyGet*( method modify*( self: FSDatastore, key: Key, - fn: Modify): Future[?!void] {.async.} = + fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = var lock: AsyncLock + try: lock = self.locks.mgetOrPut(key, newAsyncLock()) return await defaultModifyImpl(self, lock, key, fn) diff --git a/datastore/leveldb/leveldbds.nim b/datastore/leveldb/leveldbds.nim index 14d6a09..e438470 100644 --- a/datastore/leveldb/leveldbds.nim +++ b/datastore/leveldb/leveldbds.nim @@ -68,7 +68,7 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as except LevelDbException as e: return failure("LevelDbDatastore.put (batch) exception: " & $e.msg) -method close*(self: LevelDbDatastore): Future[?!void] {.async.} = +method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = try: self.db.close() return success() @@ -84,7 +84,7 @@ proc getQueryString(query: Query): string = method query*( self: LevelDbDatastore, - query: Query): Future[?!QueryIter] {.async, gcsafe.} = + query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} = if not (query.sort == SortOrder.Assending): return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.") @@ -125,7 +125,7 @@ method query*( method modifyGet*( self: LevelDbDatastore, key: Key, - fn: ModifyGet): Future[?!seq[byte]] {.async.} = + fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = var lock: AsyncLock try: lock = self.locks.mgetOrPut(key, newAsyncLock()) @@ -137,7 +137,7 @@ method modifyGet*( method modify*( self: LevelDbDatastore, key: Key, - fn: Modify): Future[?!void] {.async.} = + fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = var lock: AsyncLock try: lock = self.locks.mgetOrPut(key, newAsyncLock()) diff --git a/datastore/mountedds.nim b/datastore/mountedds.nim index 02a58d4..f4de063 100644 --- a/datastore/mountedds.nim +++ b/datastore/mountedds.nim @@ -121,7 +121,7 @@ method put*( method modifyGet*( self: MountedDatastore, key: Key, - fn: ModifyGet): Future[?!seq[byte]] {.async.} = + fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = without mounted =? self.dispatch(key), error: return failure(error) @@ -131,14 +131,14 @@ method modifyGet*( method modify*( self: MountedDatastore, key: Key, - fn: Modify): Future[?!void] {.async.} = + fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = without mounted =? self.dispatch(key), error: return failure(error) return await mounted.store.store.modify(mounted.relative, fn) -method close*(self: MountedDatastore): Future[?!void] {.async.} = +method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = for s in self.stores.values: discard await s.store.close() diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index bfffcf6..ccd9e92 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError = return newRollbackError(rbErr, opErr) -method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = +method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = var retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop aux: seq[byte] @@ -135,7 +135,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[ return success(aux) -method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = +method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = let res = await fn(maybeValue) let ignoredAux = newSeq[byte]() @@ -216,14 +216,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy return success() -method close*(self: SQLiteDatastore): Future[?!void] {.async.} = +method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = self.db.close() return success() method query*( self: SQLiteDatastore, - query: Query): Future[?!QueryIter] {.async.} = + query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} = var iter = QueryIter() diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 80a3d01..2118320 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -47,8 +47,11 @@ method delete*( pending = await allFinished(self.stores.mapIt(it.delete(key))) for fut in pending: - without res =? fut.read().catch, error: - return failure error + try: + if fut.read().isErr: + return fut.read() + except FuturePendingError as err: + return failure err return success() @@ -61,8 +64,11 @@ method delete*( pending = await allFinished(self.stores.mapIt(it.delete(key))) for fut in pending: - without res =? fut.read().catch, error: - return failure error + try: + if fut.read().isErr: + return fut.read() + except FuturePendingError as err: + return failure err return success() @@ -99,8 +105,11 @@ method put*( pending = await allFinished(self.stores.mapIt(it.put(key, data))) for fut in pending: - without res =? fut.read().catch, error: - return failure error + try: + if fut.read().isErr: + return fut.read() + except FuturePendingError as err: + return failure err return success() @@ -113,15 +122,18 @@ method put*( pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data))) for fut in pending: - without res =? fut.read().catch, error: - return failure error + try: + if fut.read().isErr: + return fut.read() + except FuturePendingError as err: + return failure err return success() method modifyGet*( self: TieredDatastore, key: Key, - fn: ModifyGet): Future[?!seq[byte]] {.async.} = + fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError, AsyncLockError]).} = let pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn))) @@ -129,23 +141,30 @@ method modifyGet*( var aux = newSeq[byte]() for fut in pending: - if fut.read().isErr: - return fut.read() - else: - aux.add(fut.read().get) + try: + if fut.read().isErr: + return fut.read() + else: + aux.add(fut.read().get) + except FuturePendingError as err: + return failure err return success(aux) method modify*( self: TieredDatastore, key: Key, - fn: Modify): Future[?!void] {.async.} = + fn: Modify): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = let pending = await allFinished(self.stores.mapIt(it.modify(key, fn))) for fut in pending: - if fut.read().isErr: return fut.read() + try: + if fut.read().isErr: + return fut.read() + except FuturePendingError as err: + return failure err return success() diff --git a/datastore/typedds.nim b/datastore/typedds.nim index cf7b0f0..4bf76c2 100644 --- a/datastore/typedds.nim +++ b/datastore/typedds.nim @@ -102,7 +102,7 @@ proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [Can return failure(error) return T.decode(bytes) -proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} = +proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError, AsyncLockError]).} = requireDecoder(T) requireEncoder(T) @@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] { await self.ds.modify(key, wrappedFn) -proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} = +proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError, AsyncLockError]).} = requireDecoder(T) requireEncoder(T) requireEncoder(U) @@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu return U.decode(auxBytes) -proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} = +proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} = requireDecoder(T) without dsIter =? await self.ds.query(q), error: