diff --git a/README.md b/README.md index 54a800f7..3c5cdf9a 100644 --- a/README.md +++ b/README.md @@ -199,49 +199,6 @@ Exceptions inheriting from `Defect` are treated differently, being raised directly. Don't try to catch them coming out of `poll()`, because this would leave behind some zombie futures. -### Cancellation - -Calling `cancel()` on a future will set its state to `FutureState.Cancelled` -and the cancellation will propagate to all its children and all its parents, at -some point in the future. A cancelled future's callbacks are still scheduled for execution. - -```nim -proc p1() {.async.} = - await sleepAsync(100.seconds) # this sleep will also be cancelled - -proc p2() {.async.} = - await p1() - -let fut2 = p2() -fut2.cancel() -while not(fut2.finished()): - poll() - -echo "fut2.state = ", fut2.state # prints "Cancelled" -doAssert fut2.cancelled() == true -``` - -Sometimes you need to wait for a future to be cancelled (and all its callbacks -executed). To do this, you `await` a new future created by `cancelAndWait()` -which is guaranteed to complete after the cancellation processed is finished. - -```nim -proc p1() {.async.} = - await sleepAsync(100.seconds) # the sleep will also be cancelled - -proc p2() {.async.} = - let fut1 = p1() - await cancelAndWait(fut1) - doAssert fut1.cancelled() == true - -waitFor p2() -``` - -If you put an `await` in a `try` block, always catch `CatchableError` or some -other specific exception, in order to avoid catching by mistake -`CancelledError` (object of `Exception`, used internally to propagate -cancellation). - ## TODO * Pipe/Subprocess Transports. * Multithreading Stream/Datagram servers diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index b55b0416..ad949437 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -65,16 +65,13 @@ type FutureError* = object of CatchableError + CancelledError* = object of FutureError + FutureList* = object head*: FutureBase tail*: FutureBase count*: int -{.push warning[InheritFromException]: off.} -# used internally; should not be caught by the API user -type CancelledError* = object of Exception -{.pop.} - var currentID* {.threadvar.}: int currentID = 0 @@ -343,12 +340,12 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc) = # `cancelAndSchedule()` on that parent, thus propagating the cancellation # up the chain. if not(isNil(future.child)): - cancel(future.child, loc) + cancel(future.child, getSrcLocation()) future.mustCancel = true else: if not(isNil(future.cancelcb)): future.cancelcb(cast[pointer](future)) - cancelAndSchedule(future, loc) + cancelAndSchedule(future, getSrcLocation()) template cancel*[T](future: Future[T]) = ## Cancel ``future``. @@ -770,15 +767,16 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] = ## ``await``s on another Future. # When `retFuture` completes, `fut` and all its children have been - # cancelled. + # cancelled. If `fut` doesn't have any children, the `continuation()` callback + # runs immediately, without control getting back to the dispatcher. var retFuture = newFuture[void]("chronos.cancelAndWait(T)") proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() fut.addCallback(continuation) - # Start the cancellation process. One or more event loop steps will be needed - # for it to complete. + # Start the cancellation process. If `fut` has children, multiple event loop + # steps will be needed for it to complete. fut.cancel() return retFuture diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 3268f369..6fc194fa 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -295,6 +295,8 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: await readExactly(rstream.tsource, pbytes, nbytes) + except CancelledError: + raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except CatchableError as exc: @@ -333,6 +335,8 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: result = await readOnce(rstream.tsource, pbytes, nbytes) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -376,6 +380,8 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, if isNil(rstream.rsource): try: result = await readUntil(rstream.tsource, pbytes, nbytes, sep) + except CancelledError: + raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except TransportLimitError: @@ -441,6 +447,8 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, if isNil(rstream.rsource): try: result = await readLine(rstream.tsource, limit, sep) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -494,6 +502,8 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = if isNil(rstream.rsource): try: result = await read(rstream.tsource, n) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -542,6 +552,8 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = if isNil(rstream.rsource): try: result = await consume(rstream.tsource, n) + except CancelledError: + raise except TransportLimitError: raise newAsyncStreamLimitError() except CatchableError as exc: @@ -594,6 +606,8 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, var res: int try: res = await write(wstream.tsource, pbytes, nbytes) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != nbytes: @@ -609,7 +623,9 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], @@ -633,6 +649,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], var res: int try: res = await write(wstream.tsource, sbytes, msglen) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -651,7 +669,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: string, @@ -674,6 +694,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, var res: int try: res = await write(wstream.tsource, sbytes, msglen) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -692,7 +714,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc finish*(wstream: AsyncStreamWriter) {.async.} = @@ -710,7 +734,9 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc join*(rw: AsyncStreamRW): Future[void] =