more docs (#111)

* more docs

* make CancelledError and object of Exception
This commit is contained in:
Ștefan Talpalaru 2020-08-06 20:52:50 +02:00 committed by GitHub
parent e45ef32b5b
commit d0a17d551f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 38 deletions

View File

@ -199,6 +199,49 @@ Exceptions inheriting from `Defect` are treated differently, being raised
directly. Don't try to catch them coming out of `poll()`, because this would
leave behind some zombie futures.
### Cancellation
Calling `cancel()` on a future will set its state to `FutureState.Cancelled`
and the cancellation will propagate to all its children and all its parents, at
some point in the future. A cancelled future's callbacks are still scheduled for execution.
```nim
proc p1() {.async.} =
await sleepAsync(100.seconds) # this sleep will also be cancelled
proc p2() {.async.} =
await p1()
let fut2 = p2()
fut2.cancel()
while not(fut2.finished()):
poll()
echo "fut2.state = ", fut2.state # prints "Cancelled"
doAssert fut2.cancelled() == true
```
Sometimes you need to wait for a future to be cancelled (and all its callbacks
executed). To do this, you `await` a new future created by `cancelAndWait()`
which is guaranteed to complete after the cancellation processed is finished.
```nim
proc p1() {.async.} =
await sleepAsync(100.seconds) # the sleep will also be cancelled
proc p2() {.async.} =
let fut1 = p1()
await cancelAndWait(fut1)
doAssert fut1.cancelled() == true
waitFor p2()
```
If you put an `await` in a `try` block, always catch `CatchableError` or some
other specific exception, in order to avoid catching by mistake
`CancelledError` (object of `Exception`, used internally to propagate
cancellation).
## TODO
* Pipe/Subprocess Transports.
* Multithreading Stream/Datagram servers

View File

@ -65,13 +65,16 @@ type
FutureError* = object of CatchableError
CancelledError* = object of FutureError
FutureList* = object
head*: FutureBase
tail*: FutureBase
count*: int
{.push warning[InheritFromException]: off.}
# used internally; should not be caught by the API user
type CancelledError* = object of Exception
{.pop.}
var currentID* {.threadvar.}: int
currentID = 0
@ -340,12 +343,12 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc) =
# `cancelAndSchedule()` on that parent, thus propagating the cancellation
# up the chain.
if not(isNil(future.child)):
cancel(future.child, getSrcLocation())
cancel(future.child, loc)
future.mustCancel = true
else:
if not(isNil(future.cancelcb)):
future.cancelcb(cast[pointer](future))
cancelAndSchedule(future, getSrcLocation())
cancelAndSchedule(future, loc)
template cancel*[T](future: Future[T]) =
## Cancel ``future``.
@ -767,16 +770,15 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] =
## ``await``s on another Future.
# When `retFuture` completes, `fut` and all its children have been
# cancelled. If `fut` doesn't have any children, the `continuation()` callback
# runs immediately, without control getting back to the dispatcher.
# cancelled.
var retFuture = newFuture[void]("chronos.cancelAndWait(T)")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
fut.addCallback(continuation)
# Start the cancellation process. If `fut` has children, multiple event loop
# steps will be needed for it to complete.
# Start the cancellation process. One or more event loop steps will be needed
# for it to complete.
fut.cancel()
return retFuture

View File

@ -295,8 +295,6 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.rsource):
try:
await readExactly(rstream.tsource, pbytes, nbytes)
except CancelledError:
raise
except TransportIncompleteError:
raise newAsyncStreamIncompleteError()
except CatchableError as exc:
@ -335,8 +333,6 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.rsource):
try:
result = await readOnce(rstream.tsource, pbytes, nbytes)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamReadError(exc)
else:
@ -380,8 +376,6 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
if isNil(rstream.rsource):
try:
result = await readUntil(rstream.tsource, pbytes, nbytes, sep)
except CancelledError:
raise
except TransportIncompleteError:
raise newAsyncStreamIncompleteError()
except TransportLimitError:
@ -447,8 +441,6 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
if isNil(rstream.rsource):
try:
result = await readLine(rstream.tsource, limit, sep)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamReadError(exc)
else:
@ -502,8 +494,6 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} =
if isNil(rstream.rsource):
try:
result = await read(rstream.tsource, n)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamReadError(exc)
else:
@ -552,8 +542,6 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} =
if isNil(rstream.rsource):
try:
result = await consume(rstream.tsource, n)
except CancelledError:
raise
except TransportLimitError:
raise newAsyncStreamLimitError()
except CatchableError as exc:
@ -606,8 +594,6 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
var res: int
try:
res = await write(wstream.tsource, pbytes, nbytes)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
if res != nbytes:
@ -623,9 +609,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
await wstream.queue.put(item)
try:
await item.future
except CancelledError:
raise
except:
except CatchableError:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
@ -649,8 +633,6 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
var res: int
try:
res = await write(wstream.tsource, sbytes, msglen)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
if res != length:
@ -669,9 +651,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
await wstream.queue.put(item)
try:
await item.future
except CancelledError:
raise
except:
except CatchableError:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: string,
@ -694,8 +674,6 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string,
var res: int
try:
res = await write(wstream.tsource, sbytes, msglen)
except CancelledError:
raise
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
if res != length:
@ -714,9 +692,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string,
await wstream.queue.put(item)
try:
await item.future
except CancelledError:
raise
except:
except CatchableError:
raise newAsyncStreamWriteError(item.future.error)
proc finish*(wstream: AsyncStreamWriter) {.async.} =
@ -734,9 +710,7 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} =
await wstream.queue.put(item)
try:
await item.future
except CancelledError:
raise
except:
except CatchableError:
raise newAsyncStreamWriteError(item.future.error)
proc join*(rw: AsyncStreamRW): Future[void] =