Fix cleanup cancellation

Arnaud 2025-05-22 09:18:52 +02:00
parent e869ce7154
commit 0ebf083039
3 changed files with 119 additions and 20 deletions
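
The first file's diff applies one recurring pattern: each awaited cleanup step gets its own try block, so a CancelledError in one step is logged at debug level and the remaining steps still run, while other CatchableErrors are logged instead of propagating out of cleanUp. Below is a minimal sketch of that pattern, assuming the chronos and chronicles libraries the code already uses; the cleanupStep helper and its parameters are illustrative, not part of this commit:

import chronos, chronicles

proc cleanupStep(name: string, op: Future[void]) {.async.} =
  # Isolate one cleanup operation: log cancellation and other failures,
  # but never let them abort the steps that follow.
  try:
    await op
  except CancelledError:
    debug "cleanup step was cancelled", step = name
  except CatchableError as e:
    error "cleanup step failed", step = name, error = e.msg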


@@ -128,30 +128,36 @@ proc cleanUp(
   # that the cleanUp was called before the sales process really started, so
   # there are not really any bytes to be returned
   if request =? data.request and reservation =? data.reservation:
-    if returnErr =? (
-      await sales.context.reservations.returnBytesToAvailability(
-        reservation.availabilityId, reservation.id, request.ask.slotSize
-      )
-    ).errorOption:
-      error "failure returning bytes",
-        error = returnErr.msg, bytes = request.ask.slotSize
-
-  # delete reservation and return reservation bytes back to the availability
-  if reservation =? data.reservation and
-    deleteErr =? (
-      await sales.context.reservations.deleteReservation(
-        reservation.id, reservation.availabilityId, returnedCollateral
-      )
-    ).errorOption:
-    error "failure deleting reservation", error = deleteErr.msg
+    try:
+      if returnErr =? (
+        await sales.context.reservations.returnBytesToAvailability(
+          reservation.availabilityId, reservation.id, request.ask.slotSize
+        )
+      ).errorOption:
+        error "failure returning bytes",
+          error = returnErr.msg, bytes = request.ask.slotSize
+    except CancelledError:
+      debug "returning bytes was cancelled"
+    except CatchableError as e:
+      error "failure returning bytes", error = e.msg
+
+  if data.slotIndex > uint16.high.uint64:
+    error "Cannot cast slot index to uint16", slotIndex = data.slotIndex
+    return
+
+  try:
+    # delete reservation and return reservation bytes back to the availability
+    if reservation =? data.reservation and
+      deleteErr =? (
+        await sales.context.reservations.deleteReservation(
+          reservation.id, reservation.availabilityId, returnedCollateral
+        )
+      ).errorOption:
+      error "failure deleting reservation", error = deleteErr.msg
+  except CancelledError:
+    debug "deleting reservation was cancelled"
+  except CatchableError as e:
+    error "failure deleting reservation", error = e.msg
 
   # Re-add items back into the queue to prevent small availabilities from
   # draining the queue. Seen items will be ordered last.
-  if reprocessSlot and request =? data.request:
+  if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
     try:
       without collateral =?
         await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err:
@@ -171,11 +177,16 @@ proc cleanUp(
         trace "pushing ignored item to queue, marked as seen"
         if err =? queue.push(seenItem).errorOption:
           error "failed to readd slot to queue", errorType = $(type err), error = err.msg
-    except MarketError as e:
+    except CatchableError as e:
       error "Failed to re-add item back to the slot queue.", error = e.msg
       return
 
-  await sales.remove(agent)
+  try:
+    await sales.remove(agent)
+  except CancelledError:
+    debug "sales remove was cancelled"
+  except CatchableError as e:
+    error "failure removing sales", error = e.msg
 
 proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) =
   if onSale =? sales.context.onSale:
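
The new guard above suggests the slot index gets cast to uint16 further down the pipeline (per the error message), so cleanUp now refuses to proceed with indexes that do not fit. A standalone sketch of the narrowing check, with a hypothetical index value:

let slotIndex: uint64 = 70_000  # hypothetical; exceeds uint16.high (65535)
if slotIndex > uint16.high.uint64:
  echo "Cannot cast slot index to uint16: ", slotIndex
else:
  let idx = uint16(slotIndex)  # safe narrowing: the value fits in 16 bits
  echo "queue item index: ", idx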


@@ -425,3 +425,10 @@ proc requestId*(
 
 proc buildUrl*(client: CodexClient, path: string): string =
   return client.baseurl & path
+
+proc getSlots*(
+    client: CodexClient
+): Future[?!seq[Slot]] {.async: (raises: [CancelledError, HttpError]).} =
+  let url = client.baseurl & "/sales/slots"
+  let body = await client.getContent(url)
+  seq[Slot].fromJson(body)
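
getSlots returns ?!seq[Slot], a Result that callers unwrap before use; per the raises annotation, cancellation and HTTP failures raise, while decoding failures surface through the Result. A minimal call site, mirroring the assertion in the new test below; the client value is an assumed, already-constructed CodexClient:

# assumes `client` is a CodexClient bound to a provider node
let slots = (await client.getSlots()).get  # .get raises if the Result holds an error
echo "provider is hosting ", slots.len, " slot(s)"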


@@ -133,6 +133,87 @@ marketplacesuite "Marketplace":
       timeout = 10 * 1000, # give client a bit of time to withdraw its funds
     )
 
+  test "SP are able to process slots after workers were busy with other slots and ignored them",
+    NodeConfigs(
+      clients: CodexConfigs.init(nodes = 1)
+        # .debug()
+        .some,
+      providers: CodexConfigs.init(nodes = 2)
+        # .debug()
+        # .withLogFile()
+        # .withLogTopics("marketplace", "sales", "statemachine", "slotqueue", "reservations")
+        .some,
+    ):
+    let client0 = clients()[0]
+    let provider0 = providers()[0]
+    let provider1 = providers()[1]
+    let duration = 20 * 60.uint64
+    let data = await RandomChunker.example(blocks = blocks)
+    let slotSize = slotSize(blocks, ecNodes, ecTolerance)
+
+    # We create an availability allowing the first SP to host all 3 slots.
+    # The second SP has no availability yet, so it will merely process the
+    # slots and ignore them.
+    discard await provider0.client.postAvailability(
+      totalSize = 3 * slotSize.truncate(uint64),
+      duration = duration,
+      minPricePerBytePerSecond = minPricePerBytePerSecond,
+      totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
+    )
+
+    let cid = (await client0.client.upload(data)).get
+    let purchaseId = await client0.client.requestStorage(
+      cid,
+      duration = duration,
+      pricePerBytePerSecond = minPricePerBytePerSecond,
+      proofProbability = 1.u256,
+      expiry = 10 * 60.uint64,
+      collateralPerByte = collateralPerByte,
+      nodes = ecNodes,
+      tolerance = ecTolerance,
+    )
+    let requestId = (await client0.client.requestId(purchaseId)).get
+
+    # Wait until the first SP has filled all 3 slots.
+    check eventually(
+      await client0.client.purchaseStateIs(purchaseId, "started"),
+      timeout = 10 * 60.int * 1000,
+    )
+
+    # Now create the same availability, but for the second SP. Having ignored
+    # all the slots of the first request, the second SP will process and host
+    # the slots of the second request.
+    discard await provider1.client.postAvailability(
+      totalSize = 3 * slotSize.truncate(uint64),
+      duration = duration,
+      minPricePerBytePerSecond = minPricePerBytePerSecond,
+      totalCollateral = 3 * slotSize * collateralPerByte,
+    )
+
+    let purchaseId2 = await client0.client.requestStorage(
+      cid,
+      duration = duration,
+      pricePerBytePerSecond = minPricePerBytePerSecond,
+      proofProbability = 3.u256,
+      expiry = 10 * 60.uint64,
+      collateralPerByte = collateralPerByte,
+      nodes = ecNodes,
+      tolerance = ecTolerance,
+    )
+    let requestId2 = (await client0.client.requestId(purchaseId2)).get
+
+    # Wait until the slots of the second request are filled.
+    check eventually(
+      await client0.client.purchaseStateIs(purchaseId2, "started"),
+      timeout = 10 * 60.int * 1000,
+    )
+
+    # Double-check: verify that the second SP hosts all 3 slots.
+    check eventually ((await provider1.client.getSlots()).get).len == 3
 
 marketplacesuite "Marketplace payouts":
   const minPricePerBytePerSecond = 1.u256
   const collateralPerByte = 1.u256