From f6c792de7950a9b3c2032a99fcecaec3c8b440f1 Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Thu, 23 Jan 2025 10:28:14 +0100 Subject: [PATCH] fix slotqueue worker starvation (#1081) * fix slotqueue worker starvation * improve slotqueue tests Co-Authored-By: Marcin Czenko * slotqueue nph formatting --------- Co-authored-by: Marcin Czenko --- codex/sales/slotqueue.nim | 2 ++ tests/codex/sales/testslotqueue.nim | 50 +++++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 80ca0827..bfa1491a 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -421,6 +421,8 @@ proc run(self: SlotQueue) {.async: (raises: []).} = trace "readding seen item back into the queue" discard self.push(item) # on error, drop the item and continue worker.doneProcessing.complete() + if err =? self.addWorker().errorOption: + error "error adding new worker", error = err.msg await sleepAsync(1.millis) # poll continue diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index e6583bb7..5d331bc3 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -132,6 +132,11 @@ suite "Slot queue": check queue.len == 0 check $queue == "[]" + test "starts with 0 active workers": + newSlotQueue(maxSize = 2, maxWorkers = 2) + check eventually queue.running + check queue.activeWorkers == 0 + test "reports correct size": newSlotQueue(maxSize = 2, maxWorkers = 2) check queue.size == 2 @@ -506,14 +511,9 @@ suite "Slot queue": ] ) - test "processing a 'seen' item pauses the queue": + test "queue starts paused": newSlotQueue(maxSize = 4, maxWorkers = 4) - let request = StorageRequest.example - let item = - SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true) - check queue.push(item).isOk - check eventually queue.paused - check onProcessSlotCalledWith.len == 0 + check queue.paused test "pushing items to queue unpauses queue": newSlotQueue(maxSize = 4, maxWorkers = 4) @@ -546,6 +546,42 @@ suite "Slot queue": check eventually onProcessSlotCalledWith == @[(item.requestId, item.slotIndex)] check eventually queue.len == 0 + test "processing a 'seen' item pauses the queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let unseen = + SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false) + let seen = + SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true) + # push causes unpause + check queue.push(unseen).isSuccess + # check all items processed + check eventually queue.len == 0 + # push seen item + check queue.push(seen).isSuccess + # queue should be paused + check eventually queue.paused + + test "processing a 'seen' item does not decrease the number of workers": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let unseen = + SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false) + let seen = + SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true) + # push seen item to ensure that queue is pausing + check queue.push(seen).isSuccess + # unpause and pause a number of times + for _ in 0 ..< 10: + # push unseen item to unpause the queue + check queue.push(unseen).isSuccess + # wait for unseen item to be processed + check eventually queue.len == 1 + # wait for queue to pause because of seen item + check eventually queue.paused + # check that the number of workers equals maximimum workers + check eventually queue.activeWorkers == 0 + test "item 'seen' flags can be cleared": newSlotQueue(maxSize = 4, maxWorkers = 1) let request = StorageRequest.example