From d5b887f3d8b723d80fe2789cc44adf5efa33682a Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 31 Jul 2018 12:50:22 +0300 Subject: [PATCH] Add more utility procedures and tests for AsyncQueue. Bump version to 2.0.6. --- asyncdispatch2.nimble | 2 +- asyncdispatch2/asyncsync.nim | 151 ++++++++++++++++++++++++++++++----- tests/testsync.nim | 109 +++++++++++++++++++++++++ 3 files changed, 239 insertions(+), 23 deletions(-) diff --git a/asyncdispatch2.nimble b/asyncdispatch2.nimble index 372c6175..ac7dca65 100644 --- a/asyncdispatch2.nimble +++ b/asyncdispatch2.nimble @@ -1,5 +1,5 @@ packageName = "asyncdispatch2" -version = "2.0.5" +version = "2.0.6" author = "Status Research & Development GmbH" description = "Asyncdispatch2" license = "Apache License 2.0 or MIT" diff --git a/asyncdispatch2/asyncsync.nim b/asyncdispatch2/asyncsync.nim index d1a4df09..3faf4b07 100644 --- a/asyncdispatch2/asyncsync.nim +++ b/asyncdispatch2/asyncsync.nim @@ -187,28 +187,39 @@ proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = if aq.maxsize <= 0: result = false else: - result = len(aq.queue) >= aq.maxsize + result = (len(aq.queue) >= aq.maxsize) proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if the queue is empty, ``false`` otherwise. result = (len(aq.queue) == 0) -proc putNoWait*[T](aq: AsyncQueue[T], item: T) = - ## Put an item into the queue ``aq`` immediately. +proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) = + ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. ## - ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised + ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. + var w: Future[void] + if aq.full(): + raise newException(AsyncQueueFullError, "AsyncQueue is full!") + aq.queue.addFirst(item) + while len(aq.getters) > 0: + w = aq.getters.popFirst() + if not w.finished: w.complete() + +proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) = + ## Put an item ``item`` at the end of the queue ``aq`` immediately. + ## + ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. var w: Future[void] if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") aq.queue.addLast(item) while len(aq.getters) > 0: w = aq.getters.popFirst() - if not w.finished: - w.complete() + if not w.finished: w.complete() -proc getNoWait*[T](aq: AsyncQueue[T]): T = - ## Remove and return ``item`` from the queue immediately. - ## +proc popFirstNoWait*[T](aq: AsyncQueue[T]): T = + ## Get an item from the beginning of the queue ``aq`` immediately. + ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. var w: Future[void] if aq.empty(): @@ -216,27 +227,75 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T = result = aq.queue.popFirst() while len(aq.putters) > 0: w = aq.putters.popFirst() - if not w.finished: - w.complete() + if not w.finished: w.complete() -proc put*[T](aq: AsyncQueue[T], item: T) {.async.} = - ## Put an ``item`` into the queue ``aq``. If the queue is full, wait until - ## a free slot is available before adding item. +proc popLastNoWait*[T](aq: AsyncQueue[T]): T = + ## Get an item from the end of the queue ``aq`` immediately. + ## + ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. + var w: Future[void] + if aq.empty(): + raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") + result = aq.queue.popLast() + while len(aq.putters) > 0: + w = aq.putters.popFirst() + if not w.finished: w.complete() + +proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} = + ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, + ## wait until a free slot is available before adding item. while aq.full(): - var putter = newFuture[void]("asyncqueue.putter") + var putter = newFuture[void]("AsyncQueue.addFirst") aq.putters.addLast(putter) yield putter - aq.putNoWait(item) + aq.addFirstNoWait(item) -proc get*[T](aq: AsyncQueue[T]): Future[T] {.async.} = - ## Remove and return an item from the queue ``aq``. - ## - ## If queue is empty, wait until an item is available. +proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} = + ## Put an ``item`` to the end of the queue ``aq``. If the queue is full, + ## wait until a free slot is available before adding item. + while aq.full(): + var putter = newFuture[void]("AsyncQueue.addLast") + aq.putters.addLast(putter) + yield putter + aq.addLastNoWait(item) + +proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} = + ## Remove and return an ``item`` from the beginning of the queue ``aq``. + ## If the queue is empty, wait until an item is available. while aq.empty(): - var getter = newFuture[void]("asyncqueue.getter") + var getter = newFuture[void]("AsyncQueue.popFirst") aq.getters.addLast(getter) yield getter - result = aq.getNoWait() + result = aq.popFirstNoWait() + +proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = + ## Remove and return an ``item`` from the end of the queue ``aq``. + ## If the queue is empty, wait until an item is available. + while aq.empty(): + var getter = newFuture[void]("AsyncQueue.popLast") + aq.getters.addLast(getter) + yield getter + result = aq.popLastNoWait() + +proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} = + ## Alias of ``addLastNoWait()``. + aq.addLastNoWait(item) + +proc getNoWait*[T](aq: AsyncQueue[T]): T {.inline.} = + ## Alias of ``popFirstNoWait()``. + result = aq.popFirstNoWait() + +proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} = + ## Alias of ``addLast()``. + result = aq.addLast(item) + +proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} = + ## Alias of ``popFirst()``. + result = aq.popFirst() + +proc clear*[T](aq: AsyncQueue[T]) {.inline.} = + ## Clears all elements of queue ``aq``. + aq.queue.clear() proc len*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the number of elements in ``aq``. @@ -245,3 +304,51 @@ proc len*[T](aq: AsyncQueue[T]): int {.inline.} = proc size*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the maximum number of elements in ``aq``. result = len(aq.maxsize) + +proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} = + ## Access the i-th element of ``aq`` by order from first to last. + ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. + result = aq.queue[i] + +proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} = + ## Access the i-th element of ``aq`` by order from first to last. + ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. + result = aq.queue[len(aq.queue) - int(i)] + +proc `[]=`* [T](aq: AsyncQueue[T], i: Natural, item: T) {.inline.} = + ## Change the i-th element of ``aq``. + aq.queue[i] = item + +proc `[]=`* [T](aq: AsyncQueue[T], i: BackwardsIndex, item: T) {.inline.} = + ## Change the i-th element of ``aq``. + aq.queue[len(aq.queue) - int(i)] = item + +iterator items*[T](aq: AsyncQueue[T]): T {.inline.} = + ## Yield every element of ``aq``. + for item in aq.queue.items(): + yield item + +iterator mitems*[T](aq: AsyncQueue[T]): var T {.inline.} = + ## Yield every element of ``aq``. + for mitem in aq.queue.mitems(): + yield mitem + +iterator pairs*[T](aq: AsyncQueue[T]): tuple[key: int, val: T] {.inline.} = + ## Yield every (position, value) of ``aq``. + for pair in aq.queue.pairs(): + yield pair + +proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} = + ## Return true if ``item`` is in ``aq`` or false if not found. Usually used + ## via the ``in`` operator. + for e in aq.queue.items(): + if e == item: return true + return false + +proc `$`*[T](aq: AsyncQueue[T]): string = + ## Turn an async queue ``aq`` into its string representation. + result = "[" + for item in aq.queue.items(): + if result.len > 1: result.add(", ") + result.addQuoted(item) + result.add("]") diff --git a/tests/testsync.nim b/tests/testsync.nim index a27bdb3f..cb0538f1 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -13,6 +13,7 @@ var testLockResult = "" var testEventResult = "" var testQueue1Result = 0 var testQueue2Result = 0 +var testQueue3Result = 0 proc testLock(n: int, lock: AsyncLock) {.async.} = await lock.acquire() @@ -97,6 +98,104 @@ proc test4(): int = waitFor(task3(queue) and task4(queue)) result = testQueue2Result +proc task51(aq: AsyncQueue[int]) {.async.} = + var item1 = await aq.popFirst() + var item2 = await aq.popLast() + var item3 = await aq.get() + testQueue3Result = item1 - item2 + item3 + +proc task52(aq: AsyncQueue[int]) {.async.} = + await aq.put(100) + await aq.addLast(1000) + await aq.addFirst(2000) + +proc test5(): int = + var queue = newAsyncQueue[int](3) + discard task51(queue) + discard task52(queue) + poll() + poll() + result = testQueue3Result + +proc test6(): bool = + var queue = newAsyncQueue[int]() + queue.putNoWait(1) + queue.putNoWait(2) + queue.putNoWait(3) + queue.putNoWait(4) + queue.putNoWait(5) + queue.clear() + result = (len(queue) == 0) + +proc test7(): bool = + var queue = newAsyncQueue[int]() + var arr1 = @[1, 2, 3, 4, 5] + var arr2 = @[2, 2, 2, 2, 2] + var arr3 = @[1, 2, 3, 4, 5] + queue.putNoWait(1) + queue.putNoWait(2) + queue.putNoWait(3) + queue.putNoWait(4) + queue.putNoWait(5) + var index = 0 + for item in queue.items(): + result = (item == arr1[index]) + inc(index) + + if not result: return + + queue[0] = 2 + + result = (queue[0] == 2) + + if not result: return + + for item in queue.mitems(): + item = 2 + + index = 0 + for item in queue.items(): + result = (item == arr2[index]) + inc(index) + + if not result: return + + queue[0] = 1 + queue[1] = 2 + queue[2] = 3 + queue[3] = 4 + queue[^1] = 5 + + for i, item in queue.pairs(): + result = (item == arr3[i]) + +proc test8(): bool = + var q0 = newAsyncQueue[int]() + q0.putNoWait(1) + q0.putNoWait(2) + q0.putNoWait(3) + q0.putNoWait(4) + q0.putNoWait(5) + result = ($q0 == "[1, 2, 3, 4, 5]") + if not result: return + + var q1 = newAsyncQueue[string]() + q1.putNoWait("1") + q1.putNoWait("2") + q1.putNoWait("3") + q1.putNoWait("4") + q1.putNoWait("5") + result = ($q1 == "[\"1\", \"2\", \"3\", \"4\", \"5\"]") + +proc test9(): bool = + var q = newAsyncQueue[int]() + q.putNoWait(1) + q.putNoWait(2) + q.putNoWait(3) + q.putNoWait(4) + q.putNoWait(5) + result = (5 in q and not(6 in q)) + when isMainModule: suite "Asynchronous sync primitives test suite": test "AsyncLock() behavior test": @@ -107,3 +206,13 @@ when isMainModule: check test3() == 3000 test "AsyncQueue() many iterations test": check test4() == 0 + test "AsyncQueue() addLast/addFirst/popLast/popFirst test": + check test5() == 1100 + test "AsyncQueue() clear test": + check test6() == true + test "AsyncQueue() iterators/assignments test": + check test7() == true + test "AsyncQueue() representation test": + check test8() == true + test "AsyncQueue() contains test": + check test9() == true