Add peak to AsyncQueue.

This commit is contained in:
bhartnett 2024-10-02 22:07:36 +08:00
parent c04576d829
commit 925e6ca9b4
No known key found for this signature in database
GPG Key ID: 076F2830DA6BD535
2 changed files with 81 additions and 0 deletions

View File

@ -257,6 +257,12 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T =
aq.putters.wakeupNext()
res
proc peakFirstImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekFirst()
proc peakLastImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekLast()
proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
@ -293,6 +299,26 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.popLastImpl()
proc peakFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the beginning of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peakFirstImpl()
proc peakLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the end of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peakLastImpl()
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
@ -357,6 +383,38 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
raise exc
aq.popLastImpl()
proc peakFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the beginning of the queue
## ``aq``. If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peakFirst")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
aq.peakFirstImpl()
proc peakLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peakLast")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
aq.peakLastImpl()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Alias of ``addLastNoWait()``.
@ -367,6 +425,11 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {.
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()
proc peakNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Alias of ``peakFirstNoWait()``.
aq.peakFirstNoWait()
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
@ -377,6 +440,11 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {.
## Alias of ``popFirst()``.
aq.popFirst()
proc peak*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``peakFirst()``.
aq.peakFirst()
proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
## Clears all elements of queue ``aq``.
aq.queue.clear()

View File

@ -353,6 +353,19 @@ suite "Asynchronous sync primitives test suite":
test "AsyncQueue() contains test":
check test9() == true
test "AsyncQueue() peak test":
let q = newAsyncQueue[int]()
q.putNoWait(1)
q.putNoWait(2)
check:
q.peakNoWait() == 1
q.peakFirstNoWait() == 1
q.peakLastNoWait() == 2
(waitFor q.peak()) == 1
(waitFor q.peakFirst()) == 1
(waitFor q.peakLast()) == 2
test "AsyncEventQueue() behavior test":
let eventQueue = newAsyncEventQueue[int]()
let key = eventQueue.register()