diff --git a/codex/sales.nim b/codex/sales.nim index 37e2c06a..01cc0fd7 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -111,11 +111,7 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = sales.agents.keepItIf(it != agent) proc cleanUp( - sales: Sales, - agent: SalesAgent, - reprocessSlot: bool, - returnedCollateral: ?UInt256, - processing: Future[void], + sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256 ) {.async.} = let data = agent.data @@ -181,37 +177,39 @@ proc cleanUp( await sales.remove(agent) - # signal back to the slot queue to cycle a worker - if not processing.isNil and not processing.finished(): - processing.complete() - -proc filled( - sales: Sales, request: StorageRequest, slotIndex: uint64, processing: Future[void] -) = +proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) = if onSale =? sales.context.onSale: onSale(request, slotIndex) - # signal back to the slot queue to cycle a worker - if not processing.isNil and not processing.finished(): - processing.complete() - -proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = +proc processSlot( + sales: Sales, item: SlotQueueItem +) {.async: (raises: [CancelledError]).} = debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex let agent = newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest) + let completed = newAsyncEvent() + agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done) + trace "slot cleanup" + await sales.cleanUp(agent, reprocessSlot, returnedCollateral) + completed.fire() agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) = - sales.filled(request, slotIndex, done) + trace "slot filled" + sales.filled(request, slotIndex) + completed.fire() agent.start(SalePreparing()) sales.agents.add agent + trace "waiting for slot processing to complete" + await completed.wait() + trace "slot processing completed" + proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} = let reservations = sales.context.reservations without reservs =? await reservations.all(Reservation): @@ -272,10 +270,7 @@ proc load*(sales: Sales) {.async.} = agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - # since workers are not being dispatched, this future has not been created - # by a worker. Create a dummy one here so we can call sales.cleanUp - let done: Future[void] = nil - await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done) + await sales.cleanUp(agent, reprocessSlot, returnedCollateral) # There is no need to assign agent.onFilled as slots loaded from `mySlots` # are inherently already filled and so assigning agent.onFilled would be @@ -526,11 +521,12 @@ proc startSlotQueue(sales: Sales) = let slotQueue = sales.context.slotQueue let reservations = sales.context.reservations - slotQueue.onProcessSlot = proc( - item: SlotQueueItem, done: Future[void] - ) {.async: (raises: []).} = + slotQueue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} = trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex - sales.processSlot(item, done) + try: + await sales.processSlot(item) + except CancelledError: + discard slotQueue.start() diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 60700d44..890d912f 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -4,7 +4,6 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results import ../errors -import ../clock import ../logutils import ../rng import ../utils @@ -16,18 +15,14 @@ logScope: topics = "marketplace slotqueue" type - OnProcessSlot* = proc(item: SlotQueueItem, done: Future[void]): Future[void] {. - gcsafe, async: (raises: []) - .} + OnProcessSlot* = + proc(item: SlotQueueItem): Future[void] {.gcsafe, async: (raises: []).} # Non-ref obj copies value when assigned, preventing accidental modification # of values which could cause an incorrect order (eg # ``slotQueue[1].collateral = 1`` would cause ``collateral`` to be updated, # but the heap invariant would no longer be honoured. When non-ref, the # compiler can ensure that statement will fail). - SlotQueueWorker = object - doneProcessing*: Future[void].Raising([]) - SlotQueueItem* = object requestId: RequestId slotIndex: uint16 @@ -47,7 +42,6 @@ type onProcessSlot: ?OnProcessSlot queue: AsyncHeapQueue[SlotQueueItem] running: bool - workers: AsyncQueue[SlotQueueWorker] trackedFutures: TrackedFutures unpaused: AsyncEvent @@ -125,19 +119,6 @@ proc new*( # avoid instantiating `workers` in constructor to avoid side effects in # `newAsyncQueue` procedure -proc init(_: type SlotQueueWorker): SlotQueueWorker = - let workerFut = Future[void].Raising([]).init( - "slotqueue.worker.processing", {FutureFlag.OwnCancelSchedule} - ) - - workerFut.cancelCallback = proc(data: pointer) {.raises: [].} = - # this is equivalent to try: ... except CatchableError: ... - if not workerFut.finished: - workerFut.complete() - trace "Cancelling `SlotQueue` worker processing future" - - SlotQueueWorker(doneProcessing: workerFut) - proc init*( _: type SlotQueueItem, requestId: RequestId, @@ -233,13 +214,6 @@ proc `$`*(self: SlotQueue): string = proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = self.onProcessSlot = some onProcessSlot -proc activeWorkers*(self: SlotQueue): int = - if not self.running: - return 0 - - # active = capacity - available - self.maxWorkers - self.workers.len - proc contains*(self: SlotQueue, item: SlotQueueItem): bool = self.queue.contains(item) @@ -323,52 +297,6 @@ proc delete*(self: SlotQueue, requestId: RequestId) = proc `[]`*(self: SlotQueue, i: Natural): SlotQueueItem = self.queue[i] -proc addWorker(self: SlotQueue): ?!void = - if not self.running: - let err = newException(QueueNotRunningError, "queue must be running") - return failure(err) - - trace "adding new worker to worker queue" - - let worker = SlotQueueWorker.init() - try: - self.trackedFutures.track(worker.doneProcessing) - self.workers.addLastNoWait(worker) - except AsyncQueueFullError: - return failure("failed to add worker, worker queue full") - - return success() - -proc dispatch( - self: SlotQueue, worker: SlotQueueWorker, item: SlotQueueItem -) {.async: (raises: []).} = - logScope: - requestId = item.requestId - slotIndex = item.slotIndex - - if not self.running: - warn "Could not dispatch worker because queue is not running" - return - - if onProcessSlot =? self.onProcessSlot: - try: - self.trackedFutures.track(worker.doneProcessing) - await onProcessSlot(item, worker.doneProcessing) - await worker.doneProcessing - - if err =? self.addWorker().errorOption: - raise err # catch below - except QueueNotRunningError as e: - info "could not re-add worker to worker queue, queue not running", error = e.msg - except CancelledError: - # do not bubble exception up as it is called with `asyncSpawn` which would - # convert the exception into a `FutureDefect` - discard - except CatchableError as e: - # we don't have any insight into types of errors that `onProcessSlot` can - # throw because it is caller-defined - warn "Unknown error processing slot in worker", error = e.msg - proc clearSeenFlags*(self: SlotQueue) = # Enumerate all items in the queue, overwriting each item with `seen = false`. # To avoid issues with new queue items being pushed to the queue while all @@ -386,7 +314,8 @@ proc clearSeenFlags*(self: SlotQueue) = trace "all 'seen' flags cleared" -proc run(self: SlotQueue) {.async: (raises: []).} = +proc runWorker(self: SlotQueue) {.async: (raises: []).} = + trace "slot queue worker loop started" while self.running: try: if self.paused: @@ -395,8 +324,6 @@ proc run(self: SlotQueue) {.async: (raises: []).} = # block until unpaused is true/fired, ie wait for queue to be unpaused await self.unpaused.wait() - let worker = - await self.workers.popFirst() # if workers saturated, wait here for new workers let item = await self.queue.pop() # if queue empty, wait here for new items logScope: @@ -419,23 +346,19 @@ proc run(self: SlotQueue) {.async: (raises: []).} = # immediately (with priority over other items) once unpaused trace "readding seen item back into the queue" discard self.push(item) # on error, drop the item and continue - worker.doneProcessing.complete() - if err =? self.addWorker().errorOption: - error "error adding new worker", error = err.msg - await sleepAsync(1.millis) # poll continue trace "processing item" + without onProcessSlot =? self.onProcessSlot: + raiseAssert "slot queue onProcessSlot not set" - let fut = self.dispatch(worker, item) - self.trackedFutures.track(fut) - - await sleepAsync(1.millis) # poll + await onProcessSlot(item) except CancelledError: - trace "slot queue cancelled" + trace "slot queue worker cancelled" break - except CatchableError as e: # raised from self.queue.pop() or self.workers.pop() - warn "slot queue error encountered during processing", error = e.msg + except CatchableError as e: # raised from self.queue.pop() + warn "slot queue worker error encountered during processing", error = e.msg + trace "slot queue worker loop stopped" proc start*(self: SlotQueue) = if self.running: @@ -445,17 +368,11 @@ proc start*(self: SlotQueue) = self.running = true - # must be called in `start` to avoid sideeffects in `new` - self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers) - # Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its # task, a new worker will be pushed to the queue for i in 0 ..< self.maxWorkers: - if err =? self.addWorker().errorOption: - error "start: error adding new worker", error = err.msg - - let fut = self.run() - self.trackedFutures.track(fut) + let worker = self.runWorker() + self.trackedFutures.track(worker) proc stop*(self: SlotQueue) {.async.} = if not self.running: diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index f4d9cbae..347e5fa6 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -240,17 +240,12 @@ asyncchecksuite "Sales": return true proc addRequestToSaturatedQueue(): Future[StorageRequest] {.async.} = - queue.onProcessSlot = proc( - item: SlotQueueItem, done: Future[void] - ) {.async: (raises: []).} = + queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} = try: await sleepAsync(10.millis) itemsProcessed.add item except CancelledError as exc: checkpoint(exc.msg) - finally: - if not done.finished: - done.complete() var request1 = StorageRequest.example request1.ask.collateralPerByte = request.ask.collateralPerByte + 1 @@ -272,12 +267,8 @@ asyncchecksuite "Sales": waitFor run() test "processes all request's slots once StorageRequested emitted": - queue.onProcessSlot = proc( - item: SlotQueueItem, done: Future[void] - ) {.async: (raises: []).} = + queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} = itemsProcessed.add item - if not done.finished: - done.complete() createAvailability() await market.requestStorage(request) let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) @@ -313,12 +304,8 @@ asyncchecksuite "Sales": check always (not itemsProcessed.contains(expected)) test "adds slot index to slot queue once SlotFreed emitted": - queue.onProcessSlot = proc( - item: SlotQueueItem, done: Future[void] - ) {.async: (raises: []).} = + queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} = itemsProcessed.add item - if not done.finished: - done.complete() createAvailability() market.requested.add request # "contract" must be able to return request diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index 7abad7eb..7dd46916 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -50,19 +50,11 @@ suite "Slot queue start/stop": suite "Slot queue workers": var queue: SlotQueue - proc onProcessSlot( - item: SlotQueueItem, doneProcessing: Future[void] - ) {.async: (raises: []).} = - # this is not illustrative of the realistic scenario as the - # `doneProcessing` future would be passed to another context before being - # completed and therefore is not as simple as making the callback async + proc onProcessSlot(item: SlotQueueItem) {.async: (raises: []).} = try: await sleepAsync(1000.millis) except CatchableError as exc: checkpoint(exc.msg) - finally: - if not doneProcessing.finished: - doneProcessing.complete() setup: let request = StorageRequest.example @@ -72,9 +64,6 @@ suite "Slot queue workers": teardown: await queue.stop() - test "activeWorkers should be 0 when not running": - check queue.activeWorkers == 0 - test "maxWorkers cannot be 0": expect ValueError: discard SlotQueue.new(maxSize = 1, maxWorkers = 0) @@ -83,41 +72,6 @@ suite "Slot queue workers": expect ValueError: discard SlotQueue.new(maxSize = 1, maxWorkers = 2) - test "does not surpass max workers": - queue.start() - let item1 = SlotQueueItem.example - let item2 = SlotQueueItem.example - let item3 = SlotQueueItem.example - let item4 = SlotQueueItem.example - check queue.push(item1).isOk - check queue.push(item2).isOk - check queue.push(item3).isOk - check queue.push(item4).isOk - check eventually queue.activeWorkers == 3 - - test "discards workers once processing completed": - proc processSlot(item: SlotQueueItem, done: Future[void]) {.async: (raises: []).} = - try: - await sleepAsync(1.millis) - except CatchableError as exc: - checkpoint(exc.msg) - finally: - if not done.finished: - done.complete() - - queue.onProcessSlot = processSlot - - queue.start() - let item1 = SlotQueueItem.example - let item2 = SlotQueueItem.example - let item3 = SlotQueueItem.example - let item4 = SlotQueueItem.example - check queue.push(item1).isOk # finishes after 1.millis - check queue.push(item2).isOk # finishes after 1.millis - check queue.push(item3).isOk # finishes after 1.millis - check queue.push(item4).isOk - check eventually queue.activeWorkers == 1 - suite "Slot queue": var onProcessSlotCalled = false var onProcessSlotCalledWith: seq[(RequestId, uint16)] @@ -126,9 +80,7 @@ suite "Slot queue": proc newSlotQueue(maxSize, maxWorkers: int, processSlotDelay = 1.millis) = queue = SlotQueue.new(maxWorkers, maxSize.uint16) - queue.onProcessSlot = proc( - item: SlotQueueItem, done: Future[void] - ) {.async: (raises: []).} = + queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} = try: await sleepAsync(processSlotDelay) except CatchableError as exc: @@ -136,8 +88,6 @@ suite "Slot queue": finally: onProcessSlotCalled = true onProcessSlotCalledWith.add (item.requestId, item.slotIndex) - if not done.finished: - done.complete() queue.start() @@ -155,11 +105,6 @@ suite "Slot queue": check queue.len == 0 check $queue == "[]" - test "starts with 0 active workers": - newSlotQueue(maxSize = 2, maxWorkers = 2) - check eventually queue.running - check queue.activeWorkers == 0 - test "reports correct size": newSlotQueue(maxSize = 2, maxWorkers = 2) check queue.size == 2 @@ -657,38 +602,6 @@ suite "Slot queue": # queue should be paused check eventually queue.paused - test "processing a 'seen' item does not decrease the number of workers": - newSlotQueue(maxSize = 4, maxWorkers = 4) - let request = StorageRequest.example - let unseen = SlotQueueItem.init( - request.id, - 0'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = false, - ) - let seen = SlotQueueItem.init( - request.id, - 1'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = true, - ) - # push seen item to ensure that queue is pausing - check queue.push(seen).isSuccess - # unpause and pause a number of times - for _ in 0 ..< 10: - # push unseen item to unpause the queue - check queue.push(unseen).isSuccess - # wait for unseen item to be processed - check eventually queue.len == 1 - # wait for queue to pause because of seen item - check eventually queue.paused - # check that the number of workers equals maximimum workers - check eventually queue.activeWorkers == 0 - test "item 'seen' flags can be cleared": newSlotQueue(maxSize = 4, maxWorkers = 1) let request = StorageRequest.example