add async waitNotEmpty proc

This commit is contained in:
Diego 2024-01-23 13:08:53 +01:00
parent e296ae30c8
commit 39917ad8ce
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
2 changed files with 57 additions and 21 deletions

View File

@ -225,6 +225,58 @@ proc wakeupNext(waiters: var seq) {.inline.} =
else:
waiters.delete(0, i - 1)
proc waitNotEmpty*[T](aq: AsyncQueue[T]): Future[void] {.
async: (raises: [CancelledError]).} =
## Waits asynchronously until the `AsyncQueue` is not empty.
##
## This procedure is an asynchronous operation that suspends execution until
## at least one item is available in the specified `AsyncQueue`.
##
## Args:
## - `aq`: The `AsyncQueue` instance to monitor. It holds elements of type `T`.
##
## Returns:
## A `Future[void]` that represents the asynchronous operation, completing when
## there is at least one item in the queue.
##
## Raises:
## - `CancelledError`: If the waiting operation is cancelled. This allows for
## handling of cancellation scenarios.
##
## Example:
## ```nim
## var
## priorityQueue = newAsyncQueue[int]()
## nonPriorityQueue = newAsyncQueue[int]()
## # Assuming there are producers that add items to these queues
## var futures = @[priorityQueue.waitNotEmpty(), nonPriorityQueue.waitNotEmpty()]
## try:
## discard await anyCompleted(futures) # Wait for any queue to have a message
## finally:
## for fut in futures: fut.cancelSoon() # Cancel the other future
## if not priorityQueue.empty():
## let message = priorityQueue.getNoWait()
## # Process priority message
## elif not nonPriorityQueue.empty():
## let message = nonPriorityQueue.getNoWait()
## # Process non-priority message
## ```
##
## Note:
## This procedure does not automatically retrieve or remove the item from the queue.
## It merely ensures that the queue is not empty. After `waitNotEmpty` completes,
## use other queue operations to manipulate items.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.waitNotEmpty")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
## Return ``true`` if there are ``maxsize`` items in the queue.
##
@ -329,32 +381,14 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the beginning of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
await aq.waitNotEmpty()
aq.popFirstImpl()
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
await aq.waitNotEmpty()
aq.popLastImpl()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.

View File

@ -192,7 +192,8 @@ suite "Asynchronous sync primitives test suite":
var queue = newAsyncQueue[int](1)
discard task1(queue)
discard task2(queue)
## There must be exactly 2 poll() calls
## There must be exactly 3 poll() calls
poll()
poll()
poll()
result = testQueue1Result
@ -232,6 +233,7 @@ suite "Asynchronous sync primitives test suite":
discard task52(queue)
poll()
poll()
poll()
result = testQueue3Result
proc test6(): bool =