From 826d48c4aacbe58402572fea76fa7a085b2c653b Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sun, 16 Aug 2020 01:43:44 +0300 Subject: [PATCH] Revert "more docs (#111)" This reverts commit d0a17d551ff9dec8213b07ce067ecd931dce1021. Moving CancelledError outside of the established Nim hierarchy is not a solution that has rough consensus and has an unknown impact on compatibility with otherwise correctly implemented cancellation code (for example when `CatchableError` is caught, cleanup is done, then the exception is reraised). Further, this breaks the established convention in the Nim community that Exception should not be inherited from, complicating compatibility with future Nim versions that may enforce this more strongly. --- README.md | 43 --------------------------------- chronos/asyncfutures2.nim | 18 ++++++-------- chronos/streams/asyncstream.nim | 34 +++++++++++++++++++++++--- 3 files changed, 38 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 54a800f7..3c5cdf9a 100644 --- a/README.md +++ b/README.md @@ -199,49 +199,6 @@ Exceptions inheriting from `Defect` are treated differently, being raised directly. Don't try to catch them coming out of `poll()`, because this would leave behind some zombie futures. -### Cancellation - -Calling `cancel()` on a future will set its state to `FutureState.Cancelled` -and the cancellation will propagate to all its children and all its parents, at -some point in the future. A cancelled future's callbacks are still scheduled for execution. - -```nim -proc p1() {.async.} = - await sleepAsync(100.seconds) # this sleep will also be cancelled - -proc p2() {.async.} = - await p1() - -let fut2 = p2() -fut2.cancel() -while not(fut2.finished()): - poll() - -echo "fut2.state = ", fut2.state # prints "Cancelled" -doAssert fut2.cancelled() == true -``` - -Sometimes you need to wait for a future to be cancelled (and all its callbacks -executed). To do this, you `await` a new future created by `cancelAndWait()` -which is guaranteed to complete after the cancellation processed is finished. - -```nim -proc p1() {.async.} = - await sleepAsync(100.seconds) # the sleep will also be cancelled - -proc p2() {.async.} = - let fut1 = p1() - await cancelAndWait(fut1) - doAssert fut1.cancelled() == true - -waitFor p2() -``` - -If you put an `await` in a `try` block, always catch `CatchableError` or some -other specific exception, in order to avoid catching by mistake -`CancelledError` (object of `Exception`, used internally to propagate -cancellation). - ## TODO * Pipe/Subprocess Transports. * Multithreading Stream/Datagram servers diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index b55b0416..ad949437 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -65,16 +65,13 @@ type FutureError* = object of CatchableError + CancelledError* = object of FutureError + FutureList* = object head*: FutureBase tail*: FutureBase count*: int -{.push warning[InheritFromException]: off.} -# used internally; should not be caught by the API user -type CancelledError* = object of Exception -{.pop.} - var currentID* {.threadvar.}: int currentID = 0 @@ -343,12 +340,12 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc) = # `cancelAndSchedule()` on that parent, thus propagating the cancellation # up the chain. if not(isNil(future.child)): - cancel(future.child, loc) + cancel(future.child, getSrcLocation()) future.mustCancel = true else: if not(isNil(future.cancelcb)): future.cancelcb(cast[pointer](future)) - cancelAndSchedule(future, loc) + cancelAndSchedule(future, getSrcLocation()) template cancel*[T](future: Future[T]) = ## Cancel ``future``. @@ -770,15 +767,16 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] = ## ``await``s on another Future. # When `retFuture` completes, `fut` and all its children have been - # cancelled. + # cancelled. If `fut` doesn't have any children, the `continuation()` callback + # runs immediately, without control getting back to the dispatcher. var retFuture = newFuture[void]("chronos.cancelAndWait(T)") proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() fut.addCallback(continuation) - # Start the cancellation process. One or more event loop steps will be needed - # for it to complete. + # Start the cancellation process. If `fut` has children, multiple event loop + # steps will be needed for it to complete. fut.cancel() return retFuture diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 3268f369..6fc194fa 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -295,6 +295,8 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: await readExactly(rstream.tsource, pbytes, nbytes) + except CancelledError: + raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except CatchableError as exc: @@ -333,6 +335,8 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.rsource): try: result = await readOnce(rstream.tsource, pbytes, nbytes) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -376,6 +380,8 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, if isNil(rstream.rsource): try: result = await readUntil(rstream.tsource, pbytes, nbytes, sep) + except CancelledError: + raise except TransportIncompleteError: raise newAsyncStreamIncompleteError() except TransportLimitError: @@ -441,6 +447,8 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, if isNil(rstream.rsource): try: result = await readLine(rstream.tsource, limit, sep) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -494,6 +502,8 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = if isNil(rstream.rsource): try: result = await read(rstream.tsource, n) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamReadError(exc) else: @@ -542,6 +552,8 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = if isNil(rstream.rsource): try: result = await consume(rstream.tsource, n) + except CancelledError: + raise except TransportLimitError: raise newAsyncStreamLimitError() except CatchableError as exc: @@ -594,6 +606,8 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, var res: int try: res = await write(wstream.tsource, pbytes, nbytes) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != nbytes: @@ -609,7 +623,9 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], @@ -633,6 +649,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], var res: int try: res = await write(wstream.tsource, sbytes, msglen) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -651,7 +669,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: string, @@ -674,6 +694,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, var res: int try: res = await write(wstream.tsource, sbytes, msglen) + except CancelledError: + raise except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: @@ -692,7 +714,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc finish*(wstream: AsyncStreamWriter) {.async.} = @@ -710,7 +734,9 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = await wstream.queue.put(item) try: await item.future - except CatchableError: + except CancelledError: + raise + except: raise newAsyncStreamWriteError(item.future.error) proc join*(rw: AsyncStreamRW): Future[void] =