fix(sales): handle cancellation of slot cleanup

Ensures that processing slots from the slot queue
continues even when cleanup of a slot is cancelled.

Co-Authored-By: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
Mark Spanbroek 2025-05-20 15:47:48 +02:00
parent f7d06cd0e8
commit 9438e5069c
No known key found for this signature in database
GPG Key ID: FBE3E9548D427C00
8 changed files with 90 additions and 78 deletions

View File

@ -105,14 +105,14 @@ proc new*(
subscriptions: @[],
)
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: [CancelledError]).} =
await agent.stop()
if sales.running:
sales.agents.keepItIf(it != agent)
proc cleanUp(
sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let data = agent.data
logScope:
@ -193,9 +193,12 @@ proc processSlot(
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
trace "slot cleanup"
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
try:
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
except CancelledError as e:
trace "slot cleanup was cancelled", error = e.msgDetail
completed.fire()
agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) =
@ -269,8 +272,11 @@ proc load*(sales: Sales) {.async.} =
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
) {.async: (raises: []).} =
try:
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
except CancelledError as e:
trace "slot cleanup was cancelled", error = e.msgDetail
# There is no need to assign agent.onFilled as slots loaded from `mySlots`
# are inherently already filled and so assigning agent.onFilled would be

View File

@ -384,7 +384,7 @@ proc deleteReservation*(
reservationId: ReservationId,
availabilityId: AvailabilityId,
returnedCollateral: ?UInt256 = UInt256.none,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
@ -393,35 +393,39 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
try:
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += reservation.size
availability.freeSize += reservation.size
if collateral =? returnedCollateral:
availability.totalRemainingCollateral += collateral
if collateral =? returnedCollateral:
availability.totalRemainingCollateral += collateral
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
return success()
except AsyncLockError as e:
error "Lock error when trying to delete the availability", err = e.msg
return failure(e)
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
@ -526,53 +530,56 @@ proc returnBytesToAvailability*(
availabilityId: AvailabilityId,
reservationId: ReservationId,
bytes: uint64,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
try:
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
if bytesToBeReturned == 0:
trace "No bytes are returned",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
if bytesToBeReturned == 0:
trace "No bytes are returned",
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
except AsyncLockError as e:
error "Lock error when returning bytes to the availability", err = e.msg
return failure(e)
proc release*(
self: Reservations,

View File

@ -26,10 +26,10 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {.
async: (raises: [])
.}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
SalesAgentError = object of CodexError
AllSlotsFilledError* = object of SalesAgentError
@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} =
await agent.subscribeCancellation()
agent.subscribed = true
proc unsubscribe*(agent: SalesAgent) {.async.} =
proc unsubscribe*(agent: SalesAgent) {.async: (raises: [CancelledError]).} =
if not agent.subscribed:
return
@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} =
agent.subscribed = false
proc stop*(agent: SalesAgent) {.async.} =
proc stop*(agent: SalesAgent) {.async: (raises: [CancelledError]).} =
await Machine(agent).stop()
await agent.unsubscribe()

View File

@ -2,7 +2,6 @@ import pkg/questionable
import pkg/chronos
import ../logutils
import ./trackedfutures
import ./exceptions
{.push raises: [].}
@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) =
machine.trackedFutures.track(fut)
machine.schedule(Event.transition(machine.state, initialState))
proc stop*(machine: Machine) {.async.} =
proc stop*(machine: Machine) {.async: (raises: []).} =
if not machine.started:
return

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral

View File

@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'":
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral

View File

@ -25,7 +25,7 @@ asyncchecksuite "sales state 'ignored'":
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)