fix: sales cleanup cancellation (#1234)

* fix(sales): handle cancellation of slot cleanup

Ensures that processing slots from the slot queue
continues even when cleanup of a slot is cancelled.

Co-Authored-By: Eric <5089238+emizzle@users.noreply.github.com>

* chore(reservations): add more `raises` annotations

* Fix cleanup cancellation

* Add remove-agent to trackedfutures instead of the cleanup function

* Increase the timeout to match the request expiry

* Enable logs to debug on CI

* Remove useless except and do not return when adding the item back to the slot queue fails

* Reduce poll interval to detect sale cancelled state

* Avoid cancelling cleanup routine

* Do not cancel creating reservation in order to avoid inconsistent state

* Remove useless try except

---------

Co-authored-by: Mark Spanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
Arnaud 2025-05-29 08:57:05 +02:00 committed by GitHub
parent 71422f0d3d
commit c689542579
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 255 additions and 151 deletions

View File

@ -105,14 +105,15 @@ proc new*(
subscriptions: @[],
)
# Stops `agent` and then, only while `sales.running` is true, filters it out
# of `sales.agents`.
# The two signature lines below are the pre- and post-change versions shown
# by the diff; the newer one declares that the proc raises nothing.
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: []).} =
await agent.stop()
if sales.running:
sales.agents.keepItIf(it != agent)
proc cleanUp(
sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256
) {.async.} =
) {.async: (raises: []).} =
let data = agent.data
logScope:
@ -129,36 +130,32 @@ proc cleanUp(
# there are not really any bytes to be returned
if request =? data.request and reservation =? data.reservation:
if returnErr =? (
await sales.context.reservations.returnBytesToAvailability(
await noCancel sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId, reservation.id, request.ask.slotSize
)
).errorOption:
error "failure returning bytes",
error = returnErr.msg, bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (
await sales.context.reservations.deleteReservation(
reservation.id, reservation.availabilityId, returnedCollateral
)
).errorOption:
error "failure deleting reservation", error = deleteErr.msg
if data.slotIndex > uint16.high.uint64:
error "Cannot cast slot index to uint16", slotIndex = data.slotIndex
return
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (
await noCancel sales.context.reservations.deleteReservation(
reservation.id, reservation.availabilityId, returnedCollateral
)
).errorOption:
error "failure deleting reservation", error = deleteErr.msg
# Re-add items back into the queue to prevent small availabilities from
# draining the queue. Seen items will be ordered last.
if reprocessSlot and request =? data.request:
try:
without collateral =?
await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = err.msg
return
if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
let res =
await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
if res.isErr:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = res.error.msg
else:
let collateral = res.get()
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(
data.requestId,
@ -171,11 +168,9 @@ proc cleanUp(
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
except MarketError as e:
error "Failed to re-add item back to the slot queue.", error = e.msg
return
await sales.remove(agent)
let fut = sales.remove(agent)
sales.trackedFutures.track(fut)
proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) =
if onSale =? sales.context.onSale:
@ -193,7 +188,7 @@ proc processSlot(
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
trace "slot cleanup"
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
completed.fire()
@ -269,7 +264,7 @@ proc load*(sales: Sales) {.async.} =
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`

View File

@ -248,7 +248,7 @@ proc exists*(
let exists = await self.repo.metaDs.ds.contains(key)
return exists
# Iterates a `StorableIter` by yielding the result of `self.next()` until
# `self.finished` becomes true.
# The two signature lines are the pre- and post-change versions shown by the
# diff; the newer one lets the compiler infer the yielded type (`auto`).
iterator items(self: StorableIter): Future[?seq[byte]] =
iterator items(self: StorableIter): auto =
while not self.finished:
yield self.next()
@ -368,7 +368,9 @@ proc update*(
error "Lock error when trying to update the availability", err = e.msg
return failure(e)
proc delete(self: Reservations, key: Key): Future[?!void] {.async.} =
proc delete(
self: Reservations, key: Key
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "deleting object", key
if not await self.exists(key):
@ -384,7 +386,7 @@ proc deleteReservation*(
reservationId: ReservationId,
availabilityId: AvailabilityId,
returnedCollateral: ?UInt256 = UInt256.none,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
@ -393,35 +395,39 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
try:
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += reservation.size
availability.freeSize += reservation.size
if collateral =? returnedCollateral:
availability.totalRemainingCollateral += collateral
if collateral =? returnedCollateral:
availability.totalRemainingCollateral += collateral
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
return success()
except AsyncLockError as e:
error "Lock error when trying to delete the availability", err = e.msg
return failure(e)
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
@ -434,7 +440,7 @@ proc createAvailability*(
totalCollateral: UInt256,
enabled: bool,
until: SecondsSince1970,
): Future[?!Availability] {.async.} =
): Future[?!Availability] {.async: (raises: [CancelledError]).} =
trace "creating availability",
size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
@ -470,109 +476,116 @@ method createReservation*(
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async, base.} =
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
): Future[?!Reservation] {.async: (raises: [CancelledError]), base.} =
try:
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the free size of the Availability",
)
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the free size of the Availability",
)
return failure(error)
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# adjust the remaining totalRemainingCollateral
availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
# adjust the remaining totalRemainingCollateral
availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
# update availability with reduced size
trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
# update availability with reduced size
trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return failure(updateErr)
trace "Reservation succesfully created"
return success(reservation)
trace "Reservation succesfully created"
return success(reservation)
except AsyncLockError as e:
# This handler belongs to createReservation, so the log must name the
# operation that actually failed (creating a reservation, not deleting).
error "Lock error when trying to create the reservation", err = e.msg
return failure(e)
# Returns bytes of a reservation back to its availability.
#
# While holding `self.availabilityLock` it loads the reservation, computes
# `bytes - reservation.size`, reserves that amount in the repo, adds it to the
# availability's `freeSize` and persists the availability. If persisting
# fails, the repo reservation is released again before the failure is
# returned. In the post-change version an `AsyncLockError` is logged and
# converted into a failure result.
#
# NOTE: this span is a rendered diff, so pre- and post-change lines appear
# interleaved below.
proc returnBytesToAvailability*(
self: Reservations,
availabilityId: AvailabilityId,
reservationId: ReservationId,
bytes: uint64,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
try:
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
# NOTE(review): `bytes` is uint64, so this subtraction wraps around if
# `reservation.size` is ever larger than `bytes` — confirm callers guarantee
# bytes >= reservation.size.
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
if bytesToBeReturned == 0:
trace "No bytes are returned",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
if bytesToBeReturned == 0:
trace "No bytes are returned",
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First let's see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First let's see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
except AsyncLockError as e:
error "Lock error when returning bytes to the availability", err = e.msg
return failure(e)
proc release*(
self: Reservations,
@ -698,7 +711,7 @@ proc findAvailability*(
size, duration: uint64,
pricePerBytePerSecond, collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?Availability] {.async.} =
): Future[?Availability] {.async: (raises: [CancelledError]).} =
without storables =? (await self.storables(Availability)), e:
error "failed to get all storables", error = e.msg
return none Availability

View File

@ -26,10 +26,10 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {.
async: (raises: [])
.}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
SalesAgentError = object of CodexError
AllSlotsFilledError* = object of SalesAgentError
@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} =
await agent.subscribeCancellation()
agent.subscribed = true
proc unsubscribe*(agent: SalesAgent) {.async.} =
proc unsubscribe*(agent: SalesAgent) {.async: (raises: []).} =
if not agent.subscribed:
return
@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} =
agent.subscribed = false
# Stops the agent: first stops it as a `Machine`, then calls
# `agent.unsubscribe()`.
# The two signature lines are the pre- and post-change versions shown by the
# diff; the newer one declares that the proc raises nothing.
proc stop*(agent: SalesAgent) {.async.} =
proc stop*(agent: SalesAgent) {.async: (raises: []).} =
await Machine(agent).stop()
await agent.unsubscribe()

View File

@ -82,7 +82,7 @@ method run*(
info "Availability found for request, creating reservation"
without reservation =?
await reservations.createReservation(
await noCancel reservations.createReservation(
availability.id, request.ask.slotSize, request.id, data.slotIndex,
request.ask.collateralPerByte, requestEnd,
), error:

View File

@ -2,7 +2,6 @@ import pkg/questionable
import pkg/chronos
import ../logutils
import ./trackedfutures
import ./exceptions
{.push raises: [].}
@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) =
machine.trackedFutures.track(fut)
machine.schedule(Event.transition(machine.state, initialState))
proc stop*(machine: Machine) {.async.} =
proc stop*(machine: Machine) {.async: (raises: []).} =
if not machine.started:
return

View File

@ -30,7 +30,7 @@ method createReservation*(
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async.} =
): Future[?!Reservation] {.async: (raises: [CancelledError]).} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
BytesOutOfBoundsError,

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral

View File

@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'":
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral

View File

@ -25,7 +25,7 @@ asyncchecksuite "sales state 'ignored'":
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)

View File

@ -424,3 +424,10 @@ proc requestId*(
proc buildUrl*(client: CodexClient, path: string): string =
  ## Returns `path` prefixed with the client's base URL.
  client.baseurl & path
proc getSlots*(
    client: CodexClient
): Future[?!seq[Slot]] {.async: (raises: [CancelledError, HttpError]).} =
  ## Requests `/sales/slots` from the node and decodes the response body as a
  ## sequence of `Slot`; the decoding outcome is returned as a result value.
  let response = await client.getContent(client.baseurl & "/sales/slots")
  return seq[Slot].fromJson(response)

View File

@ -133,6 +133,87 @@ marketplacesuite "Marketplace":
timeout = 10 * 1000, # give client a bit of time to withdraw its funds
)
test "SP are able to process slots after workers were busy with other slots and ignored them",
  NodeConfigs(
    clients: CodexConfigs.init(nodes = 1)
    # .debug()
    .some,
    providers: CodexConfigs.init(nodes = 2)
    # .debug()
    # .withLogFile()
    # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations")
    .some,
  ):
  # Scenario: provider0 fills every slot of a first request while provider1,
  # having no availability yet, processes and ignores those slots. Once
  # provider1 gets an availability, it must still be able to process and fill
  # the slots of a second request.
  let client0 = clients()[0]
  let provider0 = providers()[0]
  let provider1 = providers()[1]
  let duration = 20 * 60.uint64

  let data = await RandomChunker.example(blocks = blocks)
  let slotSize = slotSize(blocks, ecNodes, ecTolerance)

  # We create an availability allowing the first SP to host the 3 slots.
  # The second SP has no availability yet, so it will just process the
  # slots and ignore them.
  discard await provider0.client.postAvailability(
    totalSize = 3 * slotSize.truncate(uint64),
    duration = duration,
    minPricePerBytePerSecond = minPricePerBytePerSecond,
    # Collateral is derived from `collateralPerByte`, consistent with the
    # second availability below and with the storage requests.
    totalCollateral = 3 * slotSize * collateralPerByte,
  )

  let cid = (await client0.client.upload(data)).get

  let purchaseId = await client0.client.requestStorage(
    cid,
    duration = duration,
    pricePerBytePerSecond = minPricePerBytePerSecond,
    proofProbability = 1.u256,
    expiry = 10 * 60.uint64,
    collateralPerByte = collateralPerByte,
    nodes = ecNodes,
    tolerance = ecTolerance,
  )

  let requestId = (await client0.client.requestId(purchaseId)).get

  # Wait until the 3 slots are filled by the first SP
  check eventually(
    await client0.client.purchaseStateIs(purchaseId, "started"),
    timeout = 10 * 60.int * 1000,
  )

  # Here we create the same availability as previously but for the second SP.
  # Meaning that, after ignoring all the slots for the first request, the second SP will process
  # and host the slots for the second request.
  discard await provider1.client.postAvailability(
    totalSize = 3 * slotSize.truncate(uint64),
    duration = duration,
    minPricePerBytePerSecond = minPricePerBytePerSecond,
    totalCollateral = 3 * slotSize * collateralPerByte,
  )

  let purchaseId2 = await client0.client.requestStorage(
    cid,
    duration = duration,
    pricePerBytePerSecond = minPricePerBytePerSecond,
    proofProbability = 3.u256,
    expiry = 10 * 60.uint64,
    collateralPerByte = collateralPerByte,
    nodes = ecNodes,
    tolerance = ecTolerance,
  )
  let requestId2 = (await client0.client.requestId(purchaseId2)).get

  # Wait until the slots of the second request are filled
  check eventually(
    await client0.client.purchaseStateIs(purchaseId2, "started"),
    timeout = 10 * 60.int * 1000,
  )

  # Double check, verify that our second SP hosts the 3 slots
  check eventually ((await provider1.client.getSlots()).get).len == 3
marketplacesuite "Marketplace payouts":
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
@ -145,14 +226,18 @@ marketplacesuite "Marketplace payouts":
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat: HardhatConfig.none,
clients: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output.debug()
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .debug() # uncomment to enable console log output.debug()
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "erasure")
.some,
providers: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
# .debug() # uncomment to enable console log output
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "node", "marketplace", "sales", "reservations", "node", "statemachine"
# )
.some,
):
let duration = 20.periods
@ -203,7 +288,12 @@ marketplacesuite "Marketplace payouts":
# wait until sale is cancelled
await ethProvider.advanceTime(expiry.u256)
check eventually await providerApi.saleStateIs(slotId, "SaleCancelled")
check eventually(
await providerApi.saleStateIs(slotId, "SaleCancelled"),
timeout = 5 * 1000,
pollInterval = 200,
)
await advanceToNextPeriod()