fix(slotqueue): simplify slot queue workers (#1224)

* fix(slotqueue): simplify slot queue workers

- worker is now just an async running loop
- instead of passing a "done" Future, use an
  AsyncEvent to signal completion (a sketch of this
  pattern follows the commit metadata below)

* chore(slotqueue): address review comments

Co-Authored-By: Eric <5089238+emizzle@users.noreply.github.com>
Co-Authored-By: Dmitriy Ryajov <dryajov@gmail.com>

---------

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
markspanbroek 2025-05-15 15:02:04 +02:00 committed by GitHub
parent 28e87d06cc
commit bde98738c2
4 changed files with 41 additions and 228 deletions
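
Below is a minimal sketch of the two changes described in the commit message
above, assuming chronos's AsyncEvent and AsyncQueue APIs. The names Item,
handleItem and workerLoop are illustrative only, not identifiers from this
codebase; the actual changes follow in the diff.

import pkg/chronos

type Item = object
  id: int

proc handleItem(item: Item) {.async: (raises: [CancelledError]).} =
  # Instead of receiving a "done" Future from the queue, the handler creates
  # its own AsyncEvent and waits for the callback it installs to fire it.
  let completed = newAsyncEvent()
  let onFinished = proc() =
    completed.fire() # the callback signals completion, like onCleanUp / onFilled
  onFinished() # in the real code the agent invokes this later
  await completed.wait()

proc workerLoop(queue: AsyncQueue[Item]) {.async: (raises: []).} =
  # The worker is now just an async running loop: pop an item, process it,
  # and go back for the next one; no pool of worker objects is maintained.
  while true:
    try:
      let item = await queue.popFirst()
      await handleItem(item)
    except CancelledError:
      break
    except CatchableError:
      discard # a failing handler should not stop the loop

when isMainModule:
  let queue = newAsyncQueue[Item]()
  waitFor queue.addLast(Item(id: 1))
  asyncSpawn workerLoop(queue)
  waitFor sleepAsync(10.millis) # give the loop a moment to drain the item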

Changed file 1 of 4

@@ -111,11 +111,7 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
sales.agents.keepItIf(it != agent)
proc cleanUp(
sales: Sales,
agent: SalesAgent,
reprocessSlot: bool,
returnedCollateral: ?UInt256,
processing: Future[void],
sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256
) {.async.} =
let data = agent.data
@@ -181,37 +177,39 @@ proc cleanUp(
await sales.remove(agent)
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
proc filled(
sales: Sales, request: StorageRequest, slotIndex: uint64, processing: Future[void]
) =
proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) =
if onSale =? sales.context.onSale:
onSale(request, slotIndex)
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
proc processSlot(
sales: Sales, item: SlotQueueItem
) {.async: (raises: [CancelledError]).} =
debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex
let agent =
newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest)
let completed = newAsyncEvent()
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done)
trace "slot cleanup"
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
completed.fire()
agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) =
sales.filled(request, slotIndex, done)
trace "slot filled"
sales.filled(request, slotIndex)
completed.fire()
agent.start(SalePreparing())
sales.agents.add agent
trace "waiting for slot processing to complete"
await completed.wait()
trace "slot processing completed"
proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
let reservations = sales.context.reservations
without reservs =? await reservations.all(Reservation):
@@ -272,10 +270,7 @@ proc load*(sales: Sales) {.async.} =
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
# since workers are not being dispatched, this future has not been created
# by a worker. Create a dummy one here so we can call sales.cleanUp
let done: Future[void] = nil
await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done)
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`
# are inherently already filled and so assigning agent.onFilled would be
@@ -526,11 +521,12 @@ proc startSlotQueue(sales: Sales) =
let slotQueue = sales.context.slotQueue
let reservations = sales.context.reservations
slotQueue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
slotQueue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done)
try:
await sales.processSlot(item)
except CancelledError:
discard
slotQueue.start()

Changed file 2 of 4

@@ -4,7 +4,6 @@ import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ../errors
import ../clock
import ../logutils
import ../rng
import ../utils
@@ -16,18 +15,14 @@ logScope:
topics = "marketplace slotqueue"
type
OnProcessSlot* = proc(item: SlotQueueItem, done: Future[void]): Future[void] {.
gcsafe, async: (raises: [])
.}
OnProcessSlot* =
proc(item: SlotQueueItem): Future[void] {.gcsafe, async: (raises: []).}
# Non-ref obj copies value when assigned, preventing accidental modification
# of values which could cause an incorrect order (eg
# ``slotQueue[1].collateral = 1`` would cause ``collateral`` to be updated,
# but the heap invariant would no longer be honoured. When non-ref, the
# compiler can ensure that statement will fail).
SlotQueueWorker = object
doneProcessing*: Future[void].Raising([])
SlotQueueItem* = object
requestId: RequestId
slotIndex: uint16
@@ -47,7 +42,6 @@ type
onProcessSlot: ?OnProcessSlot
queue: AsyncHeapQueue[SlotQueueItem]
running: bool
workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures
unpaused: AsyncEvent
@@ -125,19 +119,6 @@ proc new*(
# avoid instantiating `workers` in constructor to avoid side effects in
# `newAsyncQueue` procedure
proc init(_: type SlotQueueWorker): SlotQueueWorker =
let workerFut = Future[void].Raising([]).init(
"slotqueue.worker.processing", {FutureFlag.OwnCancelSchedule}
)
workerFut.cancelCallback = proc(data: pointer) {.raises: [].} =
# this is equivalent to try: ... except CatchableError: ...
if not workerFut.finished:
workerFut.complete()
trace "Cancelling `SlotQueue` worker processing future"
SlotQueueWorker(doneProcessing: workerFut)
proc init*(
_: type SlotQueueItem,
requestId: RequestId,
@@ -233,13 +214,6 @@ proc `$`*(self: SlotQueue): string =
proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
self.onProcessSlot = some onProcessSlot
proc activeWorkers*(self: SlotQueue): int =
if not self.running:
return 0
# active = capacity - available
self.maxWorkers - self.workers.len
proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
self.queue.contains(item)
@@ -323,52 +297,6 @@ proc delete*(self: SlotQueue, requestId: RequestId) =
proc `[]`*(self: SlotQueue, i: Natural): SlotQueueItem =
self.queue[i]
proc addWorker(self: SlotQueue): ?!void =
if not self.running:
let err = newException(QueueNotRunningError, "queue must be running")
return failure(err)
trace "adding new worker to worker queue"
let worker = SlotQueueWorker.init()
try:
self.trackedFutures.track(worker.doneProcessing)
self.workers.addLastNoWait(worker)
except AsyncQueueFullError:
return failure("failed to add worker, worker queue full")
return success()
proc dispatch(
self: SlotQueue, worker: SlotQueueWorker, item: SlotQueueItem
) {.async: (raises: []).} =
logScope:
requestId = item.requestId
slotIndex = item.slotIndex
if not self.running:
warn "Could not dispatch worker because queue is not running"
return
if onProcessSlot =? self.onProcessSlot:
try:
self.trackedFutures.track(worker.doneProcessing)
await onProcessSlot(item, worker.doneProcessing)
await worker.doneProcessing
if err =? self.addWorker().errorOption:
raise err # catch below
except QueueNotRunningError as e:
info "could not re-add worker to worker queue, queue not running", error = e.msg
except CancelledError:
# do not bubble exception up as it is called with `asyncSpawn` which would
# convert the exception into a `FutureDefect`
discard
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error processing slot in worker", error = e.msg
proc clearSeenFlags*(self: SlotQueue) =
# Enumerate all items in the queue, overwriting each item with `seen = false`.
# To avoid issues with new queue items being pushed to the queue while all
@@ -386,7 +314,8 @@ proc clearSeenFlags*(self: SlotQueue) =
trace "all 'seen' flags cleared"
proc run(self: SlotQueue) {.async: (raises: []).} =
proc runWorker(self: SlotQueue) {.async: (raises: []).} =
trace "slot queue worker loop started"
while self.running:
try:
if self.paused:
@@ -395,8 +324,6 @@ proc run(self: SlotQueue) {.async: (raises: []).} =
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker =
await self.workers.popFirst() # if workers saturated, wait here for new workers
let item = await self.queue.pop() # if queue empty, wait here for new items
logScope:
@@ -419,23 +346,19 @@ proc run(self: SlotQueue) {.async: (raises: []).} =
# immediately (with priority over other items) once unpaused
trace "readding seen item back into the queue"
discard self.push(item) # on error, drop the item and continue
worker.doneProcessing.complete()
if err =? self.addWorker().errorOption:
error "error adding new worker", error = err.msg
await sleepAsync(1.millis) # poll
continue
trace "processing item"
without onProcessSlot =? self.onProcessSlot:
raiseAssert "slot queue onProcessSlot not set"
let fut = self.dispatch(worker, item)
self.trackedFutures.track(fut)
await sleepAsync(1.millis) # poll
await onProcessSlot(item)
except CancelledError:
trace "slot queue cancelled"
trace "slot queue worker cancelled"
break
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg
except CatchableError as e: # raised from self.queue.pop()
warn "slot queue worker error encountered during processing", error = e.msg
trace "slot queue worker loop stopped"
proc start*(self: SlotQueue) =
if self.running:
@@ -445,17 +368,11 @@ proc start*(self: SlotQueue) =
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0 ..< self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
let fut = self.run()
self.trackedFutures.track(fut)
let worker = self.runWorker()
self.trackedFutures.track(worker)
proc stop*(self: SlotQueue) {.async.} =
if not self.running:

Changed file 3 of 4

@@ -240,17 +240,12 @@ asyncchecksuite "Sales":
return true
proc addRequestToSaturatedQueue(): Future[StorageRequest] {.async.} =
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} =
try:
await sleepAsync(10.millis)
itemsProcessed.add item
except CancelledError as exc:
checkpoint(exc.msg)
finally:
if not done.finished:
done.complete()
var request1 = StorageRequest.example
request1.ask.collateralPerByte = request.ask.collateralPerByte + 1
@@ -272,12 +267,8 @@ asyncchecksuite "Sales":
waitFor run()
test "processes all request's slots once StorageRequested emitted":
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} =
itemsProcessed.add item
if not done.finished:
done.complete()
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
@@ -313,12 +304,8 @@ asyncchecksuite "Sales":
check always (not itemsProcessed.contains(expected))
test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} =
itemsProcessed.add item
if not done.finished:
done.complete()
createAvailability()
market.requested.add request # "contract" must be able to return request

Changed file 4 of 4

@@ -50,19 +50,11 @@ suite "Slot queue start/stop":
suite "Slot queue workers":
var queue: SlotQueue
proc onProcessSlot(
item: SlotQueueItem, doneProcessing: Future[void]
) {.async: (raises: []).} =
# this is not illustrative of the realistic scenario as the
# `doneProcessing` future would be passed to another context before being
# completed and therefore is not as simple as making the callback async
proc onProcessSlot(item: SlotQueueItem) {.async: (raises: []).} =
try:
await sleepAsync(1000.millis)
except CatchableError as exc:
checkpoint(exc.msg)
finally:
if not doneProcessing.finished:
doneProcessing.complete()
setup:
let request = StorageRequest.example
@@ -72,9 +64,6 @@ suite "Slot queue workers":
teardown:
await queue.stop()
test "activeWorkers should be 0 when not running":
check queue.activeWorkers == 0
test "maxWorkers cannot be 0":
expect ValueError:
discard SlotQueue.new(maxSize = 1, maxWorkers = 0)
@@ -83,41 +72,6 @@ suite "Slot queue workers":
expect ValueError:
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
test "does not surpass max workers":
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check queue.push(item1).isOk
check queue.push(item2).isOk
check queue.push(item3).isOk
check queue.push(item4).isOk
check eventually queue.activeWorkers == 3
test "discards workers once processing completed":
proc processSlot(item: SlotQueueItem, done: Future[void]) {.async: (raises: []).} =
try:
await sleepAsync(1.millis)
except CatchableError as exc:
checkpoint(exc.msg)
finally:
if not done.finished:
done.complete()
queue.onProcessSlot = processSlot
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
let item4 = SlotQueueItem.example
check queue.push(item1).isOk # finishes after 1.millis
check queue.push(item2).isOk # finishes after 1.millis
check queue.push(item3).isOk # finishes after 1.millis
check queue.push(item4).isOk
check eventually queue.activeWorkers == 1
suite "Slot queue":
var onProcessSlotCalled = false
var onProcessSlotCalledWith: seq[(RequestId, uint16)]
@@ -126,9 +80,7 @@ suite "Slot queue":
proc newSlotQueue(maxSize, maxWorkers: int, processSlotDelay = 1.millis) =
queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
queue.onProcessSlot = proc(item: SlotQueueItem) {.async: (raises: []).} =
try:
await sleepAsync(processSlotDelay)
except CatchableError as exc:
@@ -136,8 +88,6 @@ suite "Slot queue":
finally:
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
if not done.finished:
done.complete()
queue.start()
@@ -155,11 +105,6 @@ suite "Slot queue":
check queue.len == 0
check $queue == "[]"
test "starts with 0 active workers":
newSlotQueue(maxSize = 2, maxWorkers = 2)
check eventually queue.running
check queue.activeWorkers == 0
test "reports correct size":
newSlotQueue(maxSize = 2, maxWorkers = 2)
check queue.size == 2
@@ -657,38 +602,6 @@ suite "Slot queue":
# queue should be paused
check eventually queue.paused
test "processing a 'seen' item does not decrease the number of workers":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let unseen = SlotQueueItem.init(
request.id,
0'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = false,
)
let seen = SlotQueueItem.init(
request.id,
1'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = true,
)
# push seen item to ensure that queue is pausing
check queue.push(seen).isSuccess
# unpause and pause a number of times
for _ in 0 ..< 10:
# push unseen item to unpause the queue
check queue.push(unseen).isSuccess
# wait for unseen item to be processed
check eventually queue.len == 1
# wait for queue to pause because of seen item
check eventually queue.paused
# check that the number of workers equals maximimum workers
check eventually queue.activeWorkers == 0
test "item 'seen' flags can be cleared":
newSlotQueue(maxSize = 4, maxWorkers = 1)
let request = StorageRequest.example