fix(slotqueue): asyncSpawns futures correctly

- asyncSpawns `run` and worker `dispatch` in slotqueue.
- removes usage of `then` from slotqueue.
This commit is contained in:
Eric 2024-12-12 17:18:17 +11:00
parent dc244a36bd
commit f241d84e18
No known key found for this signature in database
2 changed files with 32 additions and 35 deletions

View File

@ -10,7 +10,6 @@ import ../rng
import ../utils
import ../contracts/requests
import ../utils/asyncheapqueue
import ../utils/then
import ../utils/trackedfutures
logScope:
@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void =
proc dispatch(self: SlotQueue,
worker: SlotQueueWorker,
item: SlotQueueItem) {.async.} =
item: SlotQueueItem) {.async: (raises: []).} =
logScope:
requestId = item.requestId
slotIndex = item.slotIndex
@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) =
trace "all 'seen' flags cleared"
proc start*(self: SlotQueue) {.async.} =
if self.running:
return
trace "starting slot queue"
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
proc run(self: SlotQueue) {.async: (raises: []).} =
while self.running:
try:
@ -405,8 +389,8 @@ proc start*(self: SlotQueue) {.async.} =
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
let worker = await self.workers.popFirst() # if workers saturated, wait here for new workers
let item = await self.queue.pop() # if queue empty, wait here for new items
logScope:
reqId = item.requestId
@ -434,19 +418,34 @@ proc start*(self: SlotQueue) {.async.} =
trace "processing item"
self.dispatch(worker, item)
.track(self)
.catch(proc (e: ref CatchableError) =
error "Unknown error dispatching worker", error = e.msg
)
asyncSpawn self.dispatch(worker, item).track(self)
await sleepAsync(1.millis) # poll
except CancelledError:
trace "slot queue cancelled"
return
break
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg
proc start*(self: SlotQueue) =
if self.running:
return
trace "starting slot queue"
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
asyncSpawn self.run().track(self)
proc stop*(self: SlotQueue) {.async.} =
if not self.running:
return

View File

@ -27,8 +27,8 @@ suite "Slot queue start/stop":
check not queue.running
test "can call start multiple times, and when already running":
asyncSpawn queue.start()
asyncSpawn queue.start()
queue.start()
queue.start()
check queue.running
test "can call stop when alrady stopped":
@ -36,12 +36,12 @@ suite "Slot queue start/stop":
check not queue.running
test "can call stop when running":
asyncSpawn queue.start()
queue.start()
await queue.stop()
check not queue.running
test "can call stop multiple times":
asyncSpawn queue.start()
queue.start()
await queue.stop()
await queue.stop()
check not queue.running
@ -62,8 +62,6 @@ suite "Slot queue workers":
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
queue.onProcessSlot = onProcessSlot
proc startQueue = asyncSpawn queue.start()
teardown:
await queue.stop()
@ -79,7 +77,7 @@ suite "Slot queue workers":
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
test "does not surpass max workers":
startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
@ -97,7 +95,7 @@ suite "Slot queue workers":
queue.onProcessSlot = processSlot
startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
@ -122,7 +120,7 @@ suite "Slot queue":
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete()
asyncSpawn queue.start()
queue.start()
setup:
onProcessSlotCalled = false