feat: cleanup returning availability (#650)

Co-authored-by: markspanbroek <mark@spanbroek.net>
This commit is contained in:
Adam Uhlíř 2023-12-13 20:58:17 +01:00 committed by GitHub
parent 0c3d1dd563
commit e62cb96bde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 11 deletions

View File

@ -109,6 +109,7 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(sales: Sales,
agent: SalesAgent,
returnBytes: bool,
processing: Future[void]) {.async.} =
let data = agent.data
@ -119,9 +120,19 @@ proc cleanUp(sales: Sales,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
# TODO: return bytes that were used in the request back to the availability
# as well, which will require removing the bytes from disk (perhaps via
# setting blockTTL to -1 and then running block maintainer?)
# if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned
if returnBytes and request =? data.request and reservation =? data.reservation:
if returnErr =? (await sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId,
reservation.id,
request.ask.slotSize
)).errorOption:
error "failure returning bytes",
error = returnErr.msg,
availabilityId = reservation.availabilityId,
bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
@ -164,8 +175,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)
agent.onCleanUp = proc {.async.} =
await sales.cleanUp(agent, done)
agent.onCleanUp = proc (returnBytes = false) {.async.} =
await sales.cleanUp(agent, returnBytes, done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)
@ -229,9 +240,9 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex,
some slot.request)
agent.onCleanUp = proc {.async.} =
agent.onCleanUp = proc(returnBytes = false) {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, done)
await sales.cleanUp(agent, returnBytes, done)
await done # completed in sales.cleanUp
agent.start(SaleUnknown())

View File

@ -359,6 +359,58 @@ proc createReservation*(
return success(reservation)
proc returnBytesToAvailability*(
self: Reservations,
availabilityId: AvailabilityId,
reservationId: ReservationId,
bytes: UInt256): Future[?!void] {.async.} =
logScope:
reservationId
availabilityId
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.size += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.update(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
proc release*(
self: Reservations,
reservationId: ReservationId,

View File

@ -25,7 +25,7 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}
OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}

View File

@ -29,6 +29,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp()
await onCleanUp(returnBytes = true)
warn "Sale cancelled due to timeout", requestId = $data.requestId, slotIndex = $data.slotIndex

View File

@ -29,5 +29,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp()
await onCleanUp(returnBytes = true)

View File

@ -175,6 +175,24 @@ asyncchecksuite "Reservations module":
let updated = !(await reservations.get(key, Availability))
check updated.size == orig
test "calling returnBytesToAvailability returns bytes back to availability":
let availability = createAvailability()
let reservation = createReservation(availability)
let orig = availability.size - reservation.size
let origQuota = repo.quotaReservedBytes
let returnedBytes = reservation.size + 200.u256
check isOk await reservations.returnBytesToAvailability(
reservation.availabilityId, reservation.id, returnedBytes
)
let key = availability.key.get
let updated = !(await reservations.get(key, Availability))
check updated.size > orig
check (updated.size - orig) == 200.u256
check (repo.quotaReservedBytes - origQuota) == 200
test "reservation can be partially released":
let availability = createAvailability()
let reservation = createReservation(availability)

View File

@ -149,6 +149,7 @@ asyncchecksuite "Sales":
let me = await market.getSigner()
market.activeSlots[me] = @[]
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
clock = MockClock.new()
let repoDs = SQLiteDatastore.new(Memory).tryGet()
@ -169,7 +170,6 @@ asyncchecksuite "Sales":
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[seq[byte]] {.async.} =
return proof
await sales.start()
request.expiry = (clock.now() + 42).u256
itemsProcessed = @[]
teardown: