mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-02-03 06:54:10 +00:00
introduce asyncraises
to core future utilities (#454)
* introduce `asyncraises` to core future utilities Similar to the introduction of `raises` into a codebase, `asyncraises` needs to be introduced gradually across all functionality before deriving benefit. This is a first introduction along with utilities to manage raises lists and transform them at compile time. Several scenarios ensue: * for trivial cases, adding `asyncraises` is enough and the framework deduces the rest * some functions "add" new asyncraises (similar to what `raise` does in "normal" code) - for example `wait` may raise all exceptions of the future passed to it and additionally a few of its own - this requires extending the raises list * som functions "remove" raises (similar to what `try/except` does) such as `nocancel` with blocks cancellations and therefore reduce the raising set Both of the above cases are currently handled by a macro, but depending on the situation lead to code organisation issues around return types and pragma limitations - in particular, to keep `asyncraises` backwards-compatibility, some code needs to exist in two versions which somewhat complicates the implementation. * add `asyncraises` versions for several `asyncfutures` utilities * when assigning exceptions to a `Future` via `fail`, check at compile time if possible and at runtime if not that the exception matches constraints * fix `waitFor` comments * move async raises to separate module, implement `or`
This commit is contained in:
parent
e3c5a86a14
commit
f56d286687
30
README.md
30
README.md
@ -222,8 +222,8 @@ proc p1(): Future[void] {.async, asyncraises: [IOError].} =
|
||||
raise newException(IOError, "works") # Or any child of IOError
|
||||
```
|
||||
|
||||
Under the hood, the return type of `p1` will be rewritten to another type,
|
||||
which will convey raises informations to await.
|
||||
Under the hood, the return type of `p1` will be rewritten to an internal type,
|
||||
which will convey raises informations to `await`.
|
||||
|
||||
```nim
|
||||
proc p2(): Future[void] {.async, asyncraises: [IOError].} =
|
||||
@ -231,8 +231,10 @@ proc p2(): Future[void] {.async, asyncraises: [IOError].} =
|
||||
# can only raise IOError
|
||||
```
|
||||
|
||||
The hidden type (`RaiseTrackingFuture`) is implicitely convertible into a Future.
|
||||
However, it may causes issues when creating callback or methods
|
||||
Raw functions and callbacks that don't go through the `async` transformation but
|
||||
still return a `Future` and interact with the rest of the framework also need to
|
||||
be annotated with `asyncraises` to participate in the checked exception scheme:
|
||||
|
||||
```nim
|
||||
proc p3(): Future[void] {.async, asyncraises: [IOError].} =
|
||||
let fut: Future[void] = p1() # works
|
||||
@ -247,6 +249,24 @@ proc p3(): Future[void] {.async, asyncraises: [IOError].} =
|
||||
)
|
||||
```
|
||||
|
||||
When `chronos` performs the `async` transformation, all code is placed in a
|
||||
a special `try/except` clause that re-routes exception handling to the `Future`.
|
||||
|
||||
Beacuse of this re-routing, functions that return a `Future` instance manually
|
||||
never directly raise exceptions themselves - instead, exceptions are handled
|
||||
indirectly via `await` or `Future.read`. When writing raw async functions, they
|
||||
too must not raise exceptions - instead, they must store exceptions in the
|
||||
future they return:
|
||||
|
||||
```nim
|
||||
proc p4(): Future[void] {.asyncraises: [ValueError].} =
|
||||
let fut = newFuture[void]
|
||||
|
||||
# Equivalent of `raise (ref ValueError)()` in raw async functions:
|
||||
fut.fail((ref ValueError)(msg: "raising in raw async function"))
|
||||
fut
|
||||
```
|
||||
|
||||
### Platform independence
|
||||
|
||||
Several functions in `chronos` are backed by the operating system, such as
|
||||
@ -268,7 +288,7 @@ Because of this, the effect system thinks no exceptions are "leaking" because in
|
||||
fact, exception _handling_ is deferred to when the future is being read.
|
||||
|
||||
Effectively, this means that while code can be compiled with
|
||||
`{.push raises: [Defect]}`, the intended effect propagation and checking is
|
||||
`{.push raises: []}`, the intended effect propagation and checking is
|
||||
**disabled** for `async` functions.
|
||||
|
||||
To enable checking exception effects in `async` code, enable strict mode with
|
||||
|
@ -8,12 +8,16 @@
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[sequtils, macros]
|
||||
import stew/base10
|
||||
|
||||
import ./asyncengine
|
||||
import ./[asyncengine, raisesfutures]
|
||||
import ../[config, futures]
|
||||
|
||||
export raisesfutures.InternalRaisesFuture
|
||||
|
||||
when chronosStackTrace:
|
||||
import std/strutils
|
||||
when defined(nimHasStacktracesModule):
|
||||
@ -38,12 +42,6 @@ func `[]`*(loc: array[LocationKind, ptr SrcLoc], v: int): ptr SrcLoc {.
|
||||
else: raiseAssert("Unknown source location " & $v)
|
||||
|
||||
type
|
||||
InternalRaisesFuture*[T, E] = ref object of Future[T]
|
||||
## Future with a tuple of possible exception types
|
||||
## eg InternalRaisesFuture[void, (ValueError, OSError)]
|
||||
## Will be injected by `asyncraises`, should generally
|
||||
## not be used manually
|
||||
|
||||
FutureStr*[T] = ref object of Future[T]
|
||||
## Future to hold GC strings
|
||||
gcholder*: string
|
||||
@ -52,6 +50,8 @@ type
|
||||
## Future to hold GC seqs
|
||||
gcholder*: seq[B]
|
||||
|
||||
SomeFuture = Future|InternalRaisesFuture
|
||||
|
||||
# Backwards compatibility for old FutureState name
|
||||
template Finished* {.deprecated: "Use Completed instead".} = Completed
|
||||
template Finished*(T: type FutureState): FutureState {.
|
||||
@ -68,11 +68,18 @@ proc newFutureImpl[T](loc: ptr SrcLoc, flags: FutureFlags): Future[T] =
|
||||
internalInitFutureBase(fut, loc, FutureState.Pending, flags)
|
||||
fut
|
||||
|
||||
proc newInternalRaisesFutureImpl[T, E](loc: ptr SrcLoc): InternalRaisesFuture[T, E] =
|
||||
proc newInternalRaisesFutureImpl[T, E](
|
||||
loc: ptr SrcLoc): InternalRaisesFuture[T, E] =
|
||||
let fut = InternalRaisesFuture[T, E]()
|
||||
internalInitFutureBase(fut, loc, FutureState.Pending, {})
|
||||
fut
|
||||
|
||||
proc newInternalRaisesFutureImpl[T, E](
|
||||
loc: ptr SrcLoc, flags: FutureFlags): InternalRaisesFuture[T, E] =
|
||||
let fut = InternalRaisesFuture[T, E]()
|
||||
internalInitFutureBase(fut, loc, FutureState.Pending, flags)
|
||||
fut
|
||||
|
||||
proc newFutureSeqImpl[A, B](loc: ptr SrcLoc): FutureSeq[A, B] =
|
||||
let fut = FutureSeq[A, B]()
|
||||
internalInitFutureBase(fut, loc, FutureState.Pending, {})
|
||||
@ -90,7 +97,8 @@ template newFuture*[T](fromProc: static[string] = "",
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
when declared(InternalRaisesFutureRaises): # injected by `asyncraises`
|
||||
newInternalRaisesFutureImpl[T, InternalRaisesFutureRaises](getSrcLocation(fromProc))
|
||||
newInternalRaisesFutureImpl[T, InternalRaisesFutureRaises](
|
||||
getSrcLocation(fromProc), flags)
|
||||
else:
|
||||
newFutureImpl[T](getSrcLocation(fromProc), flags)
|
||||
|
||||
@ -214,53 +222,11 @@ proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
|
||||
getStackTrace(error)
|
||||
future.finish(FutureState.Failed)
|
||||
|
||||
template fail*(future: FutureBase, error: ref CatchableError) =
|
||||
template fail*(
|
||||
future: FutureBase, error: ref CatchableError, warn: static bool = false) =
|
||||
## Completes ``future`` with ``error``.
|
||||
fail(future, error, getSrcLocation())
|
||||
|
||||
macro checkFailureType(future, error: typed): untyped =
|
||||
let e = getTypeInst(future)[2]
|
||||
let types = getType(e)
|
||||
|
||||
if types.eqIdent("void"):
|
||||
error("Can't raise exceptions on this Future")
|
||||
|
||||
expectKind(types, nnkBracketExpr)
|
||||
expectKind(types[0], nnkSym)
|
||||
assert types[0].strVal == "tuple"
|
||||
assert types.len > 1
|
||||
|
||||
expectKind(getTypeInst(error), nnkRefTy)
|
||||
let toMatch = getTypeInst(error)[0]
|
||||
|
||||
# Can't find a way to check `is` in the macro. (sameType doesn't
|
||||
# work for inherited objects). Dirty hack here, for [IOError, OSError],
|
||||
# this will generate:
|
||||
#
|
||||
# static:
|
||||
# if not((`toMatch` is IOError) or (`toMatch` is OSError)
|
||||
# or (`toMatch` is CancelledError) or false):
|
||||
# raiseAssert("Can't fail with `toMatch`, only [IOError, OSError] is allowed")
|
||||
var typeChecker = ident"false"
|
||||
|
||||
for errorType in types[1..^1]:
|
||||
typeChecker = newCall("or", typeChecker, newCall("is", toMatch, errorType))
|
||||
typeChecker = newCall(
|
||||
"or", typeChecker,
|
||||
newCall("is", toMatch, ident"CancelledError"))
|
||||
|
||||
let errorMsg = "Can't fail with " & repr(toMatch) & ". Only " & repr(types[1..^1]) & " allowed"
|
||||
|
||||
result = nnkStaticStmt.newNimNode(lineInfoFrom=error).add(
|
||||
quote do:
|
||||
if not(`typeChecker`):
|
||||
raiseAssert(`errorMsg`)
|
||||
)
|
||||
|
||||
template fail*[T, E](future: InternalRaisesFuture[T, E], error: ref CatchableError) =
|
||||
checkFailureType(future, error)
|
||||
fail(future, error, getSrcLocation())
|
||||
|
||||
template newCancelledError(): ref CancelledError =
|
||||
(ref CancelledError)(msg: "Future operation cancelled!")
|
||||
|
||||
@ -572,29 +538,6 @@ proc read*(future: Future[void] ) {.raises: [CatchableError].} =
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc read*[T: not void, E](future: InternalRaisesFuture[T, E] ): lent T =
|
||||
## Retrieves the value of ``future``. Future must be finished otherwise
|
||||
## this function will fail with a ``ValueError`` exception.
|
||||
##
|
||||
## If the result of the future is an error then that error will be raised.
|
||||
if not future.finished():
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
internalCheckComplete(future)
|
||||
future.internalValue
|
||||
|
||||
proc read*[E](future: InternalRaisesFuture[void, E]) =
|
||||
## Retrieves the value of ``future``. Future must be finished otherwise
|
||||
## this function will fail with a ``ValueError`` exception.
|
||||
##
|
||||
## If the result of the future is an error then that error will be raised.
|
||||
if future.finished():
|
||||
internalCheckComplete(future)
|
||||
else:
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc readError*(future: FutureBase): ref CatchableError {.raises: [ValueError].} =
|
||||
## Retrieves the exception stored in ``future``.
|
||||
##
|
||||
@ -621,8 +564,9 @@ template taskCancelMessage(future: FutureBase): string =
|
||||
"Asynchronous task " & taskFutureLocation(future) & " was cancelled!"
|
||||
|
||||
proc waitFor*[T](fut: Future[T]): T {.raises: [CatchableError].} =
|
||||
## **Blocks** the current thread until the specified future completes.
|
||||
## There's no way to tell if poll or read raised the exception
|
||||
## **Blocks** the current thread until the specified future finishes and
|
||||
## reads it, potentially raising an exception if the future failed or was
|
||||
## cancelled.
|
||||
while not(fut.finished()):
|
||||
poll()
|
||||
|
||||
@ -716,6 +660,47 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
template orImpl*[T, Y](fut1: Future[T], fut2: Future[Y]): untyped =
|
||||
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
|
||||
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
var fut = cast[FutureBase](udata)
|
||||
if cast[pointer](fut1) == udata:
|
||||
fut2.removeCallback(cb)
|
||||
else:
|
||||
fut1.removeCallback(cb)
|
||||
if fut.failed():
|
||||
retFuture.fail(fut.error, warn = false)
|
||||
else:
|
||||
retFuture.complete()
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
# On cancel we remove all our callbacks only.
|
||||
if not(fut1.finished()):
|
||||
fut1.removeCallback(cb)
|
||||
if not(fut2.finished()):
|
||||
fut2.removeCallback(cb)
|
||||
|
||||
if fut1.finished():
|
||||
if fut1.failed():
|
||||
retFuture.fail(fut1.error, warn = false)
|
||||
else:
|
||||
retFuture.complete()
|
||||
return retFuture
|
||||
|
||||
if fut2.finished():
|
||||
if fut2.failed():
|
||||
retFuture.fail(fut2.error, warn = false)
|
||||
else:
|
||||
retFuture.complete()
|
||||
return retFuture
|
||||
|
||||
fut1.addCallback(cb)
|
||||
fut2.addCallback(cb)
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
## Returns a future which will complete once either ``fut1`` or ``fut2``
|
||||
## finish.
|
||||
@ -730,45 +715,8 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
##
|
||||
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[void]("chronos.or")
|
||||
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
|
||||
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
var fut = cast[FutureBase](udata)
|
||||
if cast[pointer](fut1) == udata:
|
||||
fut2.removeCallback(cb)
|
||||
else:
|
||||
fut1.removeCallback(cb)
|
||||
if fut.failed():
|
||||
retFuture.fail(fut.error)
|
||||
else:
|
||||
retFuture.complete()
|
||||
orImpl(fut1, fut2)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
# On cancel we remove all our callbacks only.
|
||||
if not(fut1.finished()):
|
||||
fut1.removeCallback(cb)
|
||||
if not(fut2.finished()):
|
||||
fut2.removeCallback(cb)
|
||||
|
||||
if fut1.finished():
|
||||
if fut1.failed():
|
||||
retFuture.fail(fut1.error)
|
||||
else:
|
||||
retFuture.complete()
|
||||
return retFuture
|
||||
|
||||
if fut2.finished():
|
||||
if fut2.failed():
|
||||
retFuture.fail(fut2.error)
|
||||
else:
|
||||
retFuture.complete()
|
||||
return retFuture
|
||||
|
||||
fut1.addCallback(cb)
|
||||
fut2.addCallback(cb)
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
proc all*[T](futs: varargs[Future[T]]): auto {.
|
||||
deprecated: "Use allFutures(varargs[Future[T]])".} =
|
||||
@ -908,7 +856,7 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {.
|
||||
return retFuture
|
||||
|
||||
proc cancelSoon(future: FutureBase, aftercb: CallbackFunc, udata: pointer,
|
||||
loc: ptr SrcLoc) =
|
||||
loc: ptr SrcLoc) {.raises: [].} =
|
||||
## Perform cancellation ``future`` and call ``aftercb`` callback when
|
||||
## ``future`` become finished (completed with value, failed or cancelled).
|
||||
##
|
||||
@ -965,7 +913,8 @@ template cancel*(future: FutureBase) {.
|
||||
## Cancel ``future``.
|
||||
cancelSoon(future, nil, nil, getSrcLocation())
|
||||
|
||||
proc cancelAndWait*(future: FutureBase, loc: ptr SrcLoc): Future[void] =
|
||||
proc cancelAndWait*(future: FutureBase, loc: ptr SrcLoc): Future[void] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Perform cancellation ``future`` return Future which will be completed when
|
||||
## ``future`` become finished (completed with value, failed or cancelled).
|
||||
##
|
||||
@ -989,7 +938,7 @@ template cancelAndWait*(future: FutureBase): Future[void] =
|
||||
## Cancel ``future``.
|
||||
cancelAndWait(future, getSrcLocation())
|
||||
|
||||
proc noCancel*[T](future: Future[T]): Future[T] =
|
||||
proc noCancel*[F: SomeFuture](future: F): auto = # asyncraises: asyncraiseOf(future) - CancelledError
|
||||
## Prevent cancellation requests from propagating to ``future`` while
|
||||
## forwarding its value or error when it finishes.
|
||||
##
|
||||
@ -997,16 +946,25 @@ proc noCancel*[T](future: Future[T]): Future[T] =
|
||||
## should not be cancelled at all cost, for example closing sockets, pipes,
|
||||
## connections or servers. Usually it become useful in exception or finally
|
||||
## blocks.
|
||||
let retFuture = newFuture[T]("chronos.noCancel(T)",
|
||||
{FutureFlag.OwnCancelSchedule})
|
||||
when F is InternalRaisesFuture:
|
||||
type
|
||||
E = F.E
|
||||
InternalRaisesFutureRaises = E.remove(CancelledError)
|
||||
|
||||
let retFuture = newFuture[F.T]("chronos.noCancel(T)",
|
||||
{FutureFlag.OwnCancelSchedule})
|
||||
template completeFuture() =
|
||||
if future.completed():
|
||||
when T is void:
|
||||
when F.T is void:
|
||||
retFuture.complete()
|
||||
else:
|
||||
retFuture.complete(future.value)
|
||||
elif future.failed():
|
||||
retFuture.fail(future.error)
|
||||
when F is Future:
|
||||
retFuture.fail(future.error, warn = false)
|
||||
when declared(InternalRaisesFutureRaises):
|
||||
when InternalRaisesFutureRaises isnot void:
|
||||
retFuture.fail(future.error, warn = false)
|
||||
else:
|
||||
raiseAssert("Unexpected future state [" & $future.state & "]")
|
||||
|
||||
@ -1019,7 +977,8 @@ proc noCancel*[T](future: Future[T]): Future[T] =
|
||||
future.addCallback(continuation)
|
||||
retFuture
|
||||
|
||||
proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
||||
proc allFutures*(futs: varargs[FutureBase]): Future[void] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Returns a future which will complete only when all futures in ``futs``
|
||||
## will be completed, failed or canceled.
|
||||
##
|
||||
@ -1057,7 +1016,8 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
||||
|
||||
retFuture
|
||||
|
||||
proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
|
||||
proc allFutures*[T](futs: varargs[Future[T]]): Future[void] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Returns a future which will complete only when all futures in ``futs``
|
||||
## will be completed, failed or canceled.
|
||||
##
|
||||
@ -1070,7 +1030,8 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
|
||||
nfuts.add(future)
|
||||
allFutures(nfuts)
|
||||
|
||||
proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
||||
proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Returns a future which will complete only when all futures in ``futs``
|
||||
## will be completed, failed or canceled.
|
||||
##
|
||||
@ -1080,7 +1041,7 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
||||
## If the argument is empty, the returned future COMPLETES immediately.
|
||||
##
|
||||
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[seq[Future[T]]]("chronos.allFinished()")
|
||||
var retFuture = newFuture[seq[F]]("chronos.allFinished()")
|
||||
let totalFutures = len(futs)
|
||||
var finishedFutures = 0
|
||||
|
||||
@ -1110,7 +1071,8 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
||||
|
||||
return retFuture
|
||||
|
||||
proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
||||
proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
|
||||
asyncraises: [ValueError, CancelledError].} =
|
||||
## Returns a future which will complete and return completed Future[T] inside,
|
||||
## when one of the futures in ``futs`` will be completed, failed or canceled.
|
||||
##
|
||||
@ -1119,7 +1081,7 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
||||
## On success returned Future will hold finished Future[T].
|
||||
##
|
||||
## On cancel futures in ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[Future[T]]("chronos.one()")
|
||||
var retFuture = newFuture[F]("chronos.one()")
|
||||
|
||||
if len(futs) == 0:
|
||||
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||
@ -1137,7 +1099,7 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
||||
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
|
||||
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
var res: Future[T]
|
||||
var res: F
|
||||
var rfut = cast[FutureBase](udata)
|
||||
for i in 0..<len(nfuts):
|
||||
if cast[FutureBase](nfuts[i]) != rfut:
|
||||
@ -1158,7 +1120,8 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
|
||||
proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Returns a future which will complete and return completed FutureBase,
|
||||
## when one of the futures in ``futs`` will be completed, failed or canceled.
|
||||
##
|
||||
@ -1210,7 +1173,7 @@ proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
|
||||
when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows):
|
||||
import std/os
|
||||
|
||||
proc waitSignal*(signal: int): Future[void] {.raises: [].} =
|
||||
proc waitSignal*(signal: int): Future[void] {.asyncraises: [AsyncError, CancelledError].} =
|
||||
var retFuture = newFuture[void]("chronos.waitSignal()")
|
||||
var signalHandle: Opt[SignalHandle]
|
||||
|
||||
@ -1244,7 +1207,8 @@ when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows):
|
||||
retFuture.cancelCallback = cancellation
|
||||
retFuture
|
||||
|
||||
proc sleepAsync*(duration: Duration): Future[void] =
|
||||
proc sleepAsync*(duration: Duration): Future[void] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Suspends the execution of the current async procedure for the next
|
||||
## ``duration`` time.
|
||||
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
|
||||
@ -1264,10 +1228,10 @@ proc sleepAsync*(duration: Duration): Future[void] =
|
||||
return retFuture
|
||||
|
||||
proc sleepAsync*(ms: int): Future[void] {.
|
||||
inline, deprecated: "Use sleepAsync(Duration)".} =
|
||||
inline, deprecated: "Use sleepAsync(Duration)", asyncraises: [CancelledError].} =
|
||||
result = sleepAsync(ms.milliseconds())
|
||||
|
||||
proc stepsAsync*(number: int): Future[void] =
|
||||
proc stepsAsync*(number: int): Future[void] {.asyncraises: [CancelledError].} =
|
||||
## Suspends the execution of the current async procedure for the next
|
||||
## ``number`` of asynchronous steps (``poll()`` calls).
|
||||
##
|
||||
@ -1294,7 +1258,7 @@ proc stepsAsync*(number: int): Future[void] =
|
||||
|
||||
retFuture
|
||||
|
||||
proc idleAsync*(): Future[void] =
|
||||
proc idleAsync*(): Future[void] {.asyncraises: [CancelledError].} =
|
||||
## Suspends the execution of the current asynchronous task until "idle" time.
|
||||
##
|
||||
## "idle" time its moment of time, when no network events were processed by
|
||||
@ -1312,7 +1276,8 @@ proc idleAsync*(): Future[void] =
|
||||
callIdle(continuation, nil)
|
||||
retFuture
|
||||
|
||||
proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
|
||||
proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
|
||||
asyncraises: [CancelledError].} =
|
||||
## Returns a future which will complete once ``fut`` completes or after
|
||||
## ``timeout`` milliseconds has elapsed.
|
||||
##
|
||||
@ -1380,28 +1345,19 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
|
||||
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
|
||||
withTimeout(fut, timeout.milliseconds())
|
||||
|
||||
proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
## Returns a future which will complete once future ``fut`` completes
|
||||
## or if timeout of ``timeout`` milliseconds has been expired.
|
||||
##
|
||||
## If ``timeout`` is ``-1``, then statement ``await wait(fut)`` is
|
||||
## equal to ``await fut``.
|
||||
##
|
||||
## TODO: In case when ``fut`` got cancelled, what result Future[T]
|
||||
## should return, because it can't be cancelled too.
|
||||
proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
|
||||
var
|
||||
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
|
||||
moment: Moment
|
||||
timer: TimerCallback
|
||||
timeouted = false
|
||||
|
||||
template completeFuture(fut: untyped): untyped =
|
||||
if fut.failed():
|
||||
retFuture.fail(fut.error)
|
||||
retFuture.fail(fut.error(), warn = false)
|
||||
elif fut.cancelled():
|
||||
retFuture.cancelAndSchedule()
|
||||
else:
|
||||
when T is void:
|
||||
when type(fut).T is void:
|
||||
retFuture.complete()
|
||||
else:
|
||||
retFuture.complete(fut.value)
|
||||
@ -1446,6 +1402,20 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
|
||||
retFuture
|
||||
|
||||
proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
## Returns a future which will complete once future ``fut`` completes
|
||||
## or if timeout of ``timeout`` milliseconds has been expired.
|
||||
##
|
||||
## If ``timeout`` is ``-1``, then statement ``await wait(fut)`` is
|
||||
## equal to ``await fut``.
|
||||
##
|
||||
## TODO: In case when ``fut`` got cancelled, what result Future[T]
|
||||
## should return, because it can't be cancelled too.
|
||||
var
|
||||
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
|
||||
|
||||
waitImpl(fut, retFuture, timeout)
|
||||
|
||||
proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
|
||||
inline, deprecated: "Use wait(Future[T], Duration)".} =
|
||||
if timeout == -1:
|
||||
@ -1455,7 +1425,6 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
|
||||
else:
|
||||
wait(fut, timeout.milliseconds())
|
||||
|
||||
|
||||
when defined(windows):
|
||||
import ../osdefs
|
||||
|
||||
@ -1515,3 +1484,64 @@ when defined(windows):
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
{.pop.} # Automatically deduced raises from here onwards
|
||||
|
||||
template fail*[T, E](
|
||||
future: InternalRaisesFuture[T, E], error: ref CatchableError,
|
||||
warn: static bool = true) =
|
||||
checkRaises(future, error, warn)
|
||||
fail(future, error, getSrcLocation())
|
||||
|
||||
proc waitFor*[T, E](fut: InternalRaisesFuture[T, E]): T = # {.raises: [E]}
|
||||
## **Blocks** the current thread until the specified future finishes and
|
||||
## reads it, potentially raising an exception if the future failed or was
|
||||
## cancelled.
|
||||
while not(fut.finished()):
|
||||
poll()
|
||||
|
||||
fut.read()
|
||||
|
||||
proc read*[T: not void, E](future: InternalRaisesFuture[T, E]): lent T = # {.raises: [E, ValueError].}
|
||||
## Retrieves the value of ``future``. Future must be finished otherwise
|
||||
## this function will fail with a ``ValueError`` exception.
|
||||
##
|
||||
## If the result of the future is an error then that error will be raised.
|
||||
if not future.finished():
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
internalCheckComplete(future)
|
||||
future.internalValue
|
||||
|
||||
proc read*[E](future: InternalRaisesFuture[void, E]) = # {.raises: [E, CancelledError].}
|
||||
## Retrieves the value of ``future``. Future must be finished otherwise
|
||||
## this function will fail with a ``ValueError`` exception.
|
||||
##
|
||||
## If the result of the future is an error then that error will be raised.
|
||||
if future.finished():
|
||||
internalCheckComplete(future)
|
||||
else:
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc `or`*[T, Y, E1, E2](
|
||||
fut1: InternalRaisesFuture[T, E1],
|
||||
fut2: InternalRaisesFuture[Y, E2]): auto =
|
||||
type
|
||||
InternalRaisesFutureRaises = union(E1, E2)
|
||||
|
||||
let
|
||||
retFuture = newFuture[void]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
|
||||
orImpl(fut1, fut2)
|
||||
|
||||
proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
|
||||
type
|
||||
T = type(fut).T
|
||||
E = type(fut).E
|
||||
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)
|
||||
|
||||
let
|
||||
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
|
||||
|
||||
waitImpl(fut, retFuture, timeout)
|
||||
|
@ -283,12 +283,17 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||
prc.addPragma(newIdentNode("gcsafe"))
|
||||
|
||||
if isAsync == false: # `asyncraises` without `async`
|
||||
# type InternalRaisesFutureRaises = `raisesTuple`
|
||||
# type InternalRaisesFutureRaises {.used.} = `raisesTuple`
|
||||
# `body`
|
||||
prc.body = nnkStmtList.newTree(
|
||||
nnkTypeSection.newTree(
|
||||
nnkTypeDef.newTree(
|
||||
ident"InternalRaisesFutureRaises",
|
||||
nnkPragmaExpr.newTree(
|
||||
ident"InternalRaisesFutureRaises",
|
||||
nnkPragma.newTree(
|
||||
newIdentNode("used")
|
||||
)
|
||||
),
|
||||
newEmptyNode(),
|
||||
raisesTuple
|
||||
)
|
||||
|
124
chronos/internal/raisesfutures.nim
Normal file
124
chronos/internal/raisesfutures.nim
Normal file
@ -0,0 +1,124 @@
|
||||
import
|
||||
std/macros,
|
||||
../futures
|
||||
|
||||
type
|
||||
InternalRaisesFuture*[T, E] = ref object of Future[T]
|
||||
## Future with a tuple of possible exception types
|
||||
## eg InternalRaisesFuture[void, (ValueError, OSError)]
|
||||
##
|
||||
## This type gets injected by `asyncraises` and similar utilities and
|
||||
## should not be used manually as the internal exception representation is
|
||||
## subject to change in future chronos versions.
|
||||
|
||||
iterator members(tup: NimNode): NimNode =
|
||||
# Given a typedesc[tuple] = (A, B, C), yields the tuple members (A, B C)
|
||||
if not tup.eqIdent("void"):
|
||||
for n in getType(getTypeInst(tup)[1])[1..^1]:
|
||||
yield n
|
||||
|
||||
proc members(tup: NimNode): seq[NimNode] {.compileTime.} =
|
||||
for t in tup.members():
|
||||
result.add(t)
|
||||
|
||||
proc containsSignature(members: openArray[NimNode], typ: NimNode): bool {.compileTime.} =
|
||||
let typHash = signatureHash(typ)
|
||||
|
||||
for err in members:
|
||||
if signatureHash(err) == typHash:
|
||||
return true
|
||||
false
|
||||
|
||||
# Utilities for working with the E part of InternalRaisesFuture - unstable
|
||||
macro prepend*(tup: typedesc[tuple], typs: varargs[typed]): typedesc =
|
||||
result = nnkTupleConstr.newTree()
|
||||
for err in typs:
|
||||
if not tup.members().containsSignature(err):
|
||||
result.add err
|
||||
|
||||
for err in tup.members():
|
||||
result.add err
|
||||
|
||||
if result.len == 0:
|
||||
result = ident"void"
|
||||
|
||||
macro remove*(tup: typedesc[tuple], typs: varargs[typed]): typedesc =
|
||||
result = nnkTupleConstr.newTree()
|
||||
for err in tup.members():
|
||||
if not typs[0..^1].containsSignature(err):
|
||||
result.add err
|
||||
|
||||
if result.len == 0:
|
||||
result = ident"void"
|
||||
|
||||
macro union*(tup0: typedesc[tuple], tup1: typedesc[tuple]): typedesc =
|
||||
## Join the types of the two tuples deduplicating the entries
|
||||
result = nnkTupleConstr.newTree()
|
||||
|
||||
for err in tup0.members():
|
||||
var found = false
|
||||
for err2 in tup1.members():
|
||||
if signatureHash(err) == signatureHash(err2):
|
||||
found = true
|
||||
if not found:
|
||||
result.add err
|
||||
|
||||
for err2 in getType(getTypeInst(tup1)[1])[1..^1]:
|
||||
result.add err2
|
||||
|
||||
proc getRaises*(future: NimNode): NimNode {.compileTime.} =
|
||||
# Given InternalRaisesFuture[T, (A, B, C)], returns (A, B, C)
|
||||
let types = getType(getTypeInst(future)[2])
|
||||
if types.eqIdent("void"):
|
||||
nnkBracketExpr.newTree(newEmptyNode())
|
||||
else:
|
||||
expectKind(types, nnkBracketExpr)
|
||||
expectKind(types[0], nnkSym)
|
||||
assert types[0].strVal == "tuple"
|
||||
assert types.len >= 1
|
||||
|
||||
types
|
||||
|
||||
macro checkRaises*[T: CatchableError](
|
||||
future: InternalRaisesFuture, error: ref T, warn: static bool = true): untyped =
|
||||
## Generate code that checks that the given error is compatible with the
|
||||
## raises restrictions of `future`.
|
||||
##
|
||||
## This check is done either at compile time or runtime depending on the
|
||||
## information available at compile time - in particular, if the raises
|
||||
## inherit from `error`, we end up with the equivalent of a downcast which
|
||||
## raises a Defect if it fails.
|
||||
let raises = getRaises(future)
|
||||
|
||||
expectKind(getTypeInst(error), nnkRefTy)
|
||||
let toMatch = getTypeInst(error)[0]
|
||||
|
||||
var
|
||||
typeChecker = ident"false"
|
||||
maybeChecker = ident"false"
|
||||
runtimeChecker = ident"false"
|
||||
|
||||
for errorType in raises[1..^1]:
|
||||
typeChecker = infix(typeChecker, "or", infix(toMatch, "is", errorType))
|
||||
maybeChecker = infix(maybeChecker, "or", infix(errorType, "is", toMatch))
|
||||
runtimeChecker = infix(
|
||||
runtimeChecker, "or",
|
||||
infix(error, "of", nnkBracketExpr.newTree(ident"typedesc", errorType)))
|
||||
|
||||
let
|
||||
errorMsg = "`fail`: `" & repr(toMatch) & "` incompatible with `asyncraises: " & repr(raises[1..^1]) & "`"
|
||||
warningMsg = "Can't verify `fail` exception type at compile time - expected one of " & repr(raises[1..^1]) & ", got `" & repr(toMatch) & "`"
|
||||
# A warning from this line means exception type will be verified at runtime
|
||||
warning = if warn:
|
||||
quote do: {.warning: `warningMsg`.}
|
||||
else: newEmptyNode()
|
||||
|
||||
# Cannot check inhertance in macro so we let `static` do the heavy lifting
|
||||
quote do:
|
||||
when not(`typeChecker`):
|
||||
when not(`maybeChecker`):
|
||||
static:
|
||||
{.error: `errorMsg`.}
|
||||
else:
|
||||
`warning`
|
||||
assert(`runtimeChecker`, `errorMsg`)
|
@ -477,3 +477,63 @@ suite "Exceptions tracking":
|
||||
proc test44 {.asyncraises: [ValueError], async.} = raise newException(ValueError, "hey")
|
||||
checkNotCompiles:
|
||||
proc test33 {.asyncraises: [IOError], async.} = raise newException(ValueError, "hey")
|
||||
|
||||
test "or errors":
|
||||
proc testit {.asyncraises: [ValueError], async.} =
|
||||
raise (ref ValueError)()
|
||||
|
||||
proc testit2 {.asyncraises: [IOError], async.} =
|
||||
raise (ref IOError)()
|
||||
|
||||
proc test {.async, asyncraises: [ValueError, IOError].} =
|
||||
await testit() or testit2()
|
||||
|
||||
proc noraises() {.raises: [].} =
|
||||
expect(ValueError):
|
||||
try:
|
||||
let f = test()
|
||||
waitFor(f)
|
||||
except IOError:
|
||||
doAssert false
|
||||
|
||||
noraises()
|
||||
|
||||
test "Wait errors":
|
||||
proc testit {.asyncraises: [ValueError], async.} = raise newException(ValueError, "hey")
|
||||
|
||||
proc test {.async, asyncraises: [ValueError, AsyncTimeoutError, CancelledError].} =
|
||||
await wait(testit(), 1000.milliseconds)
|
||||
|
||||
proc noraises() {.raises: [].} =
|
||||
try:
|
||||
expect(ValueError): waitFor(test())
|
||||
except CancelledError: doAssert false
|
||||
except AsyncTimeoutError: doAssert false
|
||||
|
||||
noraises()
|
||||
|
||||
test "Nocancel errors":
|
||||
proc testit {.asyncraises: [ValueError, CancelledError], async.} =
|
||||
await sleepAsync(5.milliseconds)
|
||||
raise (ref ValueError)()
|
||||
|
||||
proc test {.async, asyncraises: [ValueError].} =
|
||||
await noCancel testit()
|
||||
|
||||
proc noraises() {.raises: [].} =
|
||||
expect(ValueError):
|
||||
let f = test()
|
||||
waitFor(f.cancelAndWait())
|
||||
waitFor(f)
|
||||
|
||||
noraises()
|
||||
|
||||
test "Defect on wrong exception type at runtime":
|
||||
{.push warning[User]: off}
|
||||
let f = InternalRaisesFuture[void, (ValueError,)]()
|
||||
expect(Defect): f.fail((ref CatchableError)())
|
||||
{.pop.}
|
||||
check: not f.finished()
|
||||
|
||||
expect(Defect): f.fail((ref CatchableError)(), warn = false)
|
||||
check: not f.finished()
|
||||
|
Loading…
x
Reference in New Issue
Block a user