diff --git a/README.md b/README.md index 3c5cdf9..54a800f 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,49 @@ Exceptions inheriting from `Defect` are treated differently, being raised directly. Don't try to catch them coming out of `poll()`, because this would leave behind some zombie futures. +### Cancellation + +Calling `cancel()` on a future will set its state to `FutureState.Cancelled` +and the cancellation will propagate to all its children and all its parents, at +some point in the future. A cancelled future's callbacks are still scheduled for execution. + +```nim +proc p1() {.async.} = + await sleepAsync(100.seconds) # this sleep will also be cancelled + +proc p2() {.async.} = + await p1() + +let fut2 = p2() +fut2.cancel() +while not(fut2.finished()): + poll() + +echo "fut2.state = ", fut2.state # prints "Cancelled" +doAssert fut2.cancelled() == true +``` + +Sometimes you need to wait for a future to be cancelled (and all its callbacks +executed). To do this, you `await` a new future created by `cancelAndWait()` +which is guaranteed to complete after the cancellation processed is finished. + +```nim +proc p1() {.async.} = + await sleepAsync(100.seconds) # the sleep will also be cancelled + +proc p2() {.async.} = + let fut1 = p1() + await cancelAndWait(fut1) + doAssert fut1.cancelled() == true + +waitFor p2() +``` + +If you put an `await` in a `try` block, always catch `CatchableError` or some +other specific exception, in order to avoid catching by mistake +`CancelledError` (object of `Exception`, used internally to propagate +cancellation). + ## TODO * Pipe/Subprocess Transports. * Multithreading Stream/Datagram servers diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index ad94943..b55b041 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -65,13 +65,16 @@ type FutureError* = object of CatchableError - CancelledError* = object of FutureError - FutureList* = object head*: FutureBase tail*: FutureBase count*: int +{.push warning[InheritFromException]: off.} +# used internally; should not be caught by the API user +type CancelledError* = object of Exception +{.pop.} + var currentID* {.threadvar.}: int currentID = 0 @@ -340,12 +343,12 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc) = # `cancelAndSchedule()` on that parent, thus propagating the cancellation # up the chain. if not(isNil(future.child)): - cancel(future.child, getSrcLocation()) + cancel(future.child, loc) future.mustCancel = true else: if not(isNil(future.cancelcb)): future.cancelcb(cast[pointer](future)) - cancelAndSchedule(future, getSrcLocation()) + cancelAndSchedule(future, loc) template cancel*[T](future: Future[T]) = ## Cancel ``future``. @@ -767,16 +770,15 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] = ## ``await``s on another Future. # When `retFuture` completes, `fut` and all its children have been - # cancelled. If `fut` doesn't have any children, the `continuation()` callback - # runs immediately, without control getting back to the dispatcher. + # cancelled. var retFuture = newFuture[void]("chronos.cancelAndWait(T)") proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() fut.addCallback(continuation) - # Start the cancellation process. If `fut` has children, multiple event loop - # steps will be needed for it to complete. + # Start the cancellation process. One or more event loop steps will be needed + # for it to complete. fut.cancel() return retFuture diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 6fc194f..3268f36 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -295,8 +295,6 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: await readExactly(rstream.tsource, pbytes, nbytes) - except CancelledError: - raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except CatchableError as exc: @@ -335,8 +333,6 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: result = await readOnce(rstream.tsource, pbytes, nbytes) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -380,8 +376,6 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, if isNil(rstream.rsource): try: result = await readUntil(rstream.tsource, pbytes, nbytes, sep) - except CancelledError: - raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except TransportLimitError: @@ -447,8 +441,6 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, if isNil(rstream.rsource): try: result = await readLine(rstream.tsource, limit, sep) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -502,8 +494,6 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = if isNil(rstream.rsource): try: result = await read(rstream.tsource, n) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -552,8 +542,6 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = if isNil(rstream.rsource): try: result = await consume(rstream.tsource, n) - except CancelledError: - raise except TransportLimitError: raise newAsyncStreamLimitError() except CatchableError as exc: @@ -606,8 +594,6 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, var res: int try: res = await write(wstream.tsource, pbytes, nbytes) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != nbytes: @@ -623,9 +609,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, await wstream.queue.put(item) try: await item.future - except CancelledError: - raise - except: + except CatchableError: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], @@ -649,8 +633,6 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], var res: int try: res = await write(wstream.tsource, sbytes, msglen) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -669,9 +651,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], await wstream.queue.put(item) try: await item.future - except CancelledError: - raise - except: + except CatchableError: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: string, @@ -694,8 +674,6 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, var res: int try: res = await write(wstream.tsource, sbytes, msglen) - except CancelledError: - raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -714,9 +692,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, await wstream.queue.put(item) try: await item.future - except CancelledError: - raise - except: + except CatchableError: raise newAsyncStreamWriteError(item.future.error) proc finish*(wstream: AsyncStreamWriter) {.async.} = @@ -734,9 +710,7 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = await wstream.queue.put(item) try: await item.future - except CancelledError: - raise - except: + except CatchableError: raise newAsyncStreamWriteError(item.future.error) proc join*(rw: AsyncStreamRW): Future[void] =