From 6270e8ef1f7abfdb5476d50fa3489b8cecce810a Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Thu, 7 Sep 2023 17:13:33 +1000 Subject: [PATCH] filter past requests based on availability Because availability filtering on push was removed, when availability is added and past storage request events are queried, those requests need to be filtered by availability before being added to the queue. --- codex/sales.nim | 42 +++++++++++++++++++-------------- codex/sales/states/finished.nim | 1 - codex/sales/states/ignored.nim | 1 - tests/codex/sales/testsales.nim | 20 +++++++++------- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index 5d18b31b..bf334297 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,6 +1,5 @@ import std/sequtils import std/sugar -import std/tables import pkg/questionable import pkg/questionable/results import pkg/stint @@ -197,8 +196,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = proc load*(sales: Sales) {.async.} = let activeSlots = await sales.mySlots() - # TODO: add slots to slotqueue, as workers need to be dispatched - await sales.deleteInactiveReservations(activeSlots) for slot in activeSlots: @@ -222,7 +219,7 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = let queue = context.slotQueue logScope: - topics = "marketplace sales onReservationAdded callback" + topics = "marketplace sales onAvailabilityAdded callback" trace "availability added, querying past storage requests to add to queue" @@ -234,26 +231,35 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = return let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) + ( + pricePerSlot: event.ask.pricePerSlot, + slots: SlotQueueItem.init(event.requestId, event.ask, event.expiry) + ) ) trace "found past storage requested events to add to queue", events = events.len - for slots in requests: + for (pricePerSlot, slots) in requests: for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err + if availability =? await sales.context.reservations.findAvailability( + slot.slotSize, + slot.duration, + pricePerSlot, + slot.collateral): + + if err =? queue.push(slot).errorOption: + # continue on error + if err of QueueNotRunningError: + warn "cannot push items to queue, queue is not running" + elif err of NoMatchingAvailabilityError: + info "slot in queue had no matching availabilities, ignoring" + elif err of SlotsOutOfRangeError: + warn "Too many slots, cannot add to queue" + elif err of SlotQueueItemExistsError: + trace "item already exists, ignoring" + discard + else: raise err except CatchableError as e: warn "Error adding request to SlotQueue", error = e.msg diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index f0b6b2c4..1a8c5151 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -23,7 +23,6 @@ method onFailed*(state: SaleFinished, request: StorageRequest): ?State = method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) let data = agent.data - let context = agent.context without request =? data.request: raiseAssert "no sale request" diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index f9e71582..5955937e 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,7 +14,6 @@ method `$`*(state: SaleIgnored): string = "SaleIgnored" method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) - let context = agent.context if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index ad6ee3cc..289abddb 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -1,4 +1,3 @@ -import std/sets import std/sequtils import std/sugar import std/times @@ -259,16 +258,22 @@ asyncchecksuite "Sales": test "adds past requests to queue once availability added": var itemsProcessed: seq[SlotQueueItem] = @[] + + # ignore all + queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + done.complete() + + await market.requestStorage(request) + await sleepAsync(10.millis) + + # check how many slots were processed by the queue queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = itemsProcessed.add item done.complete() - await market.requestStorage(request) - await sleepAsync(1.millis) - - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + # now add matching availability + createAvailability() + check eventually itemsProcessed.len == request.ask.slots.int test "availability size is reduced by request slot size when fully downloaded": sales.onStore = proc(request: StorageRequest, @@ -279,7 +284,6 @@ asyncchecksuite "Sales": return success() createAvailability() - let origSize = availability.size await market.requestStorage(request) check eventually getAvailability().size == availability.size - request.ask.slotSize