fix(slotqueue): asyncSpawns futures correctly (#1034)
- asyncSpawns `run` and worker `dispatch` in slotqueue. - removes usage of `then` from slotqueue.
This commit is contained in:
parent
7c804b0ec9
commit
1f49f86131
|
@ -491,7 +491,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
|
|||
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
|
||||
sales.processSlot(item, done)
|
||||
|
||||
asyncSpawn slotQueue.start()
|
||||
slotQueue.start()
|
||||
|
||||
proc onAvailabilityAdded(availability: Availability) {.async.} =
|
||||
await sales.onAvailabilityAdded(availability)
|
||||
|
|
|
@ -10,7 +10,6 @@ import ../rng
|
|||
import ../utils
|
||||
import ../contracts/requests
|
||||
import ../utils/asyncheapqueue
|
||||
import ../utils/then
|
||||
import ../utils/trackedfutures
|
||||
|
||||
logScope:
|
||||
|
@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void =
|
|||
|
||||
proc dispatch(self: SlotQueue,
|
||||
worker: SlotQueueWorker,
|
||||
item: SlotQueueItem) {.async.} =
|
||||
item: SlotQueueItem) {.async: (raises: []).} =
|
||||
logScope:
|
||||
requestId = item.requestId
|
||||
slotIndex = item.slotIndex
|
||||
|
@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) =
|
|||
|
||||
trace "all 'seen' flags cleared"
|
||||
|
||||
proc start*(self: SlotQueue) {.async.} =
|
||||
if self.running:
|
||||
return
|
||||
|
||||
trace "starting slot queue"
|
||||
|
||||
self.running = true
|
||||
|
||||
# must be called in `start` to avoid sideeffects in `new`
|
||||
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
|
||||
|
||||
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
|
||||
# task, a new worker will be pushed to the queue
|
||||
for i in 0..<self.maxWorkers:
|
||||
if err =? self.addWorker().errorOption:
|
||||
error "start: error adding new worker", error = err.msg
|
||||
proc run(self: SlotQueue) {.async: (raises: []).} =
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
|
@ -405,8 +389,8 @@ proc start*(self: SlotQueue) {.async.} =
|
|||
# block until unpaused is true/fired, ie wait for queue to be unpaused
|
||||
await self.unpaused.wait()
|
||||
|
||||
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
|
||||
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
|
||||
let worker = await self.workers.popFirst() # if workers saturated, wait here for new workers
|
||||
let item = await self.queue.pop() # if queue empty, wait here for new items
|
||||
|
||||
logScope:
|
||||
reqId = item.requestId
|
||||
|
@ -434,19 +418,34 @@ proc start*(self: SlotQueue) {.async.} =
|
|||
|
||||
trace "processing item"
|
||||
|
||||
self.dispatch(worker, item)
|
||||
.track(self)
|
||||
.catch(proc (e: ref CatchableError) =
|
||||
error "Unknown error dispatching worker", error = e.msg
|
||||
)
|
||||
asyncSpawn self.dispatch(worker, item).track(self)
|
||||
|
||||
await sleepAsync(1.millis) # poll
|
||||
except CancelledError:
|
||||
trace "slot queue cancelled"
|
||||
return
|
||||
break
|
||||
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
|
||||
warn "slot queue error encountered during processing", error = e.msg
|
||||
|
||||
proc start*(self: SlotQueue) =
|
||||
if self.running:
|
||||
return
|
||||
|
||||
trace "starting slot queue"
|
||||
|
||||
self.running = true
|
||||
|
||||
# must be called in `start` to avoid sideeffects in `new`
|
||||
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
|
||||
|
||||
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
|
||||
# task, a new worker will be pushed to the queue
|
||||
for i in 0..<self.maxWorkers:
|
||||
if err =? self.addWorker().errorOption:
|
||||
error "start: error adding new worker", error = err.msg
|
||||
|
||||
asyncSpawn self.run().track(self)
|
||||
|
||||
proc stop*(self: SlotQueue) {.async.} =
|
||||
if not self.running:
|
||||
return
|
||||
|
|
|
@ -27,8 +27,8 @@ suite "Slot queue start/stop":
|
|||
check not queue.running
|
||||
|
||||
test "can call start multiple times, and when already running":
|
||||
asyncSpawn queue.start()
|
||||
asyncSpawn queue.start()
|
||||
queue.start()
|
||||
queue.start()
|
||||
check queue.running
|
||||
|
||||
test "can call stop when alrady stopped":
|
||||
|
@ -36,12 +36,12 @@ suite "Slot queue start/stop":
|
|||
check not queue.running
|
||||
|
||||
test "can call stop when running":
|
||||
asyncSpawn queue.start()
|
||||
queue.start()
|
||||
await queue.stop()
|
||||
check not queue.running
|
||||
|
||||
test "can call stop multiple times":
|
||||
asyncSpawn queue.start()
|
||||
queue.start()
|
||||
await queue.stop()
|
||||
await queue.stop()
|
||||
check not queue.running
|
||||
|
@ -62,8 +62,6 @@ suite "Slot queue workers":
|
|||
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
|
||||
queue.onProcessSlot = onProcessSlot
|
||||
|
||||
proc startQueue = asyncSpawn queue.start()
|
||||
|
||||
teardown:
|
||||
await queue.stop()
|
||||
|
||||
|
@ -79,7 +77,7 @@ suite "Slot queue workers":
|
|||
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
|
||||
|
||||
test "does not surpass max workers":
|
||||
startQueue()
|
||||
queue.start()
|
||||
let item1 = SlotQueueItem.example
|
||||
let item2 = SlotQueueItem.example
|
||||
let item3 = SlotQueueItem.example
|
||||
|
@ -97,7 +95,7 @@ suite "Slot queue workers":
|
|||
|
||||
queue.onProcessSlot = processSlot
|
||||
|
||||
startQueue()
|
||||
queue.start()
|
||||
let item1 = SlotQueueItem.example
|
||||
let item2 = SlotQueueItem.example
|
||||
let item3 = SlotQueueItem.example
|
||||
|
@ -122,7 +120,7 @@ suite "Slot queue":
|
|||
onProcessSlotCalled = true
|
||||
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
|
||||
done.complete()
|
||||
asyncSpawn queue.start()
|
||||
queue.start()
|
||||
|
||||
setup:
|
||||
onProcessSlotCalled = false
|
||||
|
|
Loading…
Reference in New Issue