Small fixes (#134)

* Fix cancellation behavior.
Fix some compilation warnings.

* Fix cancelAndWait() and add proper documentation.

* Add completed(Future) and done(Future) calls to check if Future[T] completed without an error.

* Add stepsAsync() and tests.

* Fix comments.

* Fix new primitive comment, to avoid usage for task switches.
This commit is contained in:
Eugene Kabanov 2020-11-13 14:22:58 +02:00 committed by GitHub
parent d3018ae908
commit b5915ecd29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 128 additions and 41 deletions

View File

@ -146,9 +146,9 @@ proc clean*[T](future: FutureVar[T]) =
Future[T](future).error = nil
proc finished*(future: FutureBase | FutureVar): bool {.inline.} =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
## Determines whether ``future`` has completed, i.e. ``future`` state changed
## from state ``Pending`` to one of the states (``Finished``, ``Cancelled``,
## ``Failed``).
when future is FutureVar:
result = (FutureBase(future).state != FutureState.Pending)
else:
@ -156,11 +156,19 @@ proc finished*(future: FutureBase | FutureVar): bool {.inline.} =
proc cancelled*(future: FutureBase): bool {.inline.} =
## Determines whether ``future`` has cancelled.
result = (future.state == FutureState.Cancelled)
(future.state == FutureState.Cancelled)
proc failed*(future: FutureBase): bool {.inline.} =
## Determines whether ``future`` completed with an error.
result = (future.state == FutureState.Failed)
(future.state == FutureState.Failed)
proc completed*(future: FutureBase): bool {.inline.} =
## Determines whether ``future`` completed without an error.
(future.state == FutureState.Finished)
proc done*(future: FutureBase): bool {.inline.} =
## This is an alias for ``completed(future)`` procedure.
completed(future)
when defined(chronosFutureTracking):
proc futureDestructor(udata: pointer) {.gcsafe.} =
@ -292,24 +300,39 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
template cancelAndSchedule*[T](future: Future[T]) =
cancelAndSchedule(FutureBase(future), getSrcLocation())
proc cancel(future: FutureBase, loc: ptr SrcLoc) =
if not(future.finished()):
# Cancel the bottom-most child. When that happens, its parent's `await` call
# will raise CancelledError. Some macro will catch that and call
# `cancelAndSchedule()` on that parent, thus propagating the cancellation
# up the chain.
if not(isNil(future.child)):
cancel(future.child, getSrcLocation())
future.mustCancel = true
else:
if not(isNil(future.cancelcb)):
future.cancelcb(cast[pointer](future))
future.cancelcb = nil
cancelAndSchedule(future, getSrcLocation())
proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
## Request that Future ``future`` cancel itself.
##
## This arranges for a `CancelledError` to be thrown into procedure which
## waits for ``future`` on the next cycle through the event loop.
## The procedure then has a chance to clean up or even deny the request
## using `try/except/finally`.
##
## This call do not guarantee that the ``future`` will be cancelled: the
## exception might be caught and acted upon, delaying cancellation of the
## ``future`` or preventing cancellation completely. The ``future`` may also
## return value or raise different exception.
##
## Immediately after this procedure is called, ``future.cancelled()`` will
## not return ``true`` (unless the Future was already cancelled).
if future.finished():
return false
if not(isNil(future.child)):
if cancel(future.child, getSrcLocation()):
return true
else:
if not(isNil(future.cancelcb)):
future.cancelcb(cast[pointer](future))
future.cancelcb = nil
cancelAndSchedule(future, getSrcLocation())
future.mustCancel = true
return true
template cancel*[T](future: Future[T]) =
## Cancel ``future``.
cancel(FutureBase(future), getSrcLocation())
discard cancel(FutureBase(future), getSrcLocation())
proc clearCallbacks(future: FutureBase) =
future.callbacks = default(seq[AsyncCallback])
@ -566,8 +589,10 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
fut1.removeCallback(cb)
fut2.removeCallback(cb)
if not(fut1.finished()):
fut1.removeCallback(cb)
if not(fut2.finished()):
fut2.removeCallback(cb)
retFuture.cancelCallback = cancellation
return retFuture
@ -600,8 +625,10 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
fut1.removeCallback(cb)
fut2.removeCallback(cb)
if not(fut1.finished()):
fut1.removeCallback(cb)
if not(fut2.finished()):
fut2.removeCallback(cb)
if fut1.finished():
if fut1.failed():
@ -761,22 +788,24 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {.
return retFuture
proc cancelAndWait*[T](fut: Future[T]): Future[void] =
## Cancel ``fut`` and wait until it completes, in case it already
## ``await``s on another Future.
# When `retFuture` completes, `fut` and all its children have been
# cancelled. If `fut` doesn't have any children, the `continuation()` callback
# runs immediately, without control getting back to the dispatcher.
## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is
## done e.g. changes its state (become completed, failed or cancelled).
##
## If ``fut`` is already finished (completed, failed or cancelled) result
## Future[void] object will be returned complete.
var retFuture = newFuture[void]("chronos.cancelAndWait(T)")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
fut.addCallback(continuation)
# Start the cancellation process. If `fut` has children, multiple event loop
# steps will be needed for it to complete.
fut.cancel()
proc cancellation(udata: pointer) {.gcsafe.} =
if not(fut.finished()):
fut.removeCallback(continuation)
if fut.finished():
retFuture.complete()
else:
fut.addCallback(continuation)
# Initiate cancellation process.
fut.cancel()
return retFuture
proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =

View File

@ -790,10 +790,12 @@ proc sleepAsync*(duration: Duration): Future[void] =
var timer: TimerCallback
proc completion(data: pointer) {.gcsafe.} =
retFuture.complete()
if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
clearTimer(timer)
if not(retFuture.finished()):
clearTimer(timer)
retFuture.cancelCallback = cancellation
timer = setTimer(moment, completion, cast[pointer](retFuture))
@ -803,6 +805,38 @@ proc sleepAsync*(ms: int): Future[void] {.
inline, deprecated: "Use sleepAsync(Duration)".} =
result = sleepAsync(ms.milliseconds())
proc stepsAsync*(number: int): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``number`` of asynchronous steps (``poll()`` calls).
##
## This primitive can be useful when you need to create more deterministic
## tests and cases.
##
## WARNING! Do not use this primitive to perform switch between tasks, because
## this can lead to 100% CPU load in the moments when there are no I/O
## events. Usually when there no I/O events CPU consumption should be near 0%.
var retFuture = newFuture[void]("chronos.stepsAsync(int)")
var counter = 0
proc continuation(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
inc(counter)
if counter < number:
callSoon(continuation, nil)
else:
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
discard
if number <= 0:
retFuture.complete()
else:
retFuture.cancelCallback = cancellation
callSoon(continuation, nil)
retFuture
proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
## Returns a future which will complete once ``fut`` completes or after
## ``timeout`` milliseconds has elapsed.

View File

@ -5,7 +5,7 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, unittest
import unittest
import ../chronos, ../chronos/streams/tlsstream
when defined(nimHasUsed): {.used.}

View File

@ -5,7 +5,7 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, unittest
import unittest
import ../chronos
when defined(nimHasUsed): {.used.}

View File

@ -10,7 +10,7 @@ import ../chronos, ../chronos/timer
when defined(nimHasUsed): {.used.}
suite "Asynchronous timers test suite":
suite "Asynchronous timers & steps test suite":
const TimersCount = 10
proc timeWorker(time: Duration): Future[Duration] {.async.} =
@ -61,3 +61,27 @@ suite "Asynchronous timers test suite":
test $TimersCount & " timers with 1000ms timeout":
var res = waitFor(test(1000.milliseconds))
check (res >= 1000.milliseconds) and (res <= 5000.milliseconds)
test "Asynchronous steps test":
var futn1 = stepsAsync(-1)
var fut0 = stepsAsync(0)
var fut1 = stepsAsync(1)
var fut2 = stepsAsync(2)
var fut3 = stepsAsync(3)
check:
futn1.completed() == true
fut0.completed() == true
fut1.completed() == false
fut2.completed() == false
fut3.completed() == false
poll()
check:
fut1.completed() == true
fut2.completed() == false
fut3.completed() == false
poll()
check:
fut2.completed() == true
fut3.completed() == false
poll()
check:
fut3.completed() == true