diff --git a/codex/sales.nim b/codex/sales.nim
index 95c29cd2..c4fcb217 100644
--- a/codex/sales.nim
+++ b/codex/sales.nim
@@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
 
 proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
 
-func new*(_: type Sales,
+proc new*(_: type Sales,
           market: Market,
           clock: Clock,
           repo: RepoStore): Sales =
   Sales.new(market, clock, repo, 0)
 
-func new*(_: type Sales,
+proc new*(_: type Sales,
           market: Market,
           clock: Clock,
           repo: RepoStore,
@@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
 
 proc cleanUp(sales: Sales,
              agent: SalesAgent,
             returnBytes: bool,
+             reprocessSlot: bool,
             processing: Future[void]) {.async.} =
 
   let data = agent.data
 
-  trace "cleaning up sales agent",
-    requestId = data.requestId,
-    slotIndex = data.slotIndex,
-    reservationId = data.reservation.?id |? ReservationId.default,
+  logScope:
+    topics = "sales cleanUp"
+    requestId = data.requestId
+    slotIndex = data.slotIndex
+    reservationId = data.reservation.?id |? ReservationId.default
     availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
 
+  trace "cleaning up sales agent"
+
   # if reservation for the SalesAgent was not created, then it means
   # that the cleanUp was called before the sales process really started, so
   # there are not really any bytes to be returned
@@ -132,7 +136,6 @@ proc cleanUp(sales: Sales,
   )).errorOption:
     error "failure returning bytes",
       error = returnErr.msg,
-      availabilityId = reservation.availabilityId,
       bytes = request.ask.slotSize
 
   # delete reservation and return reservation bytes back to the availability
@@ -141,10 +144,21 @@ proc cleanUp(sales: Sales,
     reservation.id,
     reservation.availabilityId
   )).errorOption:
-    error "failure deleting reservation",
-      error = deleteErr.msg,
-      reservationId = reservation.id,
-      availabilityId = reservation.availabilityId
+    error "failure deleting reservation", error = deleteErr.msg
+
+  # Re-add the item to the queue to prevent small availabilities from
+  # draining the queue. Seen items will be ordered last.
+  if reprocessSlot and request =? data.request:
+    let queue = sales.context.slotQueue
+    var seenItem = SlotQueueItem.init(data.requestId,
+                                      data.slotIndex.truncate(uint16),
+                                      data.ask,
+                                      request.expiry,
+                                      seen = true)
+    trace "pushing ignored item to queue, marked as seen"
+    if err =? queue.push(seenItem).errorOption:
+      error "failed to re-add slot to queue",
+        errorType = $(type err), error = err.msg
 
   await sales.remove(agent)
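The hunk above changes the cleanUp contract: a slot whose sale is abandoned can now be handed back to the queue flagged as "seen". A minimal sketch of the intended call site, assuming `agent` is a SalesAgent whose slot found no matching availability (names as defined in the modules in this patch):

    # Sketch only: how a caller is expected to use the new flag.
    if onCleanUp =? agent.onCleanUp:
      # returnBytes = false: nothing was downloaded, so no bytes to return.
      # reprocessSlot = true: cleanUp pushes the slot back with seen = true,
      # so it sorts behind every unseen item in the queue.
      await onCleanUp(returnBytes = false, reprocessSlot = true)
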
@@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
     none StorageRequest
   )
 
-  agent.onCleanUp = proc (returnBytes = false) {.async.} =
-    await sales.cleanUp(agent, returnBytes, done)
+  agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+    await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
 
   agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
     sales.filled(request, slotIndex, done)
@@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
   return slots
 
 proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} =
-  let market = sales.context.market
   for agent in sales.agents:
     if slotId(agent.data.requestId, agent.data.slotIndex) == slotId:
       return some agent
@@ -241,60 +254,29 @@ proc load*(sales: Sales) {.async.} =
       slot.slotIndex,
       some slot.request)
 
-    agent.onCleanUp = proc(returnBytes = false) {.async.} =
-      let done = newFuture[void]("onCleanUp_Dummy")
-      await sales.cleanUp(agent, returnBytes, done)
-      await done # completed in sales.cleanUp
+    agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} =
+      # since workers are not being dispatched, this future has not been
+      # created by a worker. Pass nil in its place so we can still call
+      # sales.cleanUp
+      let done: Future[void] = nil
+      await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
+
+    # There is no need to assign agent.onFilled as slots loaded from `mySlots`
+    # are inherently already filled and so assigning agent.onFilled would be
+    # superfluous.
 
     agent.start(SaleUnknown())
     sales.agents.add agent
 
 proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
-  ## Query last 256 blocks for new requests, adding them to the queue. `push`
-  ## checks for availability before adding to the queue. If processed, the
-  ## sales agent will check if the slot is free.
-  let context = sales.context
-  let market = context.market
-  let queue = context.slotQueue
+  ## When availabilities are modified or added, the queue should be unpaused if
+  ## it was paused and any slots in the queue should have their `seen` flag
+  ## cleared.
+  let queue = sales.context.slotQueue
 
-  logScope:
-    topics = "marketplace sales onAvailabilityAdded callback"
-
-  trace "availability added, querying past storage requests to add to queue"
-
-  try:
-    let events = await market.queryPastStorageRequests(256)
-
-    if events.len == 0:
-      trace "no storage request events found in recent past"
-      return
-
-    let requests = events.map(event =>
-      SlotQueueItem.init(event.requestId, event.ask, event.expiry)
-    )
-
-    trace "found past storage requested events to add to queue",
-      events = events.len
-
-    for slots in requests:
-      for slot in slots:
-        if err =? queue.push(slot).errorOption:
-          # continue on error
-          if err of QueueNotRunningError:
-            warn "cannot push items to queue, queue is not running"
-          elif err of NoMatchingAvailabilityError:
-            info "slot in queue had no matching availabilities, ignoring"
-          elif err of SlotsOutOfRangeError:
-            warn "Too many slots, cannot add to queue"
-          elif err of SlotQueueItemExistsError:
-            trace "item already exists, ignoring"
-            discard
-          else: raise err
-  except CancelledError as error:
-    raise error
-  except CatchableError as e:
-    warn "Error adding request to SlotQueue", error = e.msg
-    discard
+  queue.clearSeenFlags()
+  if queue.paused:
+    trace "unpausing queue after new availability added"
+    queue.unpause()
 
 proc onStorageRequested(sales: Sales,
                         requestId: RequestId,
@@ -321,9 +303,7 @@ proc onStorageRequested(sales: Sales,
   for item in items:
     # continue on failure
     if err =? slotQueue.push(item).errorOption:
-      if err of NoMatchingAvailabilityError:
-        info "slot in queue had no matching availabilities, ignoring"
-      elif err of SlotQueueItemExistsError:
+      if err of SlotQueueItemExistsError:
         error "Failed to push item to queue because it already exists"
       elif err of QueueNotRunningError:
         warn "Failed to push item to queue because queue is not running"
@@ -364,9 +344,7 @@ proc onSlotFreed(sales: Sales,
   addSlotToQueue()
     .track(sales)
     .catch(proc(err: ref CatchableError) =
-      if err of NoMatchingAvailabilityError:
-        info "slot in queue had no matching availabilities, ignoring"
-      elif err of SlotQueueItemExistsError:
+      if err of SlotQueueItemExistsError:
         error "Failed to push item to queue because it already exists"
       elif err of QueueNotRunningError:
         warn "Failed to push item to queue because queue is not running"
@@ -489,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
 
   slotQueue.onProcessSlot =
     proc(item: SlotQueueItem, done: Future[void]) {.async.} =
+      trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
       sales.processSlot(item, done)
 
   asyncSpawn slotQueue.start()
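Taken together, the sales.nim changes replace the old "query past requests" behavior with a pause/resume cycle. A condensed sketch of that lifecycle, assuming `queue` is a started SlotQueue and `requestId`, `slotIndex`, `ask` and `expiry` come from a storage request (illustrative names only):

    # 1. an ignored slot is re-queued, flagged as seen, and sorts last
    let item = SlotQueueItem.init(requestId, slotIndex, ask, expiry, seen = true)
    discard queue.push(item)   # a seen push never unpauses the queue
    # 2. when a seen item reaches the front, every item has had one try,
    #    so the queue pauses itself and parks on unpaused.wait()
    # 3. a new or grown availability triggers onAvailabilityAdded, which
    #    resets the cycle:
    queue.clearSeenFlags()     # give every item a fresh attempt
    if queue.paused:
      queue.unpause()          # processing resumes
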
diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim
index 68dd45e8..c64dd806 100644
--- a/codex/sales/reservations.nim
+++ b/codex/sales/reservations.nim
@@ -28,6 +28,7 @@ import pkg/upraises
 
 push: {.upraises: [].}
 
+import std/sugar
 import std/typetraits
 import std/sequtils
 import pkg/chronos
@@ -37,6 +38,7 @@ import pkg/questionable
 import pkg/questionable/results
 import pkg/stint
 import pkg/stew/byteutils
+import ../codextypes
 import ../logutils
 import ../clock
 import ../stores
@@ -90,6 +92,8 @@ const
   SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
   ReservationsKey = (SalesKey / "reservations").tryGet
 
+proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
+
 proc new*(T: type Reservations,
           repo: RepoStore): Reservations =
@@ -226,26 +230,57 @@ proc update*(
   without key =? obj.key, error:
     return failure(error)
 
-  let getResult = await self.get(key, Availability)
-
-  if getResult.isOk:
-    let oldAvailability = !getResult
-
-    # Sizing of the availability changed, we need to adjust the repo reservation accordingly
-    if oldAvailability.totalSize != obj.totalSize:
-      if oldAvailability.totalSize < obj.totalSize: # storage added
-        if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
-          return failure(reserveErr.toErr(ReserveFailedError))
-
-      elif oldAvailability.totalSize > obj.totalSize: # storage removed
-        if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
-          return failure(reserveErr.toErr(ReleaseFailedError))
-  else:
-    let err = getResult.error()
-    if not (err of NotExistsError):
+  without oldAvailability =? await self.get(key, Availability), err:
+    if err of NotExistsError:
+      let res = await self.updateImpl(obj)
+      # inform subscribers that Availability has been added
+      if onAvailabilityAdded =? self.onAvailabilityAdded:
+        # when chronos v4 is implemented, and OnAvailabilityAdded is annotated
+        # with async:(raises:[]), we can remove this try/catch as we know, with
+        # certainty, that nothing will be raised
+        try:
+          await onAvailabilityAdded(obj)
+        except CancelledError as e:
+          raise e
+        except CatchableError as e:
+          # we don't have any insight into types of exceptions that
+          # `onAvailabilityAdded` can raise because it is caller-defined
+          warn "Unknown error during 'onAvailabilityAdded' callback",
+            availabilityId = obj.id, error = e.msg
+      return res
+    else:
       return failure(err)
 
-  return await self.updateImpl(obj)
+  # Sizing of the availability changed, we need to adjust the repo reservation accordingly
+  if oldAvailability.totalSize != obj.totalSize:
+    if oldAvailability.totalSize < obj.totalSize: # storage added
+      if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
+        return failure(reserveErr.toErr(ReserveFailedError))
+
+    elif oldAvailability.totalSize > obj.totalSize: # storage removed
+      if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
+        return failure(reserveErr.toErr(ReleaseFailedError))
+
+  let res = await self.updateImpl(obj)
+
+  if oldAvailability.freeSize < obj.freeSize: # availability added
+    # inform subscribers that Availability has been modified (with increased
+    # size)
+    if onAvailabilityAdded =? self.onAvailabilityAdded:
+      # when chronos v4 is implemented, and OnAvailabilityAdded is annotated
+      # with async:(raises:[]), we can remove this try/catch as we know, with
+      # certainty, that nothing will be raised
+      try:
+        await onAvailabilityAdded(obj)
+      except CancelledError as e:
+        raise e
+      except CatchableError as e:
+        # we don't have any insight into types of exceptions that
+        # `onAvailabilityAdded` can raise because it is caller-defined
+        warn "Unknown error during 'onAvailabilityAdded' callback",
+          availabilityId = obj.id, error = e.msg
+
+  return res
 
 proc delete(
   self: Reservations,
@@ -300,6 +335,9 @@ proc deleteReservation*(
 
   return success()
 
+# TODO: add support for deleting availabilities
+# To delete, must not have any active sales.
+
 proc createAvailability*(
   self: Reservations,
   size: UInt256,
@@ -327,17 +365,6 @@ proc createAvailability*(
 
     return failure(updateErr)
 
-  if onAvailabilityAdded =? self.onAvailabilityAdded:
-    try:
-      await onAvailabilityAdded(availability)
-    except CancelledError as error:
-      raise error
-    except CatchableError as e:
-      # we don't have any insight into types of errors that `onProcessSlot` can
-      # throw because it is caller-defined
-      warn "Unknown error during 'onAvailabilityAdded' callback",
-        availabilityId = availability.id, error = e.msg
-
   return success(availability)
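With the callback moved from createAvailability into update(), growing an availability now notifies subscribers while shrinking one does not. A small usage sketch, assuming a `reservations: Reservations` instance and an `availability` previously returned by createAvailability (mirrors the tests added later in this patch):

    reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
      echo "availability created or grown: ", a.id

    availability.freeSize += 1.u256                  # grow the availability
    discard await reservations.update(availability)  # fires onAvailabilityAdded
    availability.freeSize -= 1.u256                  # shrink it again
    discard await reservations.update(availability)  # updates storage, no callback
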
diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim
index 5bb0e9fb..81de2d6f 100644
--- a/codex/sales/salesagent.nim
+++ b/codex/sales/salesagent.nim
@@ -25,7 +25,7 @@ type
     onCleanUp*: OnCleanUp
     onFilled*: ?OnFilled
 
-  OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
+  OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].}
   OnFilled* = proc(request: StorageRequest,
                    slotIndex: UInt256) {.gcsafe, upraises: [].}

diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim
index 0512d388..198ef80f 100644
--- a/codex/sales/slotqueue.nim
+++ b/codex/sales/slotqueue.nim
@@ -36,6 +36,7 @@ type
     reward: UInt256
     collateral: UInt256
     expiry: UInt256
+    seen: bool
 
   # don't need to -1 to prevent overflow when adding 1 (to always allow push)
   # because AsyncHeapQueue size is of type `int`, which is larger than `uint16`
@@ -48,12 +49,12 @@ type
     running: bool
     workers: AsyncQueue[SlotQueueWorker]
     trackedFutures: TrackedFutures
+    unpaused: AsyncEvent
 
   SlotQueueError = object of CodexError
   SlotQueueItemExistsError* = object of SlotQueueError
   SlotQueueItemNotExistsError* = object of SlotQueueError
   SlotsOutOfRangeError* = object of SlotQueueError
-  NoMatchingAvailabilityError* = object of SlotQueueError
   QueueNotRunningError* = object of SlotQueueError
 
 # Number of concurrent workers used for processing SlotQueueItems
@@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
     if condition:
       score += 1'u8 shl addition
 
+  scoreA.addIf(a.seen < b.seen, 4)
+  scoreB.addIf(a.seen > b.seen, 4)
+
   scoreA.addIf(a.profitability > b.profitability, 3)
   scoreB.addIf(a.profitability < b.profitability, 3)
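To make the weighting concrete, a worked example of the comparison above (the pre-existing collateral, expiry and slotSize comparisons occupy bits 2, 1 and 0; this is a sketch of the arithmetic, not code from the patch):

    # a: seen = true,  higher profitability -> scoreA = 1'u8 shl 3 = 8
    # b: seen = false, lower profitability  -> scoreB = 1'u8 shl 4 = 16
    # scoreB > scoreA, so `b < a` holds: in this heap `<` means "pops first",
    # and a single seen bit outweighs all lower-order advantages combined
    # (16 > 8 + 4 + 2 + 1).
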
@@ -117,12 +121,13 @@ proc new*(_: type SlotQueue,
     # temporarily. After push (and sort), the bottom-most item will be deleted
     queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
     running: false,
-    trackedFutures: TrackedFutures.new()
+    trackedFutures: TrackedFutures.new(),
+    unpaused: newAsyncEvent()
   )
   # avoid instantiating `workers` in constructor to avoid side effects in
   # `newAsyncQueue` procedure
 
-proc init*(_: type SlotQueueWorker): SlotQueueWorker =
+proc init(_: type SlotQueueWorker): SlotQueueWorker =
   SlotQueueWorker(
     doneProcessing: newFuture[void]("slotqueue.worker.processing")
   )
@@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem,
           requestId: RequestId,
           slotIndex: uint16,
           ask: StorageAsk,
-          expiry: UInt256): SlotQueueItem =
+          expiry: UInt256,
+          seen = false): SlotQueueItem =
 
   SlotQueueItem(
     requestId: requestId,
@@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem,
     duration: ask.duration,
     reward: ask.reward,
     collateral: ask.collateral,
-    expiry: expiry
+    expiry: expiry,
+    seen: seen
   )
 
 proc init*(_: type SlotQueueItem,
@@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize
 proc duration*(self: SlotQueueItem): UInt256 = self.duration
 proc reward*(self: SlotQueueItem): UInt256 = self.reward
 proc collateral*(self: SlotQueueItem): UInt256 = self.collateral
+proc seen*(self: SlotQueueItem): bool = self.seen
 
 proc running*(self: SlotQueue): bool = self.running
 
@@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len
 
 proc size*(self: SlotQueue): int = self.queue.size - 1
 
+proc paused*(self: SlotQueue): bool = not self.unpaused.isSet
+
 proc `$`*(self: SlotQueue): string = $self.queue
 
 proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
@@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int =
 proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
   self.queue.contains(item)
 
+proc pause*(self: SlotQueue) =
+  # set unpaused flag to false -- coroutines will block on unpaused.wait()
+  self.unpaused.clear()
+
+proc unpause*(self: SlotQueue) =
+  # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
+  self.unpaused.fire()
+
 proc populateItem*(self: SlotQueue,
                    requestId: RequestId,
                    slotIndex: uint16): ?SlotQueueItem =
@@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue,
 
 proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
 
-  trace "pushing item to queue",
-    requestId = item.requestId, slotIndex = item.slotIndex
+  logScope:
+    requestId = item.requestId
+    slotIndex = item.slotIndex
+    seen = item.seen
+
+  trace "pushing item to queue"
 
   if not self.running:
     let err = newException(QueueNotRunningError, "queue not running")
@@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
     self.queue.del(self.queue.size - 1)
 
   doAssert self.queue.len <= self.queue.size - 1
+
+  # when slots are pushed to the queue, the queue should be unpaused if it was
+  # paused
+  if self.paused and not item.seen:
+    trace "unpausing queue after new slot pushed"
+    self.unpause()
+
   return success()
 
 proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
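The pause gate is a plain chronos AsyncEvent. A self-contained sketch of the primitive's semantics as used by pause/unpause above (illustrative only, not part of the patch):

    import pkg/chronos

    proc demo() {.async.} =
      let unpaused = newAsyncEvent() # starts cleared, i.e. "paused"

      proc consumer() {.async.} =
        await unpaused.wait()        # parks here while the event is cleared
        echo "processing resumed"

      asyncSpawn consumer()
      await sleepAsync(10.milliseconds)
      unpaused.fire()                # wakes all waiters; isSet becomes true
      await sleepAsync(10.milliseconds)

    waitFor demo()
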
@@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void =
   let worker = SlotQueueWorker.init()
 
   try:
+    discard worker.doneProcessing.track(self)
     self.workers.addLastNoWait(worker)
   except AsyncQueueFullError:
     return failure("failed to add worker, worker queue full")
@@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue,
 
   if onProcessSlot =? self.onProcessSlot:
     try:
+      discard worker.doneProcessing.track(self)
       await onProcessSlot(item, worker.doneProcessing)
       await worker.doneProcessing
 
@@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue,
       # throw because it is caller-defined
       warn "Unknown error processing slot in worker", error = e.msg
 
+proc clearSeenFlags*(self: SlotQueue) =
+  # Enumerate all items in the queue, overwriting each item with `seen = false`.
+  # To avoid issues with new queue items being pushed to the queue while all
+  # items are being iterated (eg if a new storage request comes in and pushes
+  # new slots to the queue), this routine must remain synchronous.
+
+  if self.queue.empty:
+    return
+
+  for item in self.queue.mitems:
+    item.seen = false # does not maintain the heap invariant
+
+  # force heap reshuffling to maintain the heap invariant
+  doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle"
+
+  trace "all 'seen' flags cleared"
+
 proc start*(self: SlotQueue) {.async.} =
   if self.running:
     return
@@ -351,21 +399,47 @@ proc start*(self: SlotQueue) {.async.} =
 
   while self.running:
     try:
+      if self.paused:
+        trace "Queue is paused, waiting for new slots or availabilities to be modified/added"
+
+        # block until unpaused is true/fired, ie wait for queue to be unpaused
+        await self.unpaused.wait()
+
       let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
       let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
 
+      logScope:
+        reqId = item.requestId
+        slotIdx = item.slotIndex
+        seen = item.seen
+
       if not self.running: # may have changed after waiting for pop
         trace "not running, exiting"
         break
 
+      # If, upon processing a slot, the slot item already has a `seen` flag
+      # set, the queue should be paused.
+      if item.seen:
+        trace "processing already seen item, pausing queue",
+          reqId = item.requestId, slotIdx = item.slotIndex
+        self.pause()
+        # put item back in queue so that if other items are pushed while
+        # paused, it will be sorted accordingly. Otherwise, this item would be
+        # processed immediately (with priority over other items) once unpaused
+        trace "re-adding seen item back into the queue"
+        discard self.push(item) # on error, drop the item and continue
+        worker.doneProcessing.complete()
+        await sleepAsync(1.millis) # poll
+        continue
+
+      trace "processing item"
+
       self.dispatch(worker, item)
         .track(self)
         .catch(proc (e: ref CatchableError) =
           error "Unknown error dispatching worker", error = e.msg
         )
 
-      discard worker.doneProcessing.track(self)
-
       await sleepAsync(1.millis) # poll
     except CancelledError:
       trace "slot queue cancelled"
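A condensed walkthrough of the new paused path in `start` (sketch of the code above, not an addition to it; `self` and `worker` are assumed to be the running SlotQueue and the popped worker, with every remaining item carrying `seen = true`):

    # inside the async processing loop:
    let item = await self.queue.pop()  # head of a fully "seen" queue
    if item.seen:
      self.pause()                     # next iteration parks on unpaused.wait()
      discard self.push(item)          # keep the item; a seen push does not
                                       # unpause, and fresh unseen pushes will
                                       # sort ahead of it
      worker.doneProcessing.complete() # recycle the waiting worker
      continue
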
diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim
index 8464a61b..4bdc444e 100644
--- a/codex/sales/states/cancelled.nim
+++ b/codex/sales/states/cancelled.nim
@@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
     onClear(request, data.slotIndex)
 
   if onCleanUp =? agent.onCleanUp:
-    await onCleanUp(returnBytes = true)
+    await onCleanUp(returnBytes = true, reprocessSlot = false)
 
   warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex

diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim
index c301ab2e..deed0e35 100644
--- a/codex/sales/states/downloading.nim
+++ b/codex/sales/states/downloading.nim
@@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
 
   if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption:
-    return some State(SaleErrored(error: err))
+    return some State(SaleErrored(error: err, reprocessSlot: false))
 
   trace "Download complete"
   return some State(SaleInitialProving())

diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim
index 51f34bc9..fdd83122 100644
--- a/codex/sales/states/errored.nim
+++ b/codex/sales/states/errored.nim
@@ -12,6 +12,7 @@ logScope:
 type
   SaleErrored* = ref object of SaleState
     error*: ref CatchableError
+    reprocessSlot*: bool
 
 method `$`*(state: SaleErrored): string = "SaleErrored"
 
@@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
     onClear(request, data.slotIndex)
 
   if onCleanUp =? agent.onCleanUp:
-    await onCleanUp(returnBytes = true)
+    await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)

diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim
index d757e9c1..7a70fb20 100644
--- a/codex/sales/states/ignored.nim
+++ b/codex/sales/states/ignored.nim
@@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
   let agent = SalesAgent(machine)
 
   if onCleanUp =? agent.onCleanUp:
+    # Ignored slots mean there was no availability. In order to prevent small
+    # availabilities from draining the queue, mark this slot as seen and
+    # re-add it back into the queue.
-    await onCleanUp()
+    await onCleanUp(reprocessSlot = true)

diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim
new file mode 100644
index 00000000..e4c0bbb6
--- /dev/null
+++ b/tests/codex/helpers/mockslotqueueitem.nim
@@ -0,0 +1,26 @@
+import pkg/codex/contracts/requests
+import pkg/codex/sales/slotqueue
+
+type MockSlotQueueItem* = object
+  requestId*: RequestId
+  slotIndex*: uint16
+  slotSize*: UInt256
+  duration*: UInt256
+  reward*: UInt256
+  collateral*: UInt256
+  expiry*: UInt256
+  seen*: bool
+
+proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem =
+  SlotQueueItem.init(
+    requestId = item.requestId,
+    slotIndex = item.slotIndex,
+    ask = StorageAsk(
+      slotSize: item.slotSize,
+      duration: item.duration,
+      reward: item.reward,
+      collateral: item.collateral
+    ),
+    expiry = item.expiry,
+    seen = item.seen
+  )
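Example of how the new helper is meant to be used in the queue tests that follow (assumes a `request: StorageRequest` and the suite's `check` helper; illustrative only):

    let seen = MockSlotQueueItem(
      requestId: request.id, slotIndex: 0,
      slotSize: 1.u256, duration: 1.u256, reward: 2.u256,
      collateral: 1.u256, expiry: 1.u256,
      seen: true   # worse, despite the higher reward
    )
    let unseen = MockSlotQueueItem(
      requestId: request.id, slotIndex: 1,
      slotSize: 1.u256, duration: 1.u256, reward: 1.u256,
      collateral: 1.u256, expiry: 1.u256,
      seen: false
    )
    check unseen.toSlotQueueItem < seen.toSlotQueueItem # unseen pops first
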
diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim
new file mode 100644
index 00000000..e252cd9c
--- /dev/null
+++ b/tests/codex/sales/states/testcancelled.nim
@@ -0,0 +1,45 @@
+import pkg/questionable
+import pkg/chronos
+import pkg/codex/contracts/requests
+import pkg/codex/sales/states/cancelled
+import pkg/codex/sales/salesagent
+import pkg/codex/sales/salescontext
+import pkg/codex/market
+
+import ../../../asynctest
+import ../../examples
+import ../../helpers
+import ../../helpers/mockmarket
+import ../../helpers/mockclock
+
+asyncchecksuite "sales state 'cancelled'":
+  let request = StorageRequest.example
+  let slotIndex = (request.ask.slots div 2).u256
+  let market = MockMarket.new()
+  let clock = MockClock.new()
+
+  var state: SaleCancelled
+  var agent: SalesAgent
+  var returnBytesWas = false
+  var reprocessSlotWas = false
+
+  setup:
+    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+      returnBytesWas = returnBytes
+      reprocessSlotWas = reprocessSlot
+
+    let context = SalesContext(
+      market: market,
+      clock: clock
+    )
+    agent = newSalesAgent(context,
+                          request.id,
+                          slotIndex,
+                          request.some)
+    agent.onCleanUp = onCleanUp
+    state = SaleCancelled.new()
+
+  test "calls onCleanUp with returnBytes = true and reprocessSlot = false":
+    discard await state.run(agent)
+    check eventually returnBytesWas == true
+    check eventually reprocessSlotWas == false

diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim
index 3f65c6e7..fc81b158 100644
--- a/tests/codex/sales/states/testdownloading.nim
+++ b/tests/codex/sales/states/testdownloading.nim
@@ -1,8 +1,9 @@
 import std/unittest
 import pkg/questionable
 import pkg/codex/contracts/requests
-import pkg/codex/sales/states/downloading
 import pkg/codex/sales/states/cancelled
+import pkg/codex/sales/states/downloading
+import pkg/codex/sales/states/errored
 import pkg/codex/sales/states/failed
 import pkg/codex/sales/states/filled
 import ../../examples

diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim
new file mode 100644
index 00000000..dc525894
--- /dev/null
+++ b/tests/codex/sales/states/testerrored.nim
@@ -0,0 +1,49 @@
+import pkg/questionable
+import pkg/chronos
+import pkg/codex/contracts/requests
+import pkg/codex/sales/states/errored
+import pkg/codex/sales/salesagent
+import pkg/codex/sales/salescontext
+import pkg/codex/market
+
+import ../../../asynctest
+import ../../examples
+import ../../helpers
+import ../../helpers/mockmarket
+import ../../helpers/mockclock
+
+asyncchecksuite "sales state 'errored'":
+  let request = StorageRequest.example
+  let slotIndex = (request.ask.slots div 2).u256
+  let market = MockMarket.new()
+  let clock = MockClock.new()
+
+  var state: SaleErrored
+  var agent: SalesAgent
+  var returnBytesWas = false
+  var reprocessSlotWas = false
+
+  setup:
+    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+      returnBytesWas = returnBytes
+      reprocessSlotWas = reprocessSlot
+
+    let context = SalesContext(
+      market: market,
+      clock: clock
+    )
+    agent = newSalesAgent(context,
+                          request.id,
+                          slotIndex,
+                          request.some)
+    agent.onCleanUp = onCleanUp
+    state = SaleErrored(error: newException(ValueError, "oh no!"))
+
+  test "calls onCleanUp with returnBytes = true and reprocessSlot = true":
+    state = SaleErrored(
+      error: newException(ValueError, "oh no!"),
+      reprocessSlot: true
+    )
+    discard await state.run(agent)
+    check eventually returnBytesWas == true
+    check eventually reprocessSlotWas == true

diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim
new file mode 100644
index 00000000..680dca8d
--- /dev/null
+++ b/tests/codex/sales/states/testignored.nim
@@ -0,0 +1,45 @@
+import pkg/questionable
+import pkg/chronos
+import pkg/codex/contracts/requests
+import pkg/codex/sales/states/ignored
+import pkg/codex/sales/salesagent
+import pkg/codex/sales/salescontext
+import pkg/codex/market
+
+import ../../../asynctest
+import ../../examples
+import ../../helpers
+import ../../helpers/mockmarket
+import ../../helpers/mockclock
+
+asyncchecksuite "sales state 'ignored'":
+  let request = StorageRequest.example
+  let slotIndex = (request.ask.slots div 2).u256
+  let market = MockMarket.new()
+  let clock = MockClock.new()
+
+  var state: SaleIgnored
+  var agent: SalesAgent
+  var returnBytesWas = false
+  var reprocessSlotWas = false
+
+  setup:
+    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+      returnBytesWas = returnBytes
+      reprocessSlotWas = reprocessSlot
+
+    let context = SalesContext(
+      market: market,
+      clock: clock
+    )
+    agent = newSalesAgent(context,
+                          request.id,
+                          slotIndex,
+                          request.some)
+    agent.onCleanUp = onCleanUp
+    state = SaleIgnored.new()
+
+  test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
+    discard await state.run(agent)
+    check eventually returnBytesWas == false
+    check eventually reprocessSlotWas == true
diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim
index 4b82fb89..ae15ad2f 100644
--- a/tests/codex/sales/testreservations.nim
+++ b/tests/codex/sales/testreservations.nim
@@ -258,7 +258,7 @@ asyncchecksuite "Reservations module":
     check updated.isErr
     check updated.error of NotExistsError
 
-  test "onAvailabilityAdded called when availability is reserved":
+  test "onAvailabilityAdded called when availability is created":
     var added: Availability
     reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
       added = a
@@ -267,6 +267,26 @@ asyncchecksuite "Reservations module":
 
     check added == availability
 
+  test "onAvailabilityAdded called when availability size is increased":
+    var availability = createAvailability()
+    var added: Availability
+    reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
+      added = a
+    availability.freeSize += 1.u256
+    discard await reservations.update(availability)
+
+    check added == availability
+
+  test "onAvailabilityAdded is not called when availability size is decreased":
+    var availability = createAvailability()
+    var called = false
+    reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
+      called = true
+    availability.freeSize -= 1.u256
+    discard await reservations.update(availability)
+
+    check not called
+
   test "availabilities can be found":
     let availability = createAvailability()

diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim
index 222ba0ff..4aa83e25 100644
--- a/tests/codex/sales/testsales.nim
+++ b/tests/codex/sales/testsales.nim
@@ -272,24 +272,41 @@ asyncchecksuite "Sales":
     let expected = SlotQueueItem.init(request, 2.uint16)
     check eventually itemsProcessed.contains(expected)
 
-  test "adds past requests to queue once availability added":
-    var itemsProcessed: seq[SlotQueueItem] = @[]
-
-    # ignore all
-    queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
-      done.complete()
-
+  test "items in queue are re-added (and marked seen) once ignored":
     await market.requestStorage(request)
-    await sleepAsync(10.millis)
+    let items = SlotQueueItem.init(request)
+    await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
+    check eventually queue.paused
+    # The first processed item will have been re-pushed with `seen = true`.
+    # Then, once this item is processed by the queue, its 'seen' flag will be
+    # checked, at which point the queue will be paused. This test could check
+    # item existence in the queue, but that would require inspecting
+    # onProcessSlot to see which item was first, and overriding onProcessSlot
+    # will prevent the queue working as expected in the Sales module.
+    check eventually queue.len == 4
 
-    # check how many slots were processed by the queue
-    queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
-      itemsProcessed.add item
-      done.complete()
+    for item in items:
+      check queue.contains(item)
 
-    # now add matching availability
-    createAvailability()
-    check eventually itemsProcessed.len == request.ask.slots.int
+    for i in 0..
itemB
+  test "correctly prioritizes SlotQueueItems based on 'seen'":
+    let request = StorageRequest.example
+    let itemA = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 2.u256, # profitability is higher (good)
+      collateral: 1.u256,
+      expiry: 1.u256,
+      seen: true # seen (bad), more weight than profitability
+    )
+    let itemB = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 1.u256, # profitability is lower (bad)
+      collateral: 1.u256,
+      expiry: 1.u256,
+      seen: false # not seen (good)
+    )
+    check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A
+    check itemA.toSlotQueueItem > itemB.toSlotQueueItem
+
+  test "correctly prioritizes SlotQueueItems based on profitability":
+    let request = StorageRequest.example
+    let itemA = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 1.u256, # reward is lower (bad)
+      collateral: 1.u256, # collateral is lower (good)
+      expiry: 1.u256,
+      seen: false
+    )
+    let itemB = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 2.u256, # reward is higher (good), more weight than collateral
+      collateral: 2.u256, # collateral is higher (bad)
+      expiry: 1.u256,
+      seen: false
+    )
+
+    check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
+
+  test "correctly prioritizes SlotQueueItems based on collateral":
+    let request = StorageRequest.example
+    let itemA = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 2.u256, # collateral is higher (bad)
+      expiry: 2.u256, # expiry is longer (good)
+      seen: false
+    )
+    let itemB = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256,
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 1.u256, # collateral is lower (good), more weight than expiry
+      expiry: 1.u256, # expiry is shorter (bad)
+      seen: false
+    )
+
+    check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
+
+  test "correctly prioritizes SlotQueueItems based on expiry":
+    let request = StorageRequest.example
+    let itemA = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256, # slotSize is smaller (good)
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 1.u256,
+      expiry: 1.u256, # expiry is shorter (bad)
+      seen: false
+    )
+    let itemB = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 2.u256, # slotSize is larger (bad)
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 1.u256,
+      expiry: 2.u256, # expiry is longer (good), more weight than slotSize
+      seen: false
+    )
+
+    check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
+
+  test "correctly prioritizes SlotQueueItems based on slotSize":
+    let request = StorageRequest.example
+    let itemA = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 2.u256, # slotSize is larger (bad)
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 1.u256,
+      expiry: 1.u256,
+      seen: false
+    )
+    let itemB = MockSlotQueueItem(
+      requestId: request.id,
+      slotIndex: 0,
+      slotSize: 1.u256, # slotSize is smaller (good)
+      duration: 1.u256,
+      reward: 1.u256,
+      collateral: 1.u256,
+      expiry: 1.u256,
+      seen: false
+    )
+
+    check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
+
   test "expands available all possible slot indices on init":
     let request = StorageRequest.example
     let items = SlotQueueItem.init(request)
@@ -391,3 +516,71 @@
       (item3.requestId, item3.slotIndex),
     ]
   )
+
+  test "processing a 'seen' item pauses the queue":
+    newSlotQueue(maxSize = 4, maxWorkers = 4)
+    let request = StorageRequest.example
+    let item = SlotQueueItem.init(request.id, 0'u16,
+                                  request.ask,
+                                  request.expiry,
+                                  seen = true)
+    queue.push(item)
+    check eventually queue.paused
+    check onProcessSlotCalledWith.len == 0
+
+  test "pushing items to queue unpauses queue":
+    newSlotQueue(maxSize = 4, maxWorkers = 4)
+    queue.pause
+
+    let request = StorageRequest.example
+    var items = SlotQueueItem.init(request)
+    queue.push(items)
+    # check all items processed
+    check eventually queue.len == 0
+
+  test "pushing seen item does not unpause queue":
+    newSlotQueue(maxSize = 4, maxWorkers = 4)
+    let request = StorageRequest.example
+    let item0 = SlotQueueItem.init(request.id, 0'u16,
+                                   request.ask,
+                                   request.expiry,
+                                   seen = true)
+    check queue.paused
+    queue.push(item0)
+    check queue.paused
+
+  test "paused queue waits for unpause before continuing processing":
+    newSlotQueue(maxSize = 4, maxWorkers = 4)
+    let request = StorageRequest.example
+    let item = SlotQueueItem.init(request.id, 1'u16,
+                                  request.ask,
+                                  request.expiry,
+                                  seen = false)
+    check queue.paused
+    # push causes unpause
+    queue.push(item)
+    # check all items processed
+    check eventually onProcessSlotCalledWith == @[
+      (item.requestId, item.slotIndex),
+    ]
+    check eventually queue.len == 0
+
+  test "item 'seen' flags can be cleared":
+    newSlotQueue(maxSize = 4, maxWorkers = 1)
+    let request = StorageRequest.example
+    let item0 = SlotQueueItem.init(request.id, 0'u16,
+                                   request.ask,
+                                   request.expiry,
+                                   seen = true)
+    let item1 = SlotQueueItem.init(request.id, 1'u16,
+                                   request.ask,
+                                   request.expiry,
+                                   seen = true)
+    queue.push(item0)
+    queue.push(item1)
+    check queue[0].seen
+    check queue[1].seen
+
+    queue.clearSeenFlags()
+    check queue[0].seen == false
+    check queue[1].seen == false
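A possible follow-up test, sketched here only and not part of the patch (it reuses the suite's assumed `newSlotQueue`, `queue` and `onProcessSlotCalledWith` helpers): clearing the flags and unpausing lets a previously seen item be processed.

    test "clearing 'seen' flags lets a paused queue resume":
      newSlotQueue(maxSize = 2, maxWorkers = 1)
      let request = StorageRequest.example
      let item = SlotQueueItem.init(request.id, 0'u16,
                                    request.ask,
                                    request.expiry,
                                    seen = true)
      queue.push(item)
      check eventually queue.paused   # popping the seen item paused the queue
      queue.clearSeenFlags()          # the re-added item is unseen again
      queue.unpause()
      check eventually onProcessSlotCalledWith == @[
        (item.requestId, item.slotIndex),
      ]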