diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index ce6bf864..6a2a5bb0 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -92,8 +92,11 @@ type repo: RepoStore onAvailabilityAdded: ?OnAvailabilityAdded - GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} - IterDispose* = proc(): Future[?!void] {.gcsafe, closure.} + GetNext* = proc(): Future[?seq[byte]] {. + upraises: [], gcsafe, async: (raises: [CancelledError]), closure + .} + IterDispose* = + proc(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]), closure.} OnAvailabilityAdded* = proc(availability: Availability): Future[void] {. upraises: [], gcsafe, async: (raises: []) .} @@ -239,7 +242,9 @@ func available*(self: Reservations): uint = func hasAvailable*(self: Reservations, bytes: uint): bool = self.repo.available(bytes.NBytes) -proc exists*(self: Reservations, key: Key): Future[bool] {.async.} = +proc exists*( + self: Reservations, key: Key +): Future[bool] {.async: (raises: [CancelledError]).} = let exists = await self.repo.metaDs.ds.contains(key) return exists @@ -247,7 +252,9 @@ iterator items(self: StorableIter): Future[?seq[byte]] = while not self.finished: yield self.next() -proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} = +proc getImpl( + self: Reservations, key: Key +): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} = if not await self.exists(key): let err = newException(NotExistsError, "object with key " & $key & " does not exist") @@ -260,7 +267,7 @@ proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} = proc get*( self: Reservations, key: Key, T: type SomeStorableObject -): Future[?!T] {.async.} = +): Future[?!T] {.async: (raises: [CancelledError]).} = without serialized =? await self.getImpl(key), error: return failure(error) @@ -269,7 +276,9 @@ proc get*( return success obj -proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.async.} = +proc updateImpl( + self: Reservations, obj: SomeStorableObject +): Future[?!void] {.async: (raises: [CancelledError]).} = trace "updating " & $(obj.type), id = obj.id without key =? obj.key, error: @@ -294,61 +303,57 @@ proc updateAvailability( without key =? obj.key, error: return failure(error) - try: - without oldAvailability =? await self.get(key, Availability), err: - if err of NotExistsError: - trace "Creating new Availability" - let res = await self.updateImpl(obj) - # inform subscribers that Availability has been added - if onAvailabilityAdded =? self.onAvailabilityAdded: - await onAvailabilityAdded(obj) - return res - else: - return failure(err) - - if obj.until > 0: - without allReservations =? await self.all(Reservation, obj.id), error: - error.msg = "Error updating reservation: " & error.msg - return failure(error) - - let requestEnds = allReservations.mapIt(it.validUntil) - - if requestEnds.len > 0 and requestEnds.max > obj.until: - let error = newException( - UntilOutOfBoundsError, - "Until parameter must be greater or equal the current longest request", - ) - return failure(error) - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - trace "totalSize changed, updating repo reservation" - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? ( - await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).NBytes) - ).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? ( - await self.repo.release((oldAvailability.totalSize - obj.totalSize).NBytes) - ).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - - let res = await self.updateImpl(obj) - - if oldAvailability.freeSize < obj.freeSize: # availability added - # inform subscribers that Availability has been modified (with increased - # size) + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + trace "Creating new Availability" + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added if onAvailabilityAdded =? self.onAvailabilityAdded: await onAvailabilityAdded(obj) - return res - except CancelledError as e: - raise e - except CatchableError as e: - error "Error when trying to update availability", error = e.msg - return failure(e) + return res + else: + return failure(err) -proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} = + if obj.until > 0: + without allReservations =? await self.all(Reservation, obj.id), error: + error.msg = "Error updating reservation: " & error.msg + return failure(error) + + let requestEnds = allReservations.mapIt(it.validUntil) + + if requestEnds.len > 0 and requestEnds.max > obj.until: + let error = newException( + UntilOutOfBoundsError, + "Until parameter must be greater or equal the current longest request", + ) + return failure(error) + + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + trace "totalSize changed, updating repo reservation" + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? ( + await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).NBytes) + ).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? ( + await self.repo.release((oldAvailability.totalSize - obj.totalSize).NBytes) + ).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: + await onAvailabilityAdded(obj) + return res + +proc update*( + self: Reservations, obj: Reservation +): Future[?!void] {.async: (raises: [CancelledError]).} = return await self.updateImpl(obj) proc update*( @@ -579,7 +584,7 @@ proc release*( reservationId: ReservationId, availabilityId: AvailabilityId, bytes: uint, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: topics = "release" bytes @@ -633,31 +638,25 @@ proc storables( else: raiseAssert "unknown type" - try: - without results =? await self.repo.metaDs.ds.query(query), error: - return failure(error) + without results =? await self.repo.metaDs.ds.query(query), error: + return failure(error) - # /sales/reservations - proc next(): Future[?seq[byte]] {.async.} = - await idleAsync() - iter.finished = results.finished - if not results.finished and res =? (await results.next()) and res.data.len > 0 and - key =? res.key and key.namespaces.len == defaultKey.namespaces.len: - return some res.data + # /sales/reservations + proc next(): Future[?seq[byte]] {.async: (raises: [CancelledError]).} = + await idleAsync() + iter.finished = results.finished + if not results.finished and res =? (await results.next()) and res.data.len > 0 and + key =? res.key and key.namespaces.len == defaultKey.namespaces.len: + return some res.data - return none seq[byte] + return none seq[byte] - proc dispose(): Future[?!void] {.async.} = - return await results.dispose() + proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = + return await results.dispose() - iter.next = next - iter.dispose = dispose - return success iter - except CancelledError as e: - raise e - except CatchableError as e: - error "Cannot retrieve the storables from the datastore", error = e.msg - return failure(e) + iter.next = next + iter.dispose = dispose + return success iter proc allImpl( self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey @@ -667,8 +666,8 @@ proc allImpl( without storables =? (await self.storables(T, queryKey)), error: return failure(error) - try: - for storable in storables.items: + for storable in storables.items: + try: without bytes =? (await storable): continue @@ -678,11 +677,11 @@ proc allImpl( continue ret.add obj - except CancelledError as e: - raise e - except CatchableError as e: - error "error when retrieving storable", error = e.msg - return failure(e) + except CancelledError as err: + raise err + except CatchableError as err: + error "Error when retrieving storable", error = err.msg + continue return success(ret) diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index 125741e1..cc488240 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -105,7 +105,7 @@ proc updateQuotaUsage*( minusUsed: NBytes = 0.NBytes, plusReserved: NBytes = 0.NBytes, minusReserved: NBytes = 0.NBytes, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = await self.metaDs.modify( QuotaUsedKey, proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index d7305107..130ab15e 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -380,7 +380,9 @@ method close*(self: RepoStore): Future[void] {.async.} = # RepoStore procs ########################################################### -proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = +proc reserve*( + self: RepoStore, bytes: NBytes +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Reserve bytes ## @@ -388,7 +390,9 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = await self.updateQuotaUsage(plusReserved = bytes) -proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = +proc release*( + self: RepoStore, bytes: NBytes +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Release bytes ## diff --git a/vendor/nim-datastore b/vendor/nim-datastore index d67860ad..5778e373 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit d67860add63fd23cdacde1d3da8f4739c2660c2d +Subproject commit 5778e373fa97286f389e0aef61f1e8f30a934dab