Add deletion of inactive reservations on sales load

This commit is contained in:
Eric 2023-08-28 12:20:33 +10:00
parent 0d6b3f862b
commit ec99992665
No known key found for this signature in database
2 changed files with 91 additions and 52 deletions

View File

@ -114,6 +114,10 @@ proc cleanUp(sales: Sales,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
# TODO: return bytes that were used in the request back to the availability
# as well, which will require removing the bytes from disk (perhaps via
# setting blockTTL to -1 and then running block maintainer?)
# delete reservation and return reservation bytes back to the availability
if reservation =? agent.data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
@ -154,6 +158,30 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
agent.start(SalePreparing())
sales.agents.add agent
proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
let reservations = sales.context.reservations
without reservs =? await reservations.all(Reservation):
info "no unused reservations found for deletion"
let unused = reservs.filter(r => (
let slotId = slotId(r.requestId, r.slotIndex)
not activeSlots.any(slot => slot.id == slotId)
))
info "found unused reservations for deletion", unused = unused.len
for reservation in unused:
logScope:
reservationId = reservation.id
availabilityId = reservation.availabilityId
if err =? (await reservations.deleteReservation(
reservation.id, reservation.availabilityId
)).errorOption:
error "failed to delete unused reservation", error = err.msg
else:
trace "deleted unused reservation"
proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
let market = sales.context.market
let slotIds = await market.mySlots()
@ -167,11 +195,13 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
return slots
proc load*(sales: Sales) {.async.} =
let slots = await sales.mySlots()
let activeSlots = await sales.mySlots()
# TODO: add slots to slotqueue, as workers need to be dispatched
for slot in slots:
await sales.deleteInactiveReservations(activeSlots)
for slot in activeSlots:
let agent = newSalesAgent(
sales.context,
slot.request.id,

View File

@ -216,54 +216,55 @@ asyncchecksuite "Sales":
waitFor run()
test "processes all request's slots once StorageRequested emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
test "removes slots from slot queue once RequestCancelled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestCancelled(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestCancelled(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFailed emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFailed(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFailed(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFulfilled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFulfilled(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFulfilled(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes slot index from slot queue once SlotFilled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitSlotFilled(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
let request1 = await addRequestToSaturatedQueue()
market.emitSlotFilled(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
createAvailability()
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
createAvailability()
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
test "adds past requests to queue once availability added":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
await market.requestStorage(request)
await market.requestStorage(request)
await sleepAsync(1.millis)
# now add matching availability
createAvailability()
@ -280,12 +281,14 @@ asyncchecksuite "Sales":
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
check eventuallyCheck getAvailability().size == availability.size - request.ask.slotSize
check eventually getAvailability().size == availability.size - request.ask.slotSize
test "non-downloaded bytes are returned to availability once finished":
var slotIndex = 0.u256
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
slotIndex = slot
let blk = bt.Block.new( @[1.byte] ).get
onBatch(@[ blk ])
return success()
@ -293,8 +296,13 @@ asyncchecksuite "Sales":
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
await sleepAsync(1.millis)
check eventuallyCheck getAvailability().size == origSize - 1
await sleepAsync(2.millis) # allow proving to start
# complete request
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
clock.advance(request.ask.duration.truncate(int64))
check eventually getAvailability().size == origSize - 1
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
@ -397,7 +405,7 @@ asyncchecksuite "Sales":
soldSlotIndex = slotIndex
createAvailability()
await market.requestStorage(request)
check eventuallyCheck soldRequest == request
check eventually soldRequest == request
check soldSlotIndex < request.ask.slots.u256
test "calls onClear when storage becomes available again":
@ -427,7 +435,7 @@ asyncchecksuite "Sales":
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
check eventuallyCheck (await reservations.all(Availability)).get == @[availability]
check eventually (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
let origSize = availability.size
@ -439,20 +447,9 @@ asyncchecksuite "Sales":
createAvailability()
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
check eventuallyCheck (await reservations.all(Availability)).get == @[availability]
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().size == origSize
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
createAvailability()
await market.requestStorage(request)
check eventuallyCheck proving.slots.len == 1
check proving.slots.contains(Slot(request: request, slotIndex: soldSlotIndex))
test "loads active slots from market":
let me = await market.getSigner()
@ -489,3 +486,15 @@ asyncchecksuite "Sales":
check eventually sales.agents.len == 2
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 0.u256)
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)
test "deletes inactive reservations on load":
createAvailability()
discard await reservations.createReservation(
availability.id,
100.u256,
RequestId.example,
UInt256.example)
check (await reservations.all(Reservation)).get.len == 1
await sales.load()
check (await reservations.all(Reservation)).get.len == 0
check getAvailability().size == availability.size # was restored