From b5915ecd290c66cf06e04b2f6a451c39b3245e53 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 13 Nov 2020 14:22:58 +0200 Subject: [PATCH] Small fixes (#134) * Fix cancellation behavior. Fix some compilation warnings. * Fix cancelAndWait() and add proper documentation. * Add completed(Future) and done(Future) calls to check if Future[T] completed without an error. * Add stepsAsync() and tests. * Fix comments. * Fix new primitive comment, to avoid usage for task switches. --- chronos/asyncfutures2.nim | 101 ++++++++++++++++++++++++-------------- chronos/asyncloop.nim | 38 +++++++++++++- tests/testasyncstream.nim | 2 +- tests/testserver.nim | 2 +- tests/testtime.nim | 26 +++++++++- 5 files changed, 128 insertions(+), 41 deletions(-) diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 84860c38..b892c792 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -146,9 +146,9 @@ proc clean*[T](future: FutureVar[T]) = Future[T](future).error = nil proc finished*(future: FutureBase | FutureVar): bool {.inline.} = - ## Determines whether ``future`` has completed. - ## - ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## Determines whether ``future`` has completed, i.e. ``future`` state changed + ## from state ``Pending`` to one of the states (``Finished``, ``Cancelled``, + ## ``Failed``). when future is FutureVar: result = (FutureBase(future).state != FutureState.Pending) else: @@ -156,11 +156,19 @@ proc finished*(future: FutureBase | FutureVar): bool {.inline.} = proc cancelled*(future: FutureBase): bool {.inline.} = ## Determines whether ``future`` has cancelled. - result = (future.state == FutureState.Cancelled) + (future.state == FutureState.Cancelled) proc failed*(future: FutureBase): bool {.inline.} = ## Determines whether ``future`` completed with an error. - result = (future.state == FutureState.Failed) + (future.state == FutureState.Failed) + +proc completed*(future: FutureBase): bool {.inline.} = + ## Determines whether ``future`` completed without an error. + (future.state == FutureState.Finished) + +proc done*(future: FutureBase): bool {.inline.} = + ## This is an alias for ``completed(future)`` procedure. + completed(future) when defined(chronosFutureTracking): proc futureDestructor(udata: pointer) {.gcsafe.} = @@ -292,24 +300,39 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = template cancelAndSchedule*[T](future: Future[T]) = cancelAndSchedule(FutureBase(future), getSrcLocation()) -proc cancel(future: FutureBase, loc: ptr SrcLoc) = - if not(future.finished()): - # Cancel the bottom-most child. When that happens, its parent's `await` call - # will raise CancelledError. Some macro will catch that and call - # `cancelAndSchedule()` on that parent, thus propagating the cancellation - # up the chain. - if not(isNil(future.child)): - cancel(future.child, getSrcLocation()) - future.mustCancel = true - else: - if not(isNil(future.cancelcb)): - future.cancelcb(cast[pointer](future)) - future.cancelcb = nil - cancelAndSchedule(future, getSrcLocation()) +proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = + ## Request that Future ``future`` cancel itself. + ## + ## This arranges for a `CancelledError` to be thrown into procedure which + ## waits for ``future`` on the next cycle through the event loop. + ## The procedure then has a chance to clean up or even deny the request + ## using `try/except/finally`. + ## + ## This call do not guarantee that the ``future`` will be cancelled: the + ## exception might be caught and acted upon, delaying cancellation of the + ## ``future`` or preventing cancellation completely. The ``future`` may also + ## return value or raise different exception. + ## + ## Immediately after this procedure is called, ``future.cancelled()`` will + ## not return ``true`` (unless the Future was already cancelled). + if future.finished(): + return false + + if not(isNil(future.child)): + if cancel(future.child, getSrcLocation()): + return true + else: + if not(isNil(future.cancelcb)): + future.cancelcb(cast[pointer](future)) + future.cancelcb = nil + cancelAndSchedule(future, getSrcLocation()) + + future.mustCancel = true + return true template cancel*[T](future: Future[T]) = ## Cancel ``future``. - cancel(FutureBase(future), getSrcLocation()) + discard cancel(FutureBase(future), getSrcLocation()) proc clearCallbacks(future: FutureBase) = future.callbacks = default(seq[AsyncCallback]) @@ -566,8 +589,10 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {. proc cancellation(udata: pointer) {.gcsafe.} = # On cancel we remove all our callbacks only. - fut1.removeCallback(cb) - fut2.removeCallback(cb) + if not(fut1.finished()): + fut1.removeCallback(cb) + if not(fut2.finished()): + fut2.removeCallback(cb) retFuture.cancelCallback = cancellation return retFuture @@ -600,8 +625,10 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = proc cancellation(udata: pointer) {.gcsafe.} = # On cancel we remove all our callbacks only. - fut1.removeCallback(cb) - fut2.removeCallback(cb) + if not(fut1.finished()): + fut1.removeCallback(cb) + if not(fut2.finished()): + fut2.removeCallback(cb) if fut1.finished(): if fut1.failed(): @@ -761,22 +788,24 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {. return retFuture proc cancelAndWait*[T](fut: Future[T]): Future[void] = - ## Cancel ``fut`` and wait until it completes, in case it already - ## ``await``s on another Future. - - # When `retFuture` completes, `fut` and all its children have been - # cancelled. If `fut` doesn't have any children, the `continuation()` callback - # runs immediately, without control getting back to the dispatcher. + ## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is + ## done e.g. changes its state (become completed, failed or cancelled). + ## + ## If ``fut`` is already finished (completed, failed or cancelled) result + ## Future[void] object will be returned complete. var retFuture = newFuture[void]("chronos.cancelAndWait(T)") proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() - fut.addCallback(continuation) - - # Start the cancellation process. If `fut` has children, multiple event loop - # steps will be needed for it to complete. - fut.cancel() - + proc cancellation(udata: pointer) {.gcsafe.} = + if not(fut.finished()): + fut.removeCallback(continuation) + if fut.finished(): + retFuture.complete() + else: + fut.addCallback(continuation) + # Initiate cancellation process. + fut.cancel() return retFuture proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index a1e9f946..41d495c3 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -790,10 +790,12 @@ proc sleepAsync*(duration: Duration): Future[void] = var timer: TimerCallback proc completion(data: pointer) {.gcsafe.} = - retFuture.complete() + if not(retFuture.finished()): + retFuture.complete() proc cancellation(udata: pointer) {.gcsafe.} = - clearTimer(timer) + if not(retFuture.finished()): + clearTimer(timer) retFuture.cancelCallback = cancellation timer = setTimer(moment, completion, cast[pointer](retFuture)) @@ -803,6 +805,38 @@ proc sleepAsync*(ms: int): Future[void] {. inline, deprecated: "Use sleepAsync(Duration)".} = result = sleepAsync(ms.milliseconds()) +proc stepsAsync*(number: int): Future[void] = + ## Suspends the execution of the current async procedure for the next + ## ``number`` of asynchronous steps (``poll()`` calls). + ## + ## This primitive can be useful when you need to create more deterministic + ## tests and cases. + ## + ## WARNING! Do not use this primitive to perform switch between tasks, because + ## this can lead to 100% CPU load in the moments when there are no I/O + ## events. Usually when there no I/O events CPU consumption should be near 0%. + var retFuture = newFuture[void]("chronos.stepsAsync(int)") + var counter = 0 + + proc continuation(data: pointer) {.gcsafe.} = + if not(retFuture.finished()): + inc(counter) + if counter < number: + callSoon(continuation, nil) + else: + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + discard + + if number <= 0: + retFuture.complete() + else: + retFuture.cancelCallback = cancellation + callSoon(continuation, nil) + + retFuture + proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = ## Returns a future which will complete once ``fut`` completes or after ## ``timeout`` milliseconds has elapsed. diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index ca8cff01..d2720f5c 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import strutils, unittest +import unittest import ../chronos, ../chronos/streams/tlsstream when defined(nimHasUsed): {.used.} diff --git a/tests/testserver.nim b/tests/testserver.nim index a72346d3..d079648b 100644 --- a/tests/testserver.nim +++ b/tests/testserver.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import strutils, unittest +import unittest import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testtime.nim b/tests/testtime.nim index 56649e3e..3a659e52 100644 --- a/tests/testtime.nim +++ b/tests/testtime.nim @@ -10,7 +10,7 @@ import ../chronos, ../chronos/timer when defined(nimHasUsed): {.used.} -suite "Asynchronous timers test suite": +suite "Asynchronous timers & steps test suite": const TimersCount = 10 proc timeWorker(time: Duration): Future[Duration] {.async.} = @@ -61,3 +61,27 @@ suite "Asynchronous timers test suite": test $TimersCount & " timers with 1000ms timeout": var res = waitFor(test(1000.milliseconds)) check (res >= 1000.milliseconds) and (res <= 5000.milliseconds) + test "Asynchronous steps test": + var futn1 = stepsAsync(-1) + var fut0 = stepsAsync(0) + var fut1 = stepsAsync(1) + var fut2 = stepsAsync(2) + var fut3 = stepsAsync(3) + check: + futn1.completed() == true + fut0.completed() == true + fut1.completed() == false + fut2.completed() == false + fut3.completed() == false + poll() + check: + fut1.completed() == true + fut2.completed() == false + fut3.completed() == false + poll() + check: + fut2.completed() == true + fut3.completed() == false + poll() + check: + fut3.completed() == true