From c220b05a4c5eea0c9e63ffabe9aa883f7c03dcae Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 22 May 2018 11:51:11 +0300 Subject: [PATCH] Optimize asyncsync primitives --- asyncdispatch2.nimble | 5 ++ asyncdispatch2/asyncsync.nim | 96 +++--------------------------- tests/testsync.nim | 109 +++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 87 deletions(-) create mode 100644 tests/testsync.nim diff --git a/asyncdispatch2.nimble b/asyncdispatch2.nimble index 048132a0..32fc0cba 100644 --- a/asyncdispatch2.nimble +++ b/asyncdispatch2.nimble @@ -10,7 +10,12 @@ skipDirs = @["tests", "Nim", "nim"] requires "nim > 0.18.0" task test, "Run all tests": + exec "nim c -r -d:useSysAssert -d:useGcAssert tests/testsync" + exec "nim c -r tests/testsync" + exec "nim c -r -d:release tests/testsync" + exec "nim c -r -d:useSysAssert -d:useGcAssert tests/testdatagram" exec "nim c -r -d:useSysAssert -d:useGcAssert tests/teststream" + exec "nim c -r -d:release tests/testdatagram" exec "nim c -r -d:release tests/teststream" diff --git a/asyncdispatch2/asyncsync.nim b/asyncdispatch2/asyncsync.nim index 19d243df..2fc58f9c 100644 --- a/asyncdispatch2/asyncsync.nim +++ b/asyncdispatch2/asyncsync.nim @@ -110,14 +110,14 @@ proc release*(lock: AsyncLock) = ## other coroutines are blocked waiting for the lock to become unlocked, ## allow exactly one of them to proceed. var w: Future[void] - proc wakeup(udata: pointer) {.gcsafe.} = w.complete() + # proc wakeup(udata: pointer) {.gcsafe.} = w.complete() if lock.locked: lock.locked = false while len(lock.waiters) > 0: w = lock.waiters.popFirst() if not w.finished: - callSoon(wakeup) + w.complete() break else: raise newException(AsyncLockError, "AsyncLock is not acquired!") @@ -153,16 +153,13 @@ proc fire*(event: AsyncEvent) = ## Set the internal flag of ``event`` to `true`. All tasks waiting for it ## to become `true` are awakened. Task that call `wait()` once the flag is ## `true` will not block at all. - proc wakeupAll(udata: pointer) {.gcsafe.} = - if len(event.waiters) > 0: - var w = event.waiters.popFirst() - if not w.finished: - w.complete() - callSoon(wakeupAll) - + var w: Future[void] if not event.flag: event.flag = true - callSoon(wakeupAll) + while len(event.waiters) > 0: + w = event.waiters.popFirst() + if not w.finished: + w.complete() proc clear*(event: AsyncEvent) = ## Reset the internal flag of ``event`` to `false`. Subsequently, tasks @@ -205,23 +202,19 @@ proc putNoWait*[T](aq: AsyncQueue[T], item: T) = ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised var w: Future[void] - proc wakeup(udata: pointer) {.gcsafe.} = w.complete() - if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") aq.queue.addLast(item) - while len(aq.getters) > 0: w = aq.getters.popFirst() if not w.finished: - callSoon(wakeup) + w.complete() proc getNoWait*[T](aq: AsyncQueue[T]): T = ## Remove and return ``item`` from the queue immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. var w: Future[void] - proc wakeup(udata: pointer) {.gcsafe.} = w.complete() if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") @@ -229,7 +222,7 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T = while len(aq.putters) > 0: w = aq.putters.popFirst() if not w.finished: - callSoon(wakeup) + w.complete() proc put*[T](aq: AsyncQueue[T], item: T) {.async.} = ## Put an ``item`` into the queue ``aq``. If the queue is full, wait until @@ -257,74 +250,3 @@ proc len*[T](aq: AsyncQueue[T]): int {.inline.} = proc size*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the maximum number of elements in ``aq``. result = len(aq.maxsize) - -when isMainModule: - # Locks test - block: - var test = "" - var lock = newAsyncLock() - - proc testLock(n: int, lock: AsyncLock) {.async.} = - await lock.acquire() - test = test & $n - lock.release() - - lock.own() - asyncCheck testLock(0, lock) - asyncCheck testLock(1, lock) - asyncCheck testLock(2, lock) - asyncCheck testLock(3, lock) - asyncCheck testLock(4, lock) - asyncCheck testLock(5, lock) - asyncCheck testLock(6, lock) - asyncCheck testLock(7, lock) - asyncCheck testLock(8, lock) - asyncCheck testLock(9, lock) - lock.release() - poll() - doAssert(test == "0123456789") - - # Events test - block: - var test = "" - var event = newAsyncEvent() - - proc testEvent(n: int, ev: AsyncEvent) {.async.} = - await ev.wait() - test = test & $n - - event.clear() - asyncCheck testEvent(0, event) - asyncCheck testEvent(1, event) - asyncCheck testEvent(2, event) - asyncCheck testEvent(3, event) - asyncCheck testEvent(4, event) - asyncCheck testEvent(5, event) - asyncCheck testEvent(6, event) - asyncCheck testEvent(7, event) - asyncCheck testEvent(8, event) - asyncCheck testEvent(9, event) - event.fire() - poll() - doAssert(test == "0123456789") - - # Queues test - block: - const queueSize = 10 - const testsCount = 1000 - var test = 0 - - proc task1(aq: AsyncQueue[int]) {.async.} = - for i in 1..(testsCount - 1): - var item = await aq.get() - test -= item - - proc task2(aq: AsyncQueue[int]) {.async.} = - for i in 1..testsCount: - await aq.put(i) - test += i - - var queue = newAsyncQueue[int](queueSize) - discard task1(queue) or task2(queue) - poll() - doAssert(test == testsCount) diff --git a/tests/testsync.nim b/tests/testsync.nim new file mode 100644 index 00000000..d80bacc9 --- /dev/null +++ b/tests/testsync.nim @@ -0,0 +1,109 @@ +# Asyncdispatch2 Test Suite +# (c) Copyright 2018 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import strutils, net, unittest, os +import ../asyncdispatch2 + +var testLockResult = "" +var testEventResult = "" +var testQueue1Result = 0 +var testQueue2Result = 0 + +proc testLock(n: int, lock: AsyncLock) {.async.} = + await lock.acquire() + testLockResult = testLockResult & $n + lock.release() + +proc test1(): string = + var lock = newAsyncLock() + lock.own() + discard testLock(0, lock) + discard testLock(1, lock) + discard testLock(2, lock) + discard testLock(3, lock) + discard testLock(4, lock) + discard testLock(5, lock) + discard testLock(6, lock) + discard testLock(7, lock) + discard testLock(8, lock) + discard testLock(9, lock) + lock.release() + ## There must be exactly 20 poll() calls + for i in 0..<20: + poll() + result = testLockResult + +proc testEvent(n: int, ev: AsyncEvent) {.async.} = + await ev.wait() + testEventResult = testEventResult & $n + +proc test2(): string = + var event = newAsyncEvent() + event.clear() + discard testEvent(0, event) + discard testEvent(1, event) + discard testEvent(2, event) + discard testEvent(3, event) + discard testEvent(4, event) + discard testEvent(5, event) + discard testEvent(6, event) + discard testEvent(7, event) + discard testEvent(8, event) + discard testEvent(9, event) + event.fire() + ## There must be exactly 2 poll() calls + poll() + poll() + result = testEventResult + +proc task1(aq: AsyncQueue[int]) {.async.} = + var item1 = await aq.get() + var item2 = await aq.get() + testQueue1Result = item1 + item2 + +proc task2(aq: AsyncQueue[int]) {.async.} = + await aq.put(1000) + await aq.put(2000) + +proc test3(): int = + var queue = newAsyncQueue[int](1) + discard task1(queue) + discard task2(queue) + ## There must be exactly 2 poll() calls + poll() + poll() + result = testQueue1Result + +const testsCount = 1000 +const queueSize = 10 + +proc task3(aq: AsyncQueue[int]) {.async.} = + for i in 1..testsCount: + var item = await aq.get() + testQueue2Result -= item + +proc task4(aq: AsyncQueue[int]) {.async.} = + for i in 1..testsCount: + await aq.put(i) + testQueue2Result += i + +proc test4(): int = + var queue = newAsyncQueue[int](queueSize) + waitFor(task3(queue) and task4(queue)) + result = testQueue2Result + +when isMainModule: + suite "Asynchronous sync primitives test suite": + test "AsyncLock() behavior test": + check test1() == "0123456789" + test "AsyncEvent() behavior test": + check test2() == "0123456789" + test "AsyncQueue() behavior test": + check test3() == 3000 + test "AsyncQueue() many iterations test": + check test4() == 0