From 9438e5069c085dcf62559e2a926a01b2801650a5 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 20 May 2025 15:47:48 +0200 Subject: [PATCH] fix(sales): handle cancellation of slot cleanup Ensures that processing slots from the slot queue continues even when cleanup of a slot is cancelled. Co-Authored-By: Eric <5089238+emizzle@users.noreply.github.com> --- codex/sales.nim | 18 ++- codex/sales/reservations.nim | 127 +++++++++++---------- codex/sales/salesagent.nim | 12 +- codex/utils/asyncstatemachine.nim | 3 +- tests/codex/sales/states/testcancelled.nim | 2 +- tests/codex/sales/states/testerrored.nim | 2 +- tests/codex/sales/states/testfinished.nim | 2 +- tests/codex/sales/states/testignored.nim | 2 +- 8 files changed, 90 insertions(+), 78 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index 01cc0fd7..fa09eefd 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -105,14 +105,14 @@ proc new*( subscriptions: @[], ) -proc remove(sales: Sales, agent: SalesAgent) {.async.} = +proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: [CancelledError]).} = await agent.stop() if sales.running: sales.agents.keepItIf(it != agent) proc cleanUp( sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256 -) {.async.} = +) {.async: (raises: [CancelledError]).} = let data = agent.data logScope: @@ -193,9 +193,12 @@ proc processSlot( agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = trace "slot cleanup" - await sales.cleanUp(agent, reprocessSlot, returnedCollateral) + try: + await sales.cleanUp(agent, reprocessSlot, returnedCollateral) + except CancelledError as e: + trace "slot cleanup was cancelled", error = e.msgDetail completed.fire() agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) = @@ -269,8 +272,11 @@ proc load*(sales: Sales) {.async.} = agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = - await sales.cleanUp(agent, reprocessSlot, returnedCollateral) + ) {.async: (raises: []).} = + try: + await sales.cleanUp(agent, reprocessSlot, returnedCollateral) + except CancelledError as e: + trace "slot cleanup was cancelled", error = e.msgDetail # There is no need to assign agent.onFilled as slots loaded from `mySlots` # are inherently already filled and so assigning agent.onFilled would be diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 07e3f406..0e9a83f9 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -384,7 +384,7 @@ proc deleteReservation*( reservationId: ReservationId, availabilityId: AvailabilityId, returnedCollateral: ?UInt256 = UInt256.none, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: reservationId availabilityId @@ -393,35 +393,39 @@ proc deleteReservation*( without key =? key(reservationId, availabilityId), error: return failure(error) - withLock(self.availabilityLock): - without reservation =? (await self.get(key, Reservation)), error: - if error of NotExistsError: - return success() - else: - return failure(error) + try: + withLock(self.availabilityLock): + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) - if reservation.size > 0.uint64: - trace "returning remaining reservation bytes to availability", - size = reservation.size + if reservation.size > 0.uint64: + trace "returning remaining reservation bytes to availability", + size = reservation.size - without availabilityKey =? availabilityId.key, error: - return failure(error) + without availabilityKey =? availabilityId.key, error: + return failure(error) - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - availability.freeSize += reservation.size + availability.freeSize += reservation.size - if collateral =? returnedCollateral: - availability.totalRemainingCollateral += collateral + if collateral =? returnedCollateral: + availability.totalRemainingCollateral += collateral - if updateErr =? (await self.updateAvailability(availability)).errorOption: - return failure(updateErr) + if updateErr =? (await self.updateAvailability(availability)).errorOption: + return failure(updateErr) - if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: - return failure(err.toErr(DeleteFailedError)) + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) - return success() + return success() + except AsyncLockError as e: + error "Lock error when trying to delete the availability", err = e.msg + return failure(e) # TODO: add support for deleting availabilities # To delete, must not have any active sales. @@ -526,53 +530,56 @@ proc returnBytesToAvailability*( availabilityId: AvailabilityId, reservationId: ReservationId, bytes: uint64, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: reservationId availabilityId + try: + withLock(self.availabilityLock): + without key =? key(reservationId, availabilityId), error: + return failure(error) - withLock(self.availabilityLock): - without key =? key(reservationId, availabilityId), error: - return failure(error) + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) - without var reservation =? (await self.get(key, Reservation)), error: - return failure(error) + # We are ignoring bytes that are still present in the Reservation because + # they will be returned to Availability through `deleteReservation`. + let bytesToBeReturned = bytes - reservation.size - # We are ignoring bytes that are still present in the Reservation because - # they will be returned to Availability through `deleteReservation`. - let bytesToBeReturned = bytes - reservation.size + if bytesToBeReturned == 0: + trace "No bytes are returned", + requestSizeBytes = bytes, returningBytes = bytesToBeReturned + return success() - if bytesToBeReturned == 0: - trace "No bytes are returned", + trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + + # First lets see if we can re-reserve the bytes, if the Repo's quota + # is depleted then we will fail-fast as there is nothing to be done atm. + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.freeSize += bytesToBeReturned + + # Update availability with returned size + if updateErr =? (await self.updateAvailability(availability)).errorOption: + trace "Rolling back returning bytes" + if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) + return success() - - trace "Returning bytes", - requestSizeBytes = bytes, returningBytes = bytesToBeReturned - - # First lets see if we can re-reserve the bytes, if the Repo's quota - # is depleted then we will fail-fast as there is nothing to be done atm. - if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - without availabilityKey =? availabilityId.key, error: - return failure(error) - - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) - - availability.freeSize += bytesToBeReturned - - # Update availability with returned size - if updateErr =? (await self.updateAvailability(availability)).errorOption: - trace "Rolling back returning bytes" - if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) - - return failure(updateErr) - - return success() + except AsyncLockError as e: + error "Lock error when returning bytes to the availability", err = e.msg + return failure(e) proc release*( self: Reservations, diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 61f3a9d3..26baa0c4 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -26,10 +26,10 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc( - reprocessSlot = false, returnedCollateral = UInt256.none - ): Future[void] {.gcsafe, upraises: [].} - OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].} + OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {. + async: (raises: []) + .} + OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].} SalesAgentError = object of CodexError AllSlotsFilledError* = object of SalesAgentError @@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} = await agent.subscribeCancellation() agent.subscribed = true -proc unsubscribe*(agent: SalesAgent) {.async.} = +proc unsubscribe*(agent: SalesAgent) {.async: (raises: [CancelledError]).} = if not agent.subscribed: return @@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} = agent.subscribed = false -proc stop*(agent: SalesAgent) {.async.} = +proc stop*(agent: SalesAgent) {.async: (raises: [CancelledError]).} = await Machine(agent).stop() await agent.unsubscribe() diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index eb84378c..194aea20 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -2,7 +2,6 @@ import pkg/questionable import pkg/chronos import ../logutils import ./trackedfutures -import ./exceptions {.push raises: [].} @@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) = machine.trackedFutures.track(fut) machine.schedule(Event.transition(machine.state, initialState)) -proc stop*(machine: Machine) {.async.} = +proc stop*(machine: Machine) {.async: (raises: []).} = if not machine.started: return diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim index 6eaf1f5a..ebd9fe2d 100644 --- a/tests/codex/sales/states/testcancelled.nim +++ b/tests/codex/sales/states/testcancelled.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'": market = MockMarket.new() let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim index 0cc26cf8..b0352edb 100644 --- a/tests/codex/sales/states/testerrored.nim +++ b/tests/codex/sales/states/testerrored.nim @@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'": setup: let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = reprocessSlot let context = SalesContext(market: market, clock: clock) diff --git a/tests/codex/sales/states/testfinished.nim b/tests/codex/sales/states/testfinished.nim index 1648df3a..b5502351 100644 --- a/tests/codex/sales/states/testfinished.nim +++ b/tests/codex/sales/states/testfinished.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'": market = MockMarket.new() let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim index 5eea7d16..b509ce9b 100644 --- a/tests/codex/sales/states/testignored.nim +++ b/tests/codex/sales/states/testignored.nim @@ -25,7 +25,7 @@ asyncchecksuite "sales state 'ignored'": setup: let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = reprocessSlot let context = SalesContext(market: market, clock: clock)