diff --git a/codex/sales.nim b/codex/sales.nim index 53b5860b..0f76f175 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -109,6 +109,7 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp(sales: Sales, agent: SalesAgent, + returnBytes: bool, processing: Future[void]) {.async.} = let data = agent.data @@ -119,9 +120,19 @@ proc cleanUp(sales: Sales, reservationId = data.reservation.?id |? ReservationId.default, availabilityId = data.reservation.?availabilityId |? AvailabilityId.default - # TODO: return bytes that were used in the request back to the availability - # as well, which will require removing the bytes from disk (perhaps via - # setting blockTTL to -1 and then running block maintainer?) + # if reservation for the SalesAgent was not created, then it means + # that the cleanUp was called before the sales process really started, so + # there are not really any bytes to be returned + if returnBytes and request =? data.request and reservation =? data.reservation: + if returnErr =? (await sales.context.reservations.returnBytesToAvailability( + reservation.availabilityId, + reservation.id, + request.ask.slotSize + )).errorOption: + error "failure returning bytes", + error = returnErr.msg, + availabilityId = reservation.availabilityId, + bytes = request.ask.slotSize # delete reservation and return reservation bytes back to the availability if reservation =? 
data.reservation and @@ -164,8 +175,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc {.async.} = - await sales.cleanUp(agent, done) + agent.onCleanUp = proc (returnBytes = false) {.async.} = + await sales.cleanUp(agent, returnBytes, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -229,9 +240,9 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc {.async.} = + agent.onCleanUp = proc(returnBytes = false) {.async.} = let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, done) + await sales.cleanUp(agent, returnBytes, done) await done # completed in sales.cleanUp agent.start(SaleUnknown()) diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 001d6a5a..611c1ad6 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -359,6 +359,58 @@ proc createReservation*( return success(reservation) +proc returnBytesToAvailability*( + self: Reservations, + availabilityId: AvailabilityId, + reservationId: ReservationId, + bytes: UInt256): Future[?!void] {.async.} = + + logScope: + reservationId + availabilityId + + + without key =? key(reservationId, availabilityId), error: + return failure(error) + + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) + + # We are ignoring bytes that are still present in the Reservation because + # they will be returned to Availability through `deleteReservation`. 
+ let bytesToBeReturned = bytes - reservation.size + + if bytesToBeReturned == 0: + trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + return success() + + trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + + # First let's see if we can re-reserve the bytes; if the Repo's quota + # is depleted then we will fail-fast as there is nothing to be done at the moment. + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.size += bytesToBeReturned + + # Update availability with returned size + if updateErr =? (await self.update(availability)).errorOption: + + trace "Rolling back returning bytes" + if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) + + return success() + proc release*( self: Reservations, reservationId: ReservationId, diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index b189650d..08d3ab0e 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 3f0e2157..f1f75407 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -29,6 +29,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = 
onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + await onCleanUp(returnBytes = true) warn "Sale cancelled due to timeout", requestId = $data.requestId, slotIndex = $data.slotIndex diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index c934be4b..59754eca 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -29,5 +29,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + await onCleanUp(returnBytes = true) diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 3bfee998..a6ab1b26 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -175,6 +175,24 @@ asyncchecksuite "Reservations module": let updated = !(await reservations.get(key, Availability)) check updated.size == orig + test "calling returnBytesToAvailability returns bytes back to availability": + let availability = createAvailability() + let reservation = createReservation(availability) + let orig = availability.size - reservation.size + let origQuota = repo.quotaReservedBytes + let returnedBytes = reservation.size + 200.u256 + + check isOk await reservations.returnBytesToAvailability( + reservation.availabilityId, reservation.id, returnedBytes + ) + + let key = availability.key.get + let updated = !(await reservations.get(key, Availability)) + + check updated.size > orig + check (updated.size - orig) == 200.u256 + check (repo.quotaReservedBytes - origQuota) == 200 + test "reservation can be partially released": let availability = createAvailability() let reservation = createReservation(availability) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 7b47f375..6668b105 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -149,6 +149,7 @@ 
asyncchecksuite "Sales": let me = await market.getSigner() market.activeSlots[me] = @[] + market.requestEnds[request.id] = request.expiry.toSecondsSince1970 clock = MockClock.new() let repoDs = SQLiteDatastore.new(Memory).tryGet() @@ -169,7 +170,6 @@ asyncchecksuite "Sales": sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.async.} = return proof await sales.start() - request.expiry = (clock.now() + 42).u256 itemsProcessed = @[] teardown: