mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-01-23 09:48:54 +00:00
Optimize asyncsync primitives
This commit is contained in:
parent
c1159286e6
commit
c220b05a4c
@ -10,7 +10,12 @@ skipDirs = @["tests", "Nim", "nim"]
|
|||||||
requires "nim > 0.18.0"
|
requires "nim > 0.18.0"
|
||||||
|
|
||||||
task test, "Run all tests":
|
task test, "Run all tests":
|
||||||
|
exec "nim c -r -d:useSysAssert -d:useGcAssert tests/testsync"
|
||||||
|
exec "nim c -r tests/testsync"
|
||||||
|
exec "nim c -r -d:release tests/testsync"
|
||||||
|
|
||||||
exec "nim c -r -d:useSysAssert -d:useGcAssert tests/testdatagram"
|
exec "nim c -r -d:useSysAssert -d:useGcAssert tests/testdatagram"
|
||||||
exec "nim c -r -d:useSysAssert -d:useGcAssert tests/teststream"
|
exec "nim c -r -d:useSysAssert -d:useGcAssert tests/teststream"
|
||||||
|
|
||||||
exec "nim c -r -d:release tests/testdatagram"
|
exec "nim c -r -d:release tests/testdatagram"
|
||||||
exec "nim c -r -d:release tests/teststream"
|
exec "nim c -r -d:release tests/teststream"
|
||||||
|
@ -110,14 +110,14 @@ proc release*(lock: AsyncLock) =
|
|||||||
## other coroutines are blocked waiting for the lock to become unlocked,
|
## other coroutines are blocked waiting for the lock to become unlocked,
|
||||||
## allow exactly one of them to proceed.
|
## allow exactly one of them to proceed.
|
||||||
var w: Future[void]
|
var w: Future[void]
|
||||||
proc wakeup(udata: pointer) {.gcsafe.} = w.complete()
|
# proc wakeup(udata: pointer) {.gcsafe.} = w.complete()
|
||||||
|
|
||||||
if lock.locked:
|
if lock.locked:
|
||||||
lock.locked = false
|
lock.locked = false
|
||||||
while len(lock.waiters) > 0:
|
while len(lock.waiters) > 0:
|
||||||
w = lock.waiters.popFirst()
|
w = lock.waiters.popFirst()
|
||||||
if not w.finished:
|
if not w.finished:
|
||||||
callSoon(wakeup)
|
w.complete()
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise newException(AsyncLockError, "AsyncLock is not acquired!")
|
raise newException(AsyncLockError, "AsyncLock is not acquired!")
|
||||||
@ -153,16 +153,13 @@ proc fire*(event: AsyncEvent) =
|
|||||||
## Set the internal flag of ``event`` to `true`. All tasks waiting for it
|
## Set the internal flag of ``event`` to `true`. All tasks waiting for it
|
||||||
## to become `true` are awakened. Task that call `wait()` once the flag is
|
## to become `true` are awakened. Task that call `wait()` once the flag is
|
||||||
## `true` will not block at all.
|
## `true` will not block at all.
|
||||||
proc wakeupAll(udata: pointer) {.gcsafe.} =
|
var w: Future[void]
|
||||||
if len(event.waiters) > 0:
|
|
||||||
var w = event.waiters.popFirst()
|
|
||||||
if not w.finished:
|
|
||||||
w.complete()
|
|
||||||
callSoon(wakeupAll)
|
|
||||||
|
|
||||||
if not event.flag:
|
if not event.flag:
|
||||||
event.flag = true
|
event.flag = true
|
||||||
callSoon(wakeupAll)
|
while len(event.waiters) > 0:
|
||||||
|
w = event.waiters.popFirst()
|
||||||
|
if not w.finished:
|
||||||
|
w.complete()
|
||||||
|
|
||||||
proc clear*(event: AsyncEvent) =
|
proc clear*(event: AsyncEvent) =
|
||||||
## Reset the internal flag of ``event`` to `false`. Subsequently, tasks
|
## Reset the internal flag of ``event`` to `false`. Subsequently, tasks
|
||||||
@ -205,23 +202,19 @@ proc putNoWait*[T](aq: AsyncQueue[T], item: T) =
|
|||||||
##
|
##
|
||||||
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised
|
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised
|
||||||
var w: Future[void]
|
var w: Future[void]
|
||||||
proc wakeup(udata: pointer) {.gcsafe.} = w.complete()
|
|
||||||
|
|
||||||
if aq.full():
|
if aq.full():
|
||||||
raise newException(AsyncQueueFullError, "AsyncQueue is full!")
|
raise newException(AsyncQueueFullError, "AsyncQueue is full!")
|
||||||
aq.queue.addLast(item)
|
aq.queue.addLast(item)
|
||||||
|
|
||||||
while len(aq.getters) > 0:
|
while len(aq.getters) > 0:
|
||||||
w = aq.getters.popFirst()
|
w = aq.getters.popFirst()
|
||||||
if not w.finished:
|
if not w.finished:
|
||||||
callSoon(wakeup)
|
w.complete()
|
||||||
|
|
||||||
proc getNoWait*[T](aq: AsyncQueue[T]): T =
|
proc getNoWait*[T](aq: AsyncQueue[T]): T =
|
||||||
## Remove and return ``item`` from the queue immediately.
|
## Remove and return ``item`` from the queue immediately.
|
||||||
##
|
##
|
||||||
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
|
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
|
||||||
var w: Future[void]
|
var w: Future[void]
|
||||||
proc wakeup(udata: pointer) {.gcsafe.} = w.complete()
|
|
||||||
|
|
||||||
if aq.empty():
|
if aq.empty():
|
||||||
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
|
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
|
||||||
@ -229,7 +222,7 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T =
|
|||||||
while len(aq.putters) > 0:
|
while len(aq.putters) > 0:
|
||||||
w = aq.putters.popFirst()
|
w = aq.putters.popFirst()
|
||||||
if not w.finished:
|
if not w.finished:
|
||||||
callSoon(wakeup)
|
w.complete()
|
||||||
|
|
||||||
proc put*[T](aq: AsyncQueue[T], item: T) {.async.} =
|
proc put*[T](aq: AsyncQueue[T], item: T) {.async.} =
|
||||||
## Put an ``item`` into the queue ``aq``. If the queue is full, wait until
|
## Put an ``item`` into the queue ``aq``. If the queue is full, wait until
|
||||||
@ -257,74 +250,3 @@ proc len*[T](aq: AsyncQueue[T]): int {.inline.} =
|
|||||||
proc size*[T](aq: AsyncQueue[T]): int {.inline.} =
|
proc size*[T](aq: AsyncQueue[T]): int {.inline.} =
|
||||||
## Return the maximum number of elements in ``aq``.
|
## Return the maximum number of elements in ``aq``.
|
||||||
result = len(aq.maxsize)
|
result = len(aq.maxsize)
|
||||||
|
|
||||||
when isMainModule:
|
|
||||||
# Locks test
|
|
||||||
block:
|
|
||||||
var test = ""
|
|
||||||
var lock = newAsyncLock()
|
|
||||||
|
|
||||||
proc testLock(n: int, lock: AsyncLock) {.async.} =
|
|
||||||
await lock.acquire()
|
|
||||||
test = test & $n
|
|
||||||
lock.release()
|
|
||||||
|
|
||||||
lock.own()
|
|
||||||
asyncCheck testLock(0, lock)
|
|
||||||
asyncCheck testLock(1, lock)
|
|
||||||
asyncCheck testLock(2, lock)
|
|
||||||
asyncCheck testLock(3, lock)
|
|
||||||
asyncCheck testLock(4, lock)
|
|
||||||
asyncCheck testLock(5, lock)
|
|
||||||
asyncCheck testLock(6, lock)
|
|
||||||
asyncCheck testLock(7, lock)
|
|
||||||
asyncCheck testLock(8, lock)
|
|
||||||
asyncCheck testLock(9, lock)
|
|
||||||
lock.release()
|
|
||||||
poll()
|
|
||||||
doAssert(test == "0123456789")
|
|
||||||
|
|
||||||
# Events test
|
|
||||||
block:
|
|
||||||
var test = ""
|
|
||||||
var event = newAsyncEvent()
|
|
||||||
|
|
||||||
proc testEvent(n: int, ev: AsyncEvent) {.async.} =
|
|
||||||
await ev.wait()
|
|
||||||
test = test & $n
|
|
||||||
|
|
||||||
event.clear()
|
|
||||||
asyncCheck testEvent(0, event)
|
|
||||||
asyncCheck testEvent(1, event)
|
|
||||||
asyncCheck testEvent(2, event)
|
|
||||||
asyncCheck testEvent(3, event)
|
|
||||||
asyncCheck testEvent(4, event)
|
|
||||||
asyncCheck testEvent(5, event)
|
|
||||||
asyncCheck testEvent(6, event)
|
|
||||||
asyncCheck testEvent(7, event)
|
|
||||||
asyncCheck testEvent(8, event)
|
|
||||||
asyncCheck testEvent(9, event)
|
|
||||||
event.fire()
|
|
||||||
poll()
|
|
||||||
doAssert(test == "0123456789")
|
|
||||||
|
|
||||||
# Queues test
|
|
||||||
block:
|
|
||||||
const queueSize = 10
|
|
||||||
const testsCount = 1000
|
|
||||||
var test = 0
|
|
||||||
|
|
||||||
proc task1(aq: AsyncQueue[int]) {.async.} =
|
|
||||||
for i in 1..(testsCount - 1):
|
|
||||||
var item = await aq.get()
|
|
||||||
test -= item
|
|
||||||
|
|
||||||
proc task2(aq: AsyncQueue[int]) {.async.} =
|
|
||||||
for i in 1..testsCount:
|
|
||||||
await aq.put(i)
|
|
||||||
test += i
|
|
||||||
|
|
||||||
var queue = newAsyncQueue[int](queueSize)
|
|
||||||
discard task1(queue) or task2(queue)
|
|
||||||
poll()
|
|
||||||
doAssert(test == testsCount)
|
|
||||||
|
109
tests/testsync.nim
Normal file
109
tests/testsync.nim
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
# Asyncdispatch2 Test Suite
|
||||||
|
# (c) Copyright 2018
|
||||||
|
# Status Research & Development GmbH
|
||||||
|
#
|
||||||
|
# Licensed under either of
|
||||||
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
import strutils, net, unittest, os
|
||||||
|
import ../asyncdispatch2
|
||||||
|
|
||||||
|
var testLockResult = ""
|
||||||
|
var testEventResult = ""
|
||||||
|
var testQueue1Result = 0
|
||||||
|
var testQueue2Result = 0
|
||||||
|
|
||||||
|
proc testLock(n: int, lock: AsyncLock) {.async.} =
|
||||||
|
await lock.acquire()
|
||||||
|
testLockResult = testLockResult & $n
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
proc test1(): string =
|
||||||
|
var lock = newAsyncLock()
|
||||||
|
lock.own()
|
||||||
|
discard testLock(0, lock)
|
||||||
|
discard testLock(1, lock)
|
||||||
|
discard testLock(2, lock)
|
||||||
|
discard testLock(3, lock)
|
||||||
|
discard testLock(4, lock)
|
||||||
|
discard testLock(5, lock)
|
||||||
|
discard testLock(6, lock)
|
||||||
|
discard testLock(7, lock)
|
||||||
|
discard testLock(8, lock)
|
||||||
|
discard testLock(9, lock)
|
||||||
|
lock.release()
|
||||||
|
## There must be exactly 20 poll() calls
|
||||||
|
for i in 0..<20:
|
||||||
|
poll()
|
||||||
|
result = testLockResult
|
||||||
|
|
||||||
|
proc testEvent(n: int, ev: AsyncEvent) {.async.} =
|
||||||
|
await ev.wait()
|
||||||
|
testEventResult = testEventResult & $n
|
||||||
|
|
||||||
|
proc test2(): string =
|
||||||
|
var event = newAsyncEvent()
|
||||||
|
event.clear()
|
||||||
|
discard testEvent(0, event)
|
||||||
|
discard testEvent(1, event)
|
||||||
|
discard testEvent(2, event)
|
||||||
|
discard testEvent(3, event)
|
||||||
|
discard testEvent(4, event)
|
||||||
|
discard testEvent(5, event)
|
||||||
|
discard testEvent(6, event)
|
||||||
|
discard testEvent(7, event)
|
||||||
|
discard testEvent(8, event)
|
||||||
|
discard testEvent(9, event)
|
||||||
|
event.fire()
|
||||||
|
## There must be exactly 2 poll() calls
|
||||||
|
poll()
|
||||||
|
poll()
|
||||||
|
result = testEventResult
|
||||||
|
|
||||||
|
proc task1(aq: AsyncQueue[int]) {.async.} =
|
||||||
|
var item1 = await aq.get()
|
||||||
|
var item2 = await aq.get()
|
||||||
|
testQueue1Result = item1 + item2
|
||||||
|
|
||||||
|
proc task2(aq: AsyncQueue[int]) {.async.} =
|
||||||
|
await aq.put(1000)
|
||||||
|
await aq.put(2000)
|
||||||
|
|
||||||
|
proc test3(): int =
|
||||||
|
var queue = newAsyncQueue[int](1)
|
||||||
|
discard task1(queue)
|
||||||
|
discard task2(queue)
|
||||||
|
## There must be exactly 2 poll() calls
|
||||||
|
poll()
|
||||||
|
poll()
|
||||||
|
result = testQueue1Result
|
||||||
|
|
||||||
|
const testsCount = 1000
|
||||||
|
const queueSize = 10
|
||||||
|
|
||||||
|
proc task3(aq: AsyncQueue[int]) {.async.} =
|
||||||
|
for i in 1..testsCount:
|
||||||
|
var item = await aq.get()
|
||||||
|
testQueue2Result -= item
|
||||||
|
|
||||||
|
proc task4(aq: AsyncQueue[int]) {.async.} =
|
||||||
|
for i in 1..testsCount:
|
||||||
|
await aq.put(i)
|
||||||
|
testQueue2Result += i
|
||||||
|
|
||||||
|
proc test4(): int =
|
||||||
|
var queue = newAsyncQueue[int](queueSize)
|
||||||
|
waitFor(task3(queue) and task4(queue))
|
||||||
|
result = testQueue2Result
|
||||||
|
|
||||||
|
when isMainModule:
|
||||||
|
suite "Asynchronous sync primitives test suite":
|
||||||
|
test "AsyncLock() behavior test":
|
||||||
|
check test1() == "0123456789"
|
||||||
|
test "AsyncEvent() behavior test":
|
||||||
|
check test2() == "0123456789"
|
||||||
|
test "AsyncQueue() behavior test":
|
||||||
|
check test3() == 3000
|
||||||
|
test "AsyncQueue() many iterations test":
|
||||||
|
check test4() == 0
|
Loading…
x
Reference in New Issue
Block a user