From 1f49f86131fbc9721fa1d136705a1aec4601572e Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:42:05 +0700 Subject: [PATCH] fix(slotqueue): asyncSpawns futures correctly (#1034) - asyncSpawns `run` and worker `dispatch` in slotqueue. - removes usage of `then` from slotqueue. --- codex/sales.nim | 2 +- codex/sales/slotqueue.nim | 51 ++++++++++++++--------------- tests/codex/sales/testslotqueue.nim | 16 ++++----- 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index 5882ec1f..f2cc366c 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -491,7 +491,7 @@ proc startSlotQueue(sales: Sales) {.async.} = trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) - asyncSpawn slotQueue.start() + slotQueue.start() proc onAvailabilityAdded(availability: Availability) {.async.} = await sales.onAvailabilityAdded(availability) diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 198ef80f..f565d276 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -10,7 +10,6 @@ import ../rng import ../utils import ../contracts/requests import ../utils/asyncheapqueue -import ../utils/then import ../utils/trackedfutures logScope: @@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void = proc dispatch(self: SlotQueue, worker: SlotQueueWorker, - item: SlotQueueItem) {.async.} = + item: SlotQueueItem) {.async: (raises: []).} = logScope: requestId = item.requestId slotIndex = item.slotIndex @@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) = trace "all 'seen' flags cleared" -proc start*(self: SlotQueue) {.async.} = - if self.running: - return - - trace "starting slot queue" - - self.running = true - - # must be called in `start` to avoid sideeffects in `new` - self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers) - - # Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its - # task, a new worker will be pushed to the queue - for i in 0..