diff --git a/codex/sales.nim b/codex/sales.nim index 43fa9f49..71eb9c62 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -114,6 +114,10 @@ proc cleanUp(sales: Sales, reservationId = data.reservation.?id |? ReservationId.default, availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + # TODO: return bytes that were used in the request back to the availability + # as well, which will require removing the bytes from disk (perhaps via + # setting blockTTL to -1 and then running block maintainer?) + # delete reservation and return reservation bytes back to the availability if reservation =? agent.data.reservation and deleteErr =? (await sales.context.reservations.deleteReservation( @@ -154,6 +158,30 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = agent.start(SalePreparing()) sales.agents.add agent +proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} = + let reservations = sales.context.reservations + without reservs =? await reservations.all(Reservation): + info "no unused reservations found for deletion" + + let unused = reservs.filter(r => ( + let slotId = slotId(r.requestId, r.slotIndex) + not activeSlots.any(slot => slot.id == slotId) + )) + info "found unused reservations for deletion", unused = unused.len + + for reservation in unused: + + logScope: + reservationId = reservation.id + availabilityId = reservation.availabilityId + + if err =? (await reservations.deleteReservation( + reservation.id, reservation.availabilityId + )).errorOption: + error "failed to delete unused reservation", error = err.msg + else: + trace "deleted unused reservation" + proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = let market = sales.context.market let slotIds = await market.mySlots() @@ -167,11 +195,13 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc load*(sales: Sales) {.async.} = - let slots = await sales.mySlots() + let activeSlots = await sales.mySlots() # TODO: add slots to slotqueue, as workers need to be dispatched - for slot in slots: + await sales.deleteInactiveReservations(activeSlots) + + for slot in activeSlots: let agent = newSalesAgent( sales.context, slot.request.id, diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 920d8b00..ad6ee3cc 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -216,54 +216,55 @@ asyncchecksuite "Sales": waitFor run() test "processes all request's slots once StorageRequested emitted": - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() - createAvailability() - await market.requestStorage(request) - let items = SlotQueueItem.init(request) - check eventually items.allIt(itemsProcessed.contains(it)) + queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + itemsProcessed.add item + done.complete() + createAvailability() + await market.requestStorage(request) + let items = SlotQueueItem.init(request) + check eventually items.allIt(itemsProcessed.contains(it)) test "removes slots from slot queue once RequestCancelled emitted": - let request1 = await addRequestToSaturatedQueue() - market.emitRequestCancelled(request1.id) - check always itemsProcessed.notProcessed(request1) + let request1 = await addRequestToSaturatedQueue() + market.emitRequestCancelled(request1.id) + check always itemsProcessed.notProcessed(request1) test "removes request from slot queue once RequestFailed emitted": - let request1 = await addRequestToSaturatedQueue() - market.emitRequestFailed(request1.id) - check always itemsProcessed.notProcessed(request1) + let request1 = await addRequestToSaturatedQueue() + market.emitRequestFailed(request1.id) + check always itemsProcessed.notProcessed(request1) test "removes request from slot queue once RequestFulfilled emitted": - let request1 = await addRequestToSaturatedQueue() - market.emitRequestFulfilled(request1.id) - check always itemsProcessed.notProcessed(request1) + let request1 = await addRequestToSaturatedQueue() + market.emitRequestFulfilled(request1.id) + check always itemsProcessed.notProcessed(request1) test "removes slot index from slot queue once SlotFilled emitted": - let request1 = await addRequestToSaturatedQueue() - market.emitSlotFilled(request1.id, 1.u256) - let expected = SlotQueueItem.init(request1, 1'u16) - check always (not itemsProcessed.contains(expected)) + let request1 = await addRequestToSaturatedQueue() + market.emitSlotFilled(request1.id, 1.u256) + let expected = SlotQueueItem.init(request1, 1'u16) + check always (not itemsProcessed.contains(expected)) test "adds slot index to slot queue once SlotFreed emitted": - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + itemsProcessed.add item + done.complete() - createAvailability() - market.requested.add request # "contract" must be able to return request - market.emitSlotFreed(request.id, 2.u256) + createAvailability() + market.requested.add request # "contract" must be able to return request + market.emitSlotFreed(request.id, 2.u256) - let expected = SlotQueueItem.init(request, 2.uint16) - check eventually itemsProcessed.contains(expected) + let expected = SlotQueueItem.init(request, 2.uint16) + check eventually itemsProcessed.contains(expected) test "adds past requests to queue once availability added": - var itemsProcessed: seq[SlotQueueItem] = @[] - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + var itemsProcessed: seq[SlotQueueItem] = @[] + queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + itemsProcessed.add item + done.complete() - await market.requestStorage(request) + await market.requestStorage(request) + await sleepAsync(1.millis) # now add matching availability createAvailability() @@ -280,12 +281,14 @@ asyncchecksuite "Sales": createAvailability() let origSize = availability.size await market.requestStorage(request) - check eventuallyCheck getAvailability().size == availability.size - request.ask.slotSize + check eventually getAvailability().size == availability.size - request.ask.slotSize test "non-downloaded bytes are returned to availability once finished": + var slotIndex = 0.u256 sales.onStore = proc(request: StorageRequest, slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = + slotIndex = slot let blk = bt.Block.new( @[1.byte] ).get onBatch(@[ blk ]) return success() @@ -293,8 +296,13 @@ asyncchecksuite "Sales": createAvailability() let origSize = availability.size await market.requestStorage(request) - await sleepAsync(1.millis) - check eventuallyCheck getAvailability().size == origSize - 1 + await sleepAsync(2.millis) # allow proving to start + + # complete request + market.slotState[request.slotId(slotIndex)] = SlotState.Finished + clock.advance(request.ask.duration.truncate(int64)) + + check eventually getAvailability().size == origSize - 1 test "ignores download when duration not long enough": availability.duration = request.ask.duration - 1 @@ -397,7 +405,7 @@ asyncchecksuite "Sales": soldSlotIndex = slotIndex createAvailability() await market.requestStorage(request) - check eventuallyCheck soldRequest == request + check eventually soldRequest == request check soldSlotIndex < request.ask.slots.u256 test "calls onClear when storage becomes available again": @@ -427,7 +435,7 @@ asyncchecksuite "Sales": await market.requestStorage(request) for slotIndex in 0.. agent.data.requestId == request.id and agent.data.slotIndex == 0.u256) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) + + test "deletes inactive reservations on load": + createAvailability() + discard await reservations.createReservation( + availability.id, + 100.u256, + RequestId.example, + UInt256.example) + check (await reservations.all(Reservation)).get.len == 1 + await sales.load() + check (await reservations.all(Reservation)).get.len == 0 + check getAvailability().size == availability.size # was restored