From 483054cda6e8fd68d0af56edf5bdf59e9b7b3ce8 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 10 Sep 2020 10:39:10 +0200 Subject: [PATCH] small fixes (#127) * small fixes * more efficient codegen for nil check (much less code) * release futures earlier in AsyncEvent * release finished future earlier in AsyncQueue * avoid searches for futures (deque variant unused / broken) * avoid catching defects * Fix AsyncEvent test, because of optimization. * delete fix * avoid seq allocs * Keep style consistent with other code. Refactor AsyncEvent and AsyncQueue to not use `result` keyword. Co-authored-by: cheatfate --- chronos/asyncmacro2.nim | 6 +- chronos/asyncsync.nim | 127 ++++++++++++++--------------------- chronos/transports/ipnet.nim | 4 +- tests/testsync.nim | 3 +- 4 files changed, 58 insertions(+), 82 deletions(-) diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index 7792453..bc1b756 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -42,9 +42,9 @@ template createCb(retFutureSym, iteratorNameSym, if next == nil: if not(retFutureSym.finished()): - let msg = "Async procedure ($1) yielded `nil`, " & - "are you await'ing a `nil` Future?" - raise newException(AssertionError, msg % strName) + const msg = "Async procedure (&" & strName & ") yielded `nil`, " & + "are you await'ing a `nil` Future?" + raiseAssert msg else: {.gcsafe.}: next.addCallback(identName) diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index c526cc1..13d0f30 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -9,6 +9,7 @@ # MIT license (LICENSE-MIT) ## This module implements some core synchronization primitives +import std/sequtils import asyncloop, deques type @@ -141,26 +142,19 @@ proc newAsyncEvent*(): AsyncEvent = # Workaround for callSoon() not worked correctly before # getGlobalDispatcher() call. discard getGlobalDispatcher() - result = new AsyncEvent - result.waiters = newSeq[Future[void]]() - result.flag = false + AsyncEvent(waiters: newSeq[Future[void]](), flag: false) -proc removeWaiter(event: AsyncEvent, waiter: Future[void]) {.inline.} = - ## Removes ``waiter`` from list of waiters in ``lock``. - event.waiters.delete(event.waiters.find(waiter)) - -proc wait*(event: AsyncEvent) {.async.} = +proc wait*(event: AsyncEvent): Future[void] = ## Block until the internal flag of ``event`` is `true`. ## If the internal flag is `true` on entry, return immediately. Otherwise, ## block until another task calls `fire()` to set the flag to `true`, ## then return. + var w = newFuture[void]("AsyncEvent.wait") if not(event.flag): - var w = newFuture[void]("AsyncEvent.wait") event.waiters.add(w) - try: - await w - finally: - event.removeWaiter(w) + else: + w.complete() + w proc fire*(event: AsyncEvent) = ## Set the internal flag of ``event`` to `true`. All tasks waiting for it @@ -169,8 +163,9 @@ proc fire*(event: AsyncEvent) = if not(event.flag): event.flag = true for fut in event.waiters: - if not(fut.finished()): + if not(fut.finished()): # Could have been cancelled fut.complete() + event.waiters.setLen(0) proc clear*(event: AsyncEvent) = ## Reset the internal flag of ``event`` to `false`. Subsequently, tasks @@ -180,7 +175,7 @@ proc clear*(event: AsyncEvent) = proc isSet*(event: AsyncEvent): bool = ## Return `true` if and only if the internal flag of ``event`` is `true`. - result = event.flag + event.flag proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = ## Creates a new asynchronous queue ``AsyncQueue``. @@ -188,42 +183,25 @@ proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = # Workaround for callSoon() not worked correctly before # getGlobalDispatcher() call. discard getGlobalDispatcher() - result = new AsyncQueue[T] - result.getters = newSeq[Future[void]]() - result.putters = newSeq[Future[void]]() - result.queue = initDeque[T]() - result.maxsize = maxsize + AsyncQueue[T]( + getters: newSeq[Future[void]](), + putters: newSeq[Future[void]](), + queue: initDeque[T](), + maxsize: maxsize + ) proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = var i = 0 while i < len(waiters): var waiter = waiters[i] - if not(waiter.finished()): - let length = len(waiters) - (i + 1) - let offset = len(waiters) - length - if length > 0: - for k in 0..= 0: - waiters.delete(index) + if not(waiter.finished()): + waiter.complete() + break -proc removeWaiter(waiters: var Deque[Future[void]], - fut: Future[void]) {.inline.} = - var nwaiters = initDeque[Future[void]]() - while len(waiters) > 0: - var waiter = waiters.popFirst() - if waiter != fut: - nwaiters.addFirst(waiter) + if i > 0: + waiters.delete(0, i - 1) proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if there are ``maxsize`` items in the queue. @@ -231,13 +209,13 @@ proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Note: If the ``aq`` was initialized with ``maxsize = 0`` (default), ## then ``full()`` is never ``true``. if aq.maxsize <= 0: - result = false + false else: - result = (len(aq.queue) >= aq.maxsize) + (len(aq.queue) >= aq.maxsize) proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if the queue is empty, ``false`` otherwise. - result = (len(aq.queue) == 0) + (len(aq.queue) == 0) proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) = ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. @@ -263,8 +241,9 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T = ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") - result = aq.queue.popFirst() + let res = aq.queue.popFirst() aq.putters.wakeupNext() + res proc popLastNoWait*[T](aq: AsyncQueue[T]): T = ## Get an item from the end of the queue ``aq`` immediately. @@ -272,8 +251,9 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T = ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") - result = aq.queue.popLast() + let res = aq.queue.popLast() aq.putters.wakeupNext() + res proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} = ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, @@ -283,11 +263,10 @@ proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} = aq.putters.add(putter) try: await putter - except: - aq.putters.removeWaiter(putter) - if not aq.full() and not(putter.cancelled()): + except CatchableError as exc: + if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() - raise + raise exc aq.addFirstNoWait(item) proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} = @@ -298,11 +277,10 @@ proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} = aq.putters.add(putter) try: await putter - except: - aq.putters.removeWaiter(putter) - if not aq.full() and not(putter.cancelled()): + except CatchableError as exc: + if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() - raise + raise exc aq.addLastNoWait(item) proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} = @@ -313,12 +291,11 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} = aq.getters.add(getter) try: await getter - except: - aq.getters.removeWaiter(getter) + except CatchableError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() - raise - result = aq.popFirstNoWait() + raise exc + return aq.popFirstNoWait() proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = ## Remove and return an ``item`` from the end of the queue ``aq``. @@ -328,12 +305,11 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = aq.getters.add(getter) try: await getter - except: - aq.getters.removeWaiter(getter) + except CatchableError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() - raise - result = aq.popLastNoWait() + raise exc + return aq.popLastNoWait() proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} = ## Alias of ``addLastNoWait()``. @@ -341,15 +317,15 @@ proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} = proc getNoWait*[T](aq: AsyncQueue[T]): T {.inline.} = ## Alias of ``popFirstNoWait()``. - result = aq.popFirstNoWait() + aq.popFirstNoWait() proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} = ## Alias of ``addLast()``. - result = aq.addLast(item) + aq.addLast(item) proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} = ## Alias of ``popFirst()``. - result = aq.popFirst() + aq.popFirst() proc clear*[T](aq: AsyncQueue[T]) {.inline.} = ## Clears all elements of queue ``aq``. @@ -357,21 +333,21 @@ proc clear*[T](aq: AsyncQueue[T]) {.inline.} = proc len*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the number of elements in ``aq``. - result = len(aq.queue) + len(aq.queue) proc size*[T](aq: AsyncQueue[T]): int {.inline.} = ## Return the maximum number of elements in ``aq``. - result = len(aq.maxsize) + len(aq.maxsize) proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} = ## Access the i-th element of ``aq`` by order from first to last. ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. - result = aq.queue[i] + aq.queue[i] proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} = ## Access the i-th element of ``aq`` by order from first to last. ## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. - result = aq.queue[len(aq.queue) - int(i)] + aq.queue[len(aq.queue) - int(i)] proc `[]=`* [T](aq: AsyncQueue[T], i: Natural, item: T) {.inline.} = ## Change the i-th element of ``aq``. @@ -405,8 +381,9 @@ proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} = proc `$`*[T](aq: AsyncQueue[T]): string = ## Turn an async queue ``aq`` into its string representation. - result = "[" + var res = "[" for item in aq.queue.items(): - if result.len > 1: result.add(", ") - result.addQuoted(item) - result.add("]") + if len(res) > 1: res.add(", ") + res.addQuoted(item) + res.add("]") + res diff --git a/chronos/transports/ipnet.nim b/chronos/transports/ipnet.nim index 67afc0c..615e42b 100644 --- a/chronos/transports/ipnet.nim +++ b/chronos/transports/ipnet.nim @@ -525,8 +525,8 @@ proc `+`*(address: TransportAddress, v: uint): TransportAddress = a = a + v result.address_v4[0..<4] = uint32(a).toBytesBE() elif address.family == AddressFamily.IPv6: - var a1 = uint64.fromBytesBE(address.address_v6[0..<8]) - var a2 = uint64.fromBytesBE(address.address_v6[8..<16]) + var a1 = uint64.fromBytesBE(address.address_v6.toOpenArray(0, 7)) + var a2 = uint64.fromBytesBE(address.address_v6.toOpenArray(8, 15)) var a3 = a2 + v if a3 < a2: diff --git a/tests/testsync.nim b/tests/testsync.nim index 3f70885..669f6ce 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -105,8 +105,7 @@ suite "Asynchronous sync primitives test suite": discard testEvent(8, event) discard testEvent(9, event) event.fire() - ## There must be exactly 2 poll() calls - poll() + ## There must be exactly 1 poll() call poll() result = testEventResult