Add raises errors to async pragram and remove useless try except

This commit is contained in:
Arnaud 2025-02-25 13:58:31 +01:00
parent 42ee85b2f3
commit f06b7ffcae
No known key found for this signature in database
GPG Key ID: 69D6CE281FCAE663
4 changed files with 94 additions and 91 deletions

View File

@ -92,8 +92,11 @@ type
repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.gcsafe, closure.}
GetNext* = proc(): Future[?seq[byte]] {.
upraises: [], gcsafe, async: (raises: [CancelledError]), closure
.}
IterDispose* =
proc(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]), closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.
upraises: [], gcsafe, async: (raises: [])
.}
@ -239,7 +242,9 @@ func available*(self: Reservations): uint =
func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes.NBytes)
proc exists*(self: Reservations, key: Key): Future[bool] {.async.} =
proc exists*(
self: Reservations, key: Key
): Future[bool] {.async: (raises: [CancelledError]).} =
let exists = await self.repo.metaDs.ds.contains(key)
return exists
@ -247,7 +252,9 @@ iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} =
proc getImpl(
self: Reservations, key: Key
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
if not await self.exists(key):
let err =
newException(NotExistsError, "object with key " & $key & " does not exist")
@ -260,7 +267,7 @@ proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} =
proc get*(
self: Reservations, key: Key, T: type SomeStorableObject
): Future[?!T] {.async.} =
): Future[?!T] {.async: (raises: [CancelledError]).} =
without serialized =? await self.getImpl(key), error:
return failure(error)
@ -269,7 +276,9 @@ proc get*(
return success obj
proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.async.} =
proc updateImpl(
self: Reservations, obj: SomeStorableObject
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "updating " & $(obj.type), id = obj.id
without key =? obj.key, error:
@ -294,61 +303,57 @@ proc updateAvailability(
without key =? obj.key, error:
return failure(error)
try:
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
else:
return failure(err)
if obj.until > 0:
without allReservations =? await self.all(Reservation, obj.id), error:
error.msg = "Error updating reservation: " & error.msg
return failure(error)
let requestEnds = allReservations.mapIt(it.validUntil)
if requestEnds.len > 0 and requestEnds.max > obj.until:
let error = newException(
UntilOutOfBoundsError,
"Until parameter must be greater or equal the current longest request",
)
return failure(error)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (
await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).NBytes)
).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (
await self.repo.release((oldAvailability.totalSize - obj.totalSize).NBytes)
).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
if oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
except CancelledError as e:
raise e
except CatchableError as e:
error "Error when trying to update availability", error = e.msg
return failure(e)
return res
else:
return failure(err)
proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} =
if obj.until > 0:
without allReservations =? await self.all(Reservation, obj.id), error:
error.msg = "Error updating reservation: " & error.msg
return failure(error)
let requestEnds = allReservations.mapIt(it.validUntil)
if requestEnds.len > 0 and requestEnds.max > obj.until:
let error = newException(
UntilOutOfBoundsError,
"Until parameter must be greater or equal the current longest request",
)
return failure(error)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (
await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).NBytes)
).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (
await self.repo.release((oldAvailability.totalSize - obj.totalSize).NBytes)
).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
if oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
proc update*(
self: Reservations, obj: Reservation
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await self.updateImpl(obj)
proc update*(
@ -579,7 +584,7 @@ proc release*(
reservationId: ReservationId,
availabilityId: AvailabilityId,
bytes: uint,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
topics = "release"
bytes
@ -633,31 +638,25 @@ proc storables(
else:
raiseAssert "unknown type"
try:
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)
# /sales/reservations
proc next(): Future[?seq[byte]] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and res =? (await results.next()) and res.data.len > 0 and
key =? res.key and key.namespaces.len == defaultKey.namespaces.len:
return some res.data
# /sales/reservations
proc next(): Future[?seq[byte]] {.async: (raises: [CancelledError]).} =
await idleAsync()
iter.finished = results.finished
if not results.finished and res =? (await results.next()) and res.data.len > 0 and
key =? res.key and key.namespaces.len == defaultKey.namespaces.len:
return some res.data
return none seq[byte]
return none seq[byte]
proc dispose(): Future[?!void] {.async.} =
return await results.dispose()
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
return await results.dispose()
iter.next = next
iter.dispose = dispose
return success iter
except CancelledError as e:
raise e
except CatchableError as e:
error "Cannot retrieve the storables from the datastore", error = e.msg
return failure(e)
iter.next = next
iter.dispose = dispose
return success iter
proc allImpl(
self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey
@ -667,8 +666,8 @@ proc allImpl(
without storables =? (await self.storables(T, queryKey)), error:
return failure(error)
try:
for storable in storables.items:
for storable in storables.items:
try:
without bytes =? (await storable):
continue
@ -678,11 +677,11 @@ proc allImpl(
continue
ret.add obj
except CancelledError as e:
raise e
except CatchableError as e:
error "error when retrieving storable", error = e.msg
return failure(e)
except CancelledError as err:
raise err
except CatchableError as err:
error "Error when retrieving storable", error = err.msg
continue
return success(ret)

View File

@ -105,7 +105,7 @@ proc updateQuotaUsage*(
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.metaDs.modify(
QuotaUsedKey,
proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =

View File

@ -380,7 +380,9 @@ method close*(self: RepoStore): Future[void] {.async.} =
# RepoStore procs
###########################################################
proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
proc reserve*(
self: RepoStore, bytes: NBytes
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Reserve bytes
##
@ -388,7 +390,9 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
await self.updateQuotaUsage(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
proc release*(
self: RepoStore, bytes: NBytes
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Release bytes
##

@ -1 +1 @@
Subproject commit d67860add63fd23cdacde1d3da8f4739c2660c2d
Subproject commit 5778e373fa97286f389e0aef61f1e8f30a934dab