small fixes (#127)

* small fixes

* more efficient codegen for nil check (much less code)
* release futures earlier in AsyncEvent
* release finished future earlier in AsyncQueue
* avoid searches for futures (deque variant unused / broken)
* avoid catching defects

* Fix AsyncEvent test, because of optimization.

* delete fix

* avoid seq allocs

* Keep style consistent with other code.
Refactor AsyncEvent and AsyncQueue to not use `result` keyword.

Co-authored-by: cheatfate <eugene.kabanov@status.im>
This commit is contained in:
Jacek Sieka 2020-09-10 10:39:10 +02:00 committed by GitHub
parent 57ebe84d17
commit 483054cda6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 82 deletions

View File

@ -42,9 +42,9 @@ template createCb(retFutureSym, iteratorNameSym,
if next == nil:
if not(retFutureSym.finished()):
let msg = "Async procedure ($1) yielded `nil`, " &
"are you await'ing a `nil` Future?"
raise newException(AssertionError, msg % strName)
const msg = "Async procedure (&" & strName & ") yielded `nil`, " &
"are you await'ing a `nil` Future?"
raiseAssert msg
else:
{.gcsafe.}:
next.addCallback(identName)

View File

@ -9,6 +9,7 @@
# MIT license (LICENSE-MIT)
## This module implements some core synchronization primitives
import std/sequtils
import asyncloop, deques
type
@ -141,26 +142,19 @@ proc newAsyncEvent*(): AsyncEvent =
# Workaround for callSoon() not worked correctly before
# getGlobalDispatcher() call.
discard getGlobalDispatcher()
result = new AsyncEvent
result.waiters = newSeq[Future[void]]()
result.flag = false
AsyncEvent(waiters: newSeq[Future[void]](), flag: false)
proc removeWaiter(event: AsyncEvent, waiter: Future[void]) {.inline.} =
## Removes ``waiter`` from list of waiters in ``lock``.
event.waiters.delete(event.waiters.find(waiter))
proc wait*(event: AsyncEvent) {.async.} =
proc wait*(event: AsyncEvent): Future[void] =
## Block until the internal flag of ``event`` is `true`.
## If the internal flag is `true` on entry, return immediately. Otherwise,
## block until another task calls `fire()` to set the flag to `true`,
## then return.
var w = newFuture[void]("AsyncEvent.wait")
if not(event.flag):
var w = newFuture[void]("AsyncEvent.wait")
event.waiters.add(w)
try:
await w
finally:
event.removeWaiter(w)
else:
w.complete()
w
proc fire*(event: AsyncEvent) =
## Set the internal flag of ``event`` to `true`. All tasks waiting for it
@ -169,8 +163,9 @@ proc fire*(event: AsyncEvent) =
if not(event.flag):
event.flag = true
for fut in event.waiters:
if not(fut.finished()):
if not(fut.finished()): # Could have been cancelled
fut.complete()
event.waiters.setLen(0)
proc clear*(event: AsyncEvent) =
## Reset the internal flag of ``event`` to `false`. Subsequently, tasks
@ -180,7 +175,7 @@ proc clear*(event: AsyncEvent) =
proc isSet*(event: AsyncEvent): bool =
## Return `true` if and only if the internal flag of ``event`` is `true`.
result = event.flag
event.flag
proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
## Creates a new asynchronous queue ``AsyncQueue``.
@ -188,42 +183,25 @@ proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
# Workaround for callSoon() not worked correctly before
# getGlobalDispatcher() call.
discard getGlobalDispatcher()
result = new AsyncQueue[T]
result.getters = newSeq[Future[void]]()
result.putters = newSeq[Future[void]]()
result.queue = initDeque[T]()
result.maxsize = maxsize
AsyncQueue[T](
getters: newSeq[Future[void]](),
putters: newSeq[Future[void]](),
queue: initDeque[T](),
maxsize: maxsize
)
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
if not(waiter.finished()):
let length = len(waiters) - (i + 1)
let offset = len(waiters) - length
if length > 0:
for k in 0..<length:
shallowCopy(waiters[k], waiters[k + offset])
waiters.setLen(length)
waiter.complete()
break
inc(i)
proc removeWaiter(waiters: var seq[Future[void]],
waiter: Future[void]) {.inline.} =
## Safely remove ``waiter`` from list of waiters in ``waiters``. This
## procedure will not raise if ``waiter`` is not in the list of waiters.
var index = waiters.find(waiter)
if index >= 0:
waiters.delete(index)
if not(waiter.finished()):
waiter.complete()
break
proc removeWaiter(waiters: var Deque[Future[void]],
fut: Future[void]) {.inline.} =
var nwaiters = initDeque[Future[void]]()
while len(waiters) > 0:
var waiter = waiters.popFirst()
if waiter != fut:
nwaiters.addFirst(waiter)
if i > 0:
waiters.delete(0, i - 1)
proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
## Return ``true`` if there are ``maxsize`` items in the queue.
@ -231,13 +209,13 @@ proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
## Note: If the ``aq`` was initialized with ``maxsize = 0`` (default),
## then ``full()`` is never ``true``.
if aq.maxsize <= 0:
result = false
false
else:
result = (len(aq.queue) >= aq.maxsize)
(len(aq.queue) >= aq.maxsize)
proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} =
## Return ``true`` if the queue is empty, ``false`` otherwise.
result = (len(aq.queue) == 0)
(len(aq.queue) == 0)
proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
@ -263,8 +241,9 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T =
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
result = aq.queue.popFirst()
let res = aq.queue.popFirst()
aq.putters.wakeupNext()
res
proc popLastNoWait*[T](aq: AsyncQueue[T]): T =
## Get an item from the end of the queue ``aq`` immediately.
@ -272,8 +251,9 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T =
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
result = aq.queue.popLast()
let res = aq.queue.popLast()
aq.putters.wakeupNext()
res
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
@ -283,11 +263,10 @@ proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} =
aq.putters.add(putter)
try:
await putter
except:
aq.putters.removeWaiter(putter)
if not aq.full() and not(putter.cancelled()):
except CatchableError as exc:
if not(aq.full()) and not(putter.cancelled()):
aq.putters.wakeupNext()
raise
raise exc
aq.addFirstNoWait(item)
proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} =
@ -298,11 +277,10 @@ proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} =
aq.putters.add(putter)
try:
await putter
except:
aq.putters.removeWaiter(putter)
if not aq.full() and not(putter.cancelled()):
except CatchableError as exc:
if not(aq.full()) and not(putter.cancelled()):
aq.putters.wakeupNext()
raise
raise exc
aq.addLastNoWait(item)
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
@ -313,12 +291,11 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
aq.getters.add(getter)
try:
await getter
except:
aq.getters.removeWaiter(getter)
except CatchableError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise
result = aq.popFirstNoWait()
raise exc
return aq.popFirstNoWait()
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
## Remove and return an ``item`` from the end of the queue ``aq``.
@ -328,12 +305,11 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
aq.getters.add(getter)
try:
await getter
except:
aq.getters.removeWaiter(getter)
except CatchableError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise
result = aq.popLastNoWait()
raise exc
return aq.popLastNoWait()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} =
## Alias of ``addLastNoWait()``.
@ -341,15 +317,15 @@ proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} =
proc getNoWait*[T](aq: AsyncQueue[T]): T {.inline.} =
## Alias of ``popFirstNoWait()``.
result = aq.popFirstNoWait()
aq.popFirstNoWait()
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} =
## Alias of ``addLast()``.
result = aq.addLast(item)
aq.addLast(item)
proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} =
## Alias of ``popFirst()``.
result = aq.popFirst()
aq.popFirst()
proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
## Clears all elements of queue ``aq``.
@ -357,21 +333,21 @@ proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
proc len*[T](aq: AsyncQueue[T]): int {.inline.} =
## Return the number of elements in ``aq``.
result = len(aq.queue)
len(aq.queue)
proc size*[T](aq: AsyncQueue[T]): int {.inline.} =
## Return the maximum number of elements in ``aq``.
result = len(aq.maxsize)
len(aq.maxsize)
proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} =
## Access the i-th element of ``aq`` by order from first to last.
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element.
result = aq.queue[i]
aq.queue[i]
proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} =
## Access the i-th element of ``aq`` by order from first to last.
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element.
result = aq.queue[len(aq.queue) - int(i)]
aq.queue[len(aq.queue) - int(i)]
proc `[]=`* [T](aq: AsyncQueue[T], i: Natural, item: T) {.inline.} =
## Change the i-th element of ``aq``.
@ -405,8 +381,9 @@ proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} =
proc `$`*[T](aq: AsyncQueue[T]): string =
## Turn an async queue ``aq`` into its string representation.
result = "["
var res = "["
for item in aq.queue.items():
if result.len > 1: result.add(", ")
result.addQuoted(item)
result.add("]")
if len(res) > 1: res.add(", ")
res.addQuoted(item)
res.add("]")
res

View File

@ -525,8 +525,8 @@ proc `+`*(address: TransportAddress, v: uint): TransportAddress =
a = a + v
result.address_v4[0..<4] = uint32(a).toBytesBE()
elif address.family == AddressFamily.IPv6:
var a1 = uint64.fromBytesBE(address.address_v6[0..<8])
var a2 = uint64.fromBytesBE(address.address_v6[8..<16])
var a1 = uint64.fromBytesBE(address.address_v6.toOpenArray(0, 7))
var a2 = uint64.fromBytesBE(address.address_v6.toOpenArray(8, 15))
var a3 = a2 + v
if a3 < a2:

View File

@ -105,8 +105,7 @@ suite "Asynchronous sync primitives test suite":
discard testEvent(8, event)
discard testEvent(9, event)
event.fire()
## There must be exactly 2 poll() calls
poll()
## There must be exactly 1 poll() call
poll()
result = testEventResult