diff --git a/codex/sales.nim b/codex/sales.nim index 5d18b31b..bf334297 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,6 +1,5 @@ import std/sequtils import std/sugar -import std/tables import pkg/questionable import pkg/questionable/results import pkg/stint @@ -197,8 +196,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = proc load*(sales: Sales) {.async.} = let activeSlots = await sales.mySlots() - # TODO: add slots to slotqueue, as workers need to be dispatched - await sales.deleteInactiveReservations(activeSlots) for slot in activeSlots: @@ -222,7 +219,7 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = let queue = context.slotQueue logScope: - topics = "marketplace sales onReservationAdded callback" + topics = "marketplace sales onAvailabilityAdded callback" trace "availability added, querying past storage requests to add to queue" @@ -234,26 +231,35 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = return let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) + ( + pricePerSlot: event.ask.pricePerSlot, + slots: SlotQueueItem.init(event.requestId, event.ask, event.expiry) + ) ) trace "found past storage requested events to add to queue", events = events.len - for slots in requests: + for (pricePerSlot, slots) in requests: for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err + if availability =? await sales.context.reservations.findAvailability( + slot.slotSize, + slot.duration, + pricePerSlot, + slot.collateral): + + if err =? queue.push(slot).errorOption: + # continue on error + if err of QueueNotRunningError: + warn "cannot push items to queue, queue is not running" + elif err of NoMatchingAvailabilityError: + info "slot in queue had no matching availabilities, ignoring" + elif err of SlotsOutOfRangeError: + warn "Too many slots, cannot add to queue" + elif err of SlotQueueItemExistsError: + trace "item already exists, ignoring" + discard + else: raise err except CatchableError as e: warn "Error adding request to SlotQueue", error = e.msg diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index f0b6b2c4..1a8c5151 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -23,7 +23,6 @@ method onFailed*(state: SaleFinished, request: StorageRequest): ?State = method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) let data = agent.data - let context = agent.context without request =? data.request: raiseAssert "no sale request" diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index f9e71582..5955937e 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,7 +14,6 @@ method `$`*(state: SaleIgnored): string = "SaleIgnored" method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) - let context = agent.context if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index ad6ee3cc..289abddb 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -1,4 +1,3 @@ -import std/sets import std/sequtils import std/sugar import std/times @@ -259,16 +258,22 @@ asyncchecksuite "Sales": test "adds past requests to queue once availability added": var itemsProcessed: seq[SlotQueueItem] = @[] + + # ignore all + queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + done.complete() + + await market.requestStorage(request) + await sleepAsync(10.millis) + + # check how many slots were processed by the queue queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = itemsProcessed.add item done.complete() - await market.requestStorage(request) - await sleepAsync(1.millis) - - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + # now add matching availability + createAvailability() + check eventually itemsProcessed.len == request.ask.slots.int test "availability size is reduced by request slot size when fully downloaded": sales.onStore = proc(request: StorageRequest, @@ -279,7 +284,6 @@ asyncchecksuite "Sales": return success() createAvailability() - let origSize = availability.size await market.requestStorage(request) check eventually getAvailability().size == availability.size - request.ask.slotSize