Add raising `Defect` functions to AsyncQueue.

This commit is contained in:
cheatfate 2023-11-16 01:58:02 +02:00
parent b8d835b2bd
commit 3ef6ad5eaf
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
1 changed files with 60 additions and 19 deletions

View File

@ -165,7 +165,7 @@ proc newAsyncEvent*(): AsyncEvent =
AsyncEvent()
proc wait*(event: AsyncEvent): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Block until the internal flag of ``event`` is `true`.
## If the internal flag is `true` on entry, return immediately. Otherwise,
## block until another task calls `fire()` to set the flag to `true`,
@ -258,7 +258,7 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T =
res
proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
raises: [AsyncQueueFullError].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
@ -267,7 +267,7 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
aq.addFirstImpl(item)
proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
raises: [AsyncQueueFullError].} =
## Put an item ``item`` at the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
@ -275,8 +275,24 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
raise newException(AsyncQueueFullError, "AsyncQueue is full!")
aq.addLastImpl(item)
proc addFirstNoWaitSafe*[T](aq: AsyncQueue[T], item: T) {.
raises: [].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``Defect`` exception raised.
doAssert(not(aq.full()), "AsyncQueue is full!")
aq.addFirstImpl(item)
proc addLastNoWaitSafe*[T](aq: AsyncQueue[T], item: T) {.
raises: [].} =
## Put an item ``item`` at the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``Defect`` exception raised.
doAssert(not(aq.full()), "AsyncQueue is full!")
aq.addLastImpl(item)
proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Get an item from the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
@ -285,7 +301,7 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
aq.popFirstImpl()
proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Get an item from the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
@ -293,11 +309,29 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.popLastImpl()
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
proc popFirstNoWaitSafe*[T](aq: AsyncQueue[T]): T {.
raises: [].} =
## Get an item from the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``Defect`` raised.
doAssert(not(aq.empty()), "AsyncQueue is empty!")
aq.popLastImpl()
proc popLastNoWaitSafe*[T](aq: AsyncQueue[T]): T {.
raises: [].} =
## Get an item from the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``Defect`` exception raised.
doAssert(not(aq.empty()), "AsyncQueue is empty!")
aq.popLastImpl()
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst")
let putter =
Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst")
aq.putters.add(putter)
try:
await putter
@ -307,11 +341,13 @@ proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]
raise exc
aq.addFirstImpl(item)
proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
proc addLast*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the end of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addLast")
let putter =
Future[void].Raising([CancelledError]).init("AsyncQueue.addLast")
aq.putters.add(putter)
try:
await putter
@ -321,11 +357,13 @@ proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError])
raise exc
aq.addLastImpl(item)
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the beginning of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
aq.getters.add(getter)
try:
await getter
@ -335,11 +373,13 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledErro
raise exc
aq.popFirstImpl()
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
aq.getters.add(getter)
try:
await getter
@ -350,22 +390,22 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError
aq.popLastImpl()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
raises: [AsyncQueueFullError].} =
## Alias of ``addLastNoWait()``.
aq.addLastNoWait(item)
proc getNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
aq.addLast(item)
proc get*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``popFirst()``.
aq.popFirst()
@ -509,7 +549,8 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} =
ab.readers.reset()
ab.queue.clear()
proc closeWait*(ab: AsyncEventQueue): Future[void] {.async: (raw: true, raises: []).} =
proc closeWait*(ab: AsyncEventQueue): Future[void] {.
async: (raw: true, raises: []).} =
let retFuture = newFuture[void]("AsyncEventQueue.closeWait()",
{FutureFlag.OwnCancelSchedule})
proc continuation(udata: pointer) {.gcsafe.} =
@ -568,7 +609,7 @@ proc emit*[T](ab: AsyncEventQueue[T], data: T) =
proc waitEvents*[T](ab: AsyncEventQueue[T],
key: EventQueueKey,
eventsCount = -1): Future[seq[T]] {.
async: (raises: [AsyncEventQueueFullError, CancelledError]).} =
async: (raises: [AsyncEventQueueFullError, CancelledError]).} =
## Wait for events
var
events: seq[T]