use a different list for waitNotEmpty

reusing getters seems to create a race condition
This commit is contained in:
Diego 2024-01-26 17:24:41 +01:00
parent 39917ad8ce
commit 4cea594b0e
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
1 changed files with 8 additions and 5 deletions

View File

@ -52,6 +52,7 @@ type
## removed by "await get()".
getters: seq[Future[void].Raising([CancelledError])]
putters: seq[Future[void].Raising([CancelledError])]
waitersForNonEmpty: seq[Future[void].Raising([CancelledError])]
queue: Deque[T]
maxsize: int
@ -267,14 +268,14 @@ proc waitNotEmpty*[T](aq: AsyncQueue[T]): Future[void] {.
## It merely ensures that the queue is not empty. After `waitNotEmpty` completes,
## use other queue operations to manipulate items.
while aq.empty():
let getter =
let waiter =
Future[void].Raising([CancelledError]).init("AsyncQueue.waitNotEmpty")
aq.getters.add(getter)
aq.waitersForNonEmpty.add(waiter)
try:
await getter
await waiter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
if not(aq.empty()) and not(waiter.cancelled()):
aq.waitersForNonEmpty.wakeupNext()
raise exc
proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
@ -294,10 +295,12 @@ proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} =
proc addFirstImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addFirst(item)
aq.getters.wakeupNext()
aq.waitersForNonEmpty.wakeupNext()
proc addLastImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addLast(item)
aq.getters.wakeupNext()
aq.waitersForNonEmpty.wakeupNext()
proc popFirstImpl[T](aq: AsyncQueue[T]): T =
let res = aq.queue.popFirst()