From e6a387e8e81824b2fcd816f7bde25699bbe1a162 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Sun, 26 May 2024 10:38:38 +1000 Subject: [PATCH] feat[marketplace]: add slot queue pausing (#752) * add seen flag * Add MockSlotQueueItem and better prioritisation tests * Update seen priority, and include in SlotQueueItem.init * Re-add processed slots to queue Re-add processed slots to queue if the sale was ignored or errored * add pausing of queue - when processing slots in queue, pause queue if item was marked seen - if availability size is increased, trigger onAvailabilityAdded callback - in sales, on availability added, clear 'seen' flags, then unpause the queue - when items pushed to the queue, unpause the queue * remove unused NoMatchingAvailabilityError from slotqueue The slot queue should also have nothing to do with availabilities * when all availabilities are empty, pause the queue An empty availability is defined as size < DefaultBlockSize as this means even the smallest possible request could not be served. However, this is up for discussion. * remove availability from onAvailabilitiesEmptied callback * refactor onAvailabilityAdded and onAvailabilitiesEmptied onAvailabilityAdded and onAvailabilitiesEmptied are now only called from reservations.update (and eventually reservations.delete once implemented). - Add empty routine for Availability and Reservation - Add allEmpty routine for Availability and Reservation, which returns true when all all Availability or Reservation objects in the datastore are empty. * SlotQueue test support updates * Sales module test support updates * Reservations module tests for queue pausing * Sales module tests for queue pausing Includes tests for sales states cancelled, errored, ignored to ensure onCleanUp is called with correct parameters * SlotQueue module tests for queue pausing * fix existing sales test * PR feedback - indent `self.unpause` - update comment for `clearSeenFlags` * reprocessSlot in SaleErrored only when coming from downloading * remove pausing of queue when availabilities are "emptied" Queue pausing when all availiabilies are "emptied" is not necessary, given that the node would not be able to service slots once all its availabilities' freeSize are too small for the slots in the queue, and would then be paused anyway. Add test that asserts the queue is paused once the freeSpace of availabilities drops too low to fill slots in the queue. * Update clearing of seen flags The asyncheapqueue update overload would need to check index bounds and ultimately a different solution was found using the mitems iterator. * fix test request.id was different before updating request.ask.slots, and that id was used to set the state in mockmarket. * Change filled/cleanup future to nil, so no await is needed * add wait to allow items to be added to queue * do not unpause queue when seen items are pushed * re-add seen item back to queue once paused Previously, when a seen item was processed, it was first popped off the queue, then the queue was paused waiting to process that item once the queue was unpaused. Now, when a seen item is processed, it is popped off the queue, the queue is paused, then the item is re-added to the queue and the queue will wait until unpaused before it will continue popping items off the queue. If the item was not re-added to the queue, it would have been processed immediately once unpaused, however there may have been other items with higher priority pushed to the queue in the meantime. The queue would not be unpaused if those added items were already seen. In particular, this may happen when ignored items due to lack of availability are re-added to a paused queue. Those ignored items will likely have a higher priority than the item that was just seen (due to it having been processed first), causing the queue to the be paused. * address PR comments --- codex/sales.nim | 115 +++++------ codex/sales/reservations.nim | 86 +++++--- codex/sales/salesagent.nim | 2 +- codex/sales/slotqueue.nim | 92 ++++++++- codex/sales/states/cancelled.nim | 2 +- codex/sales/states/downloading.nim | 2 +- codex/sales/states/errored.nim | 3 +- codex/sales/states/ignored.nim | 5 +- tests/codex/helpers/mockslotqueueitem.nim | 26 +++ tests/codex/sales/states/testcancelled.nim | 45 +++++ tests/codex/sales/states/testdownloading.nim | 3 +- tests/codex/sales/states/testerrored.nim | 49 +++++ tests/codex/sales/states/testignored.nim | 45 +++++ tests/codex/sales/testreservations.nim | 22 ++- tests/codex/sales/testsales.nim | 51 +++-- tests/codex/sales/testslotqueue.nim | 195 ++++++++++++++++++- 16 files changed, 614 insertions(+), 129 deletions(-) create mode 100644 tests/codex/helpers/mockslotqueueitem.nim create mode 100644 tests/codex/sales/states/testcancelled.nim create mode 100644 tests/codex/sales/states/testerrored.nim create mode 100644 tests/codex/sales/states/testignored.nim diff --git a/codex/sales.nim b/codex/sales.nim index 95c29cd2..c4fcb217 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore): Sales = Sales.new(market, clock, repo, 0) -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore, @@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp(sales: Sales, agent: SalesAgent, returnBytes: bool, + reprocessSlot: bool, processing: Future[void]) {.async.} = let data = agent.data - trace "cleaning up sales agent", - requestId = data.requestId, - slotIndex = data.slotIndex, - reservationId = data.reservation.?id |? ReservationId.default, + logScope: + topics = "sales cleanUp" + requestId = data.requestId + slotIndex = data.slotIndex + reservationId = data.reservation.?id |? ReservationId.default availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + trace "cleaning up sales agent" + # if reservation for the SalesAgent was not created, then it means # that the cleanUp was called before the sales process really started, so # there are not really any bytes to be returned @@ -132,7 +136,6 @@ proc cleanUp(sales: Sales, )).errorOption: error "failure returning bytes", error = returnErr.msg, - availabilityId = reservation.availabilityId, bytes = request.ask.slotSize # delete reservation and return reservation bytes back to the availability @@ -141,10 +144,21 @@ proc cleanUp(sales: Sales, reservation.id, reservation.availabilityId )).errorOption: - error "failure deleting reservation", - error = deleteErr.msg, - reservationId = reservation.id, - availabilityId = reservation.availabilityId + error "failure deleting reservation", error = deleteErr.msg + + # Re-add items back into the queue to prevent small availabilities from + # draining the queue. Seen items will be ordered last. + if reprocessSlot and request =? data.request: + let queue = sales.context.slotQueue + var seenItem = SlotQueueItem.init(data.requestId, + data.slotIndex.truncate(uint16), + data.ask, + request.expiry, + seen = true) + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(seenItem).errorOption: + error "failed to readd slot to queue", + errorType = $(type err), error = err.msg await sales.remove(agent) @@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc (returnBytes = false) {.async.} = - await sales.cleanUp(agent, returnBytes, done) + agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} = - let market = sales.context.market for agent in sales.agents: if slotId(agent.data.requestId, agent.data.slotIndex) == slotId: return some agent @@ -241,60 +254,29 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc(returnBytes = false) {.async.} = - let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, returnBytes, done) - await done # completed in sales.cleanUp + agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} = + # since workers are not being dispatched, this future has not been created + # by a worker. Create a dummy one here so we can call sales.cleanUp + let done: Future[void] = nil + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) + + # There is no need to assign agent.onFilled as slots loaded from `mySlots` + # are inherently already filled and so assigning agent.onFilled would be + # superfluous. agent.start(SaleUnknown()) sales.agents.add agent proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = - ## Query last 256 blocks for new requests, adding them to the queue. `push` - ## checks for availability before adding to the queue. If processed, the - ## sales agent will check if the slot is free. - let context = sales.context - let market = context.market - let queue = context.slotQueue + ## When availabilities are modified or added, the queue should be unpaused if + ## it was paused and any slots in the queue should have their `seen` flag + ## cleared. + let queue = sales.context.slotQueue - logScope: - topics = "marketplace sales onAvailabilityAdded callback" - - trace "availability added, querying past storage requests to add to queue" - - try: - let events = await market.queryPastStorageRequests(256) - - if events.len == 0: - trace "no storage request events found in recent past" - return - - let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) - ) - - trace "found past storage requested events to add to queue", - events = events.len - - for slots in requests: - for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err - except CancelledError as error: - raise error - except CatchableError as e: - warn "Error adding request to SlotQueue", error = e.msg - discard + queue.clearSeenFlags() + if queue.paused: + trace "unpausing queue after new availability added" + queue.unpause() proc onStorageRequested(sales: Sales, requestId: RequestId, @@ -321,9 +303,7 @@ proc onStorageRequested(sales: Sales, for item in items: # continue on failure if err =? slotQueue.push(item).errorOption: - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -364,9 +344,7 @@ proc onSlotFreed(sales: Sales, addSlotToQueue() .track(sales) .catch(proc(err: ref CatchableError) = - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -489,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} = slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) asyncSpawn slotQueue.start() diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 68dd45e8..c64dd806 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -28,6 +28,8 @@ import pkg/upraises push: {.upraises: [].} +import std/sequtils +import std/sugar import std/typetraits import std/sequtils import pkg/chronos @@ -37,6 +39,7 @@ import pkg/questionable import pkg/questionable/results import pkg/stint import pkg/stew/byteutils +import ../codextypes import ../logutils import ../clock import ../stores @@ -90,6 +93,8 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} + proc new*(T: type Reservations, repo: RepoStore): Reservations = @@ -226,26 +231,57 @@ proc update*( without key =? obj.key, error: return failure(error) - let getResult = await self.get(key, Availability) - - if getResult.isOk: - let oldAvailability = !getResult - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - else: - let err = getResult.error() - if not (err of NotExistsError): + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + return res + else: return failure(err) - return await self.updateImpl(obj) + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + + return res proc delete( self: Reservations, @@ -300,6 +336,9 @@ proc deleteReservation*( return success() +# TODO: add support for deleting availabilities +# To delete, must not have any active sales. + proc createAvailability*( self: Reservations, size: UInt256, @@ -327,17 +366,6 @@ proc createAvailability*( return failure(updateErr) - if onAvailabilityAdded =? self.onAvailabilityAdded: - try: - await onAvailabilityAdded(availability) - except CancelledError as error: - raise error - except CatchableError as e: - # we don't have any insight into types of errors that `onProcessSlot` can - # throw because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = availability.id, error = e.msg - return success(availability) proc createReservation*( diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 5bb0e9fb..81de2d6f 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 0512d388..198ef80f 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -36,6 +36,7 @@ type reward: UInt256 collateral: UInt256 expiry: UInt256 + seen: bool # don't need to -1 to prevent overflow when adding 1 (to always allow push) # because AsyncHeapQueue size is of type `int`, which is larger than `uint16` @@ -48,12 +49,12 @@ type running: bool workers: AsyncQueue[SlotQueueWorker] trackedFutures: TrackedFutures + unpaused: AsyncEvent SlotQueueError = object of CodexError SlotQueueItemExistsError* = object of SlotQueueError SlotQueueItemNotExistsError* = object of SlotQueueError SlotsOutOfRangeError* = object of SlotQueueError - NoMatchingAvailabilityError* = object of SlotQueueError QueueNotRunningError* = object of SlotQueueError # Number of concurrent workers used for processing SlotQueueItems @@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool = if condition: score += 1'u8 shl addition + scoreA.addIf(a.seen < b.seen, 4) + scoreB.addIf(a.seen > b.seen, 4) + scoreA.addIf(a.profitability > b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3) @@ -117,12 +121,13 @@ proc new*(_: type SlotQueue, # temporarily. After push (and sort), the bottom-most item will be deleted queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), running: false, - trackedFutures: TrackedFutures.new() + trackedFutures: TrackedFutures.new(), + unpaused: newAsyncEvent() ) # avoid instantiating `workers` in constructor to avoid side effects in # `newAsyncQueue` procedure -proc init*(_: type SlotQueueWorker): SlotQueueWorker = +proc init(_: type SlotQueueWorker): SlotQueueWorker = SlotQueueWorker( doneProcessing: newFuture[void]("slotqueue.worker.processing") ) @@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem, requestId: RequestId, slotIndex: uint16, ask: StorageAsk, - expiry: UInt256): SlotQueueItem = + expiry: UInt256, + seen = false): SlotQueueItem = SlotQueueItem( requestId: requestId, @@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem, duration: ask.duration, reward: ask.reward, collateral: ask.collateral, - expiry: expiry + expiry: expiry, + seen: seen ) proc init*(_: type SlotQueueItem, @@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize proc duration*(self: SlotQueueItem): UInt256 = self.duration proc reward*(self: SlotQueueItem): UInt256 = self.reward proc collateral*(self: SlotQueueItem): UInt256 = self.collateral +proc seen*(self: SlotQueueItem): bool = self.seen proc running*(self: SlotQueue): bool = self.running @@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len proc size*(self: SlotQueue): int = self.queue.size - 1 +proc paused*(self: SlotQueue): bool = not self.unpaused.isSet + proc `$`*(self: SlotQueue): string = $self.queue proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = @@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int = proc contains*(self: SlotQueue, item: SlotQueueItem): bool = self.queue.contains(item) +proc pause*(self: SlotQueue) = + # set unpaused flag to false -- coroutines will block on unpaused.wait() + self.unpaused.clear() + +proc unpause*(self: SlotQueue) = + # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait() + self.unpaused.fire() + proc populateItem*(self: SlotQueue, requestId: RequestId, slotIndex: uint16): ?SlotQueueItem = @@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue, proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = - trace "pushing item to queue", - requestId = item.requestId, slotIndex = item.slotIndex + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + seen = item.seen + + trace "pushing item to queue" if not self.running: let err = newException(QueueNotRunningError, "queue not running") @@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = self.queue.del(self.queue.size - 1) doAssert self.queue.len <= self.queue.size - 1 + + # when slots are pushed to the queue, the queue should be unpaused if it was + # paused + if self.paused and not item.seen: + trace "unpausing queue after new slot pushed" + self.unpause() + return success() proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void = @@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void = let worker = SlotQueueWorker.init() try: + discard worker.doneProcessing.track(self) self.workers.addLastNoWait(worker) except AsyncQueueFullError: return failure("failed to add worker, worker queue full") @@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue, if onProcessSlot =? self.onProcessSlot: try: + discard worker.doneProcessing.track(self) await onProcessSlot(item, worker.doneProcessing) await worker.doneProcessing @@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue, # throw because it is caller-defined warn "Unknown error processing slot in worker", error = e.msg +proc clearSeenFlags*(self: SlotQueue) = + # Enumerate all items in the queue, overwriting each item with `seen = false`. + # To avoid issues with new queue items being pushed to the queue while all + # items are being iterated (eg if a new storage request comes in and pushes + # new slots to the queue), this routine must remain synchronous. + + if self.queue.empty: + return + + for item in self.queue.mitems: + item.seen = false # does not maintain the heap invariant + + # force heap reshuffling to maintain the heap invariant + doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle" + + trace "all 'seen' flags cleared" + proc start*(self: SlotQueue) {.async.} = if self.running: return @@ -351,21 +399,47 @@ proc start*(self: SlotQueue) {.async.} = while self.running: try: + if self.paused: + trace "Queue is paused, waiting for new slots or availabilities to be modified/added" + + # block until unpaused is true/fired, ie wait for queue to be unpaused + await self.unpaused.wait() + let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers let item = await self.queue.pop().track(self) # if queue empty, wait here for new items + logScope: + reqId = item.requestId + slotIdx = item.slotIndex + seen = item.seen + if not self.running: # may have changed after waiting for pop trace "not running, exiting" break + # If, upon processing a slot, the slot item already has a `seen` flag set, + # the queue should be paused. + if item.seen: + trace "processing already seen item, pausing queue", + reqId = item.requestId, slotIdx = item.slotIndex + self.pause() + # put item back in queue so that if other items are pushed while paused, + # it will be sorted accordingly. Otherwise, this item would be processed + # immediately (with priority over other items) once unpaused + trace "readding seen item back into the queue" + discard self.push(item) # on error, drop the item and continue + worker.doneProcessing.complete() + await sleepAsync(1.millis) # poll + continue + + trace "processing item" + self.dispatch(worker, item) .track(self) .catch(proc (e: ref CatchableError) = error "Unknown error dispatching worker", error = e.msg ) - discard worker.doneProcessing.track(self) - await sleepAsync(1.millis) # poll except CancelledError: trace "slot queue cancelled" diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 8464a61b..4bdc444e 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = false) warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index c301ab2e..deed0e35 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: - return some State(SaleErrored(error: err)) + return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 51f34bc9..fdd83122 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -12,6 +12,7 @@ logScope: type SaleErrored* = ref object of SaleState error*: ref CatchableError + reprocessSlot*: bool method `$`*(state: SaleErrored): string = "SaleErrored" @@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index d757e9c1..7a70fb20 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + # Ignored slots mean there was no availability. In order to prevent small + # availabilities from draining the queue, mark this slot as seen and re-add + # back into the queue. + await onCleanUp(reprocessSlot = true) diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim new file mode 100644 index 00000000..e4c0bbb6 --- /dev/null +++ b/tests/codex/helpers/mockslotqueueitem.nim @@ -0,0 +1,26 @@ +import pkg/codex/contracts/requests +import pkg/codex/sales/slotqueue + +type MockSlotQueueItem* = object + requestId*: RequestId + slotIndex*: uint16 + slotSize*: UInt256 + duration*: UInt256 + reward*: UInt256 + collateral*: UInt256 + expiry*: UInt256 + seen*: bool + +proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem = + SlotQueueItem.init( + requestId = item.requestId, + slotIndex = item.slotIndex, + ask = StorageAsk( + slotSize: item.slotSize, + duration: item.duration, + reward: item.reward, + collateral: item.collateral + ), + expiry = item.expiry, + seen = item.seen + ) diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim new file mode 100644 index 00000000..e252cd9c --- /dev/null +++ b/tests/codex/sales/states/testcancelled.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/cancelled +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'cancelled'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleCancelled + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleCancelled.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == false diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim index 3f65c6e7..fc81b158 100644 --- a/tests/codex/sales/states/testdownloading.nim +++ b/tests/codex/sales/states/testdownloading.nim @@ -1,8 +1,9 @@ import std/unittest import pkg/questionable import pkg/codex/contracts/requests -import pkg/codex/sales/states/downloading import pkg/codex/sales/states/cancelled +import pkg/codex/sales/states/downloading +import pkg/codex/sales/states/errored import pkg/codex/sales/states/failed import pkg/codex/sales/states/filled import ../../examples diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim new file mode 100644 index 00000000..dc525894 --- /dev/null +++ b/tests/codex/sales/states/testerrored.nim @@ -0,0 +1,49 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/errored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'errored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleErrored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleErrored(error: newException(ValueError, "oh no!")) + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + state = SaleErrored( + error: newException(ValueError, "oh no!"), + reprocessSlot: true + ) + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim new file mode 100644 index 00000000..680dca8d --- /dev/null +++ b/tests/codex/sales/states/testignored.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/ignored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'ignored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleIgnored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleIgnored.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == false + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 4b82fb89..ae15ad2f 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -258,7 +258,7 @@ asyncchecksuite "Reservations module": check updated.isErr check updated.error of NotExistsError - test "onAvailabilityAdded called when availability is reserved": + test "onAvailabilityAdded called when availability is created": var added: Availability reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = added = a @@ -267,6 +267,26 @@ asyncchecksuite "Reservations module": check added == availability + test "onAvailabilityAdded called when availability size is increased": + var availability = createAvailability() + var added: Availability + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + added = a + availability.freeSize += 1.u256 + discard await reservations.update(availability) + + check added == availability + + test "onAvailabilityAdded is not called when availability size is decreased": + var availability = createAvailability() + var called = false + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + called = true + availability.freeSize -= 1.u256 + discard await reservations.update(availability) + + check not called + test "availabilities can be found": let availability = createAvailability() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 222ba0ff..4aa83e25 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -272,24 +272,41 @@ asyncchecksuite "Sales": let expected = SlotQueueItem.init(request, 2.uint16) check eventually itemsProcessed.contains(expected) - test "adds past requests to queue once availability added": - var itemsProcessed: seq[SlotQueueItem] = @[] - - # ignore all - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - done.complete() - + test "items in queue are readded (and marked seen) once ignored": await market.requestStorage(request) - await sleepAsync(10.millis) + let items = SlotQueueItem.init(request) + await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue + check eventually queue.paused + # The first processed item will be will have been re-pushed with `seen = + # true`. Then, once this item is processed by the queue, its 'seen' flag + # will be checked, at which point the queue will be paused. This test could + # check item existence in the queue, but that would require inspecting + # onProcessSlot to see which item was first, and overridding onProcessSlot + # will prevent the queue working as expected in the Sales module. + check eventually queue.len == 4 - # check how many slots were processed by the queue - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + for item in items: + check queue.contains(item) - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + for i in 0.. itemB + test "correct prioritizes SlotQueueItems based on 'seen'": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # profitability is higher (good) + collateral: 1.u256, + expiry: 1.u256, + seen: true # seen (bad), more weight than profitability + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # profitability is lower (bad) + collateral: 1.u256, + expiry: 1.u256, + seen: false # not seen (good) + ) + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A + check itemA.toSlotQueueItem > itemB.toSlotQueueItem + + test "correct prioritizes SlotQueueItems based on profitability": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # reward is lower (bad) + collateral: 1.u256, # collateral is lower (good) + expiry: 1.u256, + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # reward is higher (good), more weight than collateral + collateral: 2.u256, # collateral is higher (bad) + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on collateral": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 2.u256, # collateral is higher (bad) + expiry: 2.u256, # expiry is longer (good) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, # collateral is lower (good), more weight than expiry + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on expiry": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 2.u256, # expiry is longer (good), more weight than slotSize + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on slotSize": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + test "expands available all possible slot indices on init": let request = StorageRequest.example let items = SlotQueueItem.init(request) @@ -391,3 +516,71 @@ suite "Slot queue": (item3.requestId, item3.slotIndex), ] ) + + test "processing a 'seen' item pauses the queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item) + check eventually queue.paused + check onProcessSlotCalledWith.len == 0 + + test "pushing items to queue unpauses queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + queue.pause + + let request = StorageRequest.example + var items = SlotQueueItem.init(request) + queue.push(items) + # check all items processed + check eventually queue.len == 0 + + test "pushing seen item does not unpause queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + check queue.paused + queue.push(item0) + check queue.paused + + test "paused queue waits for unpause before continuing processing": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = false) + check queue.paused + # push causes unpause + queue.push(item) + # check all items processed + check eventually onProcessSlotCalledWith == @[ + (item.requestId, item.slotIndex), + ] + check eventually queue.len == 0 + + test "item 'seen' flags can be cleared": + newSlotQueue(maxSize = 4, maxWorkers = 1) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + let item1 = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item0) + queue.push(item1) + check queue[0].seen + check queue[1].seen + + queue.clearSeenFlags() + check queue[0].seen == false + check queue[1].seen == false