From 2e8551b0d973cfbebfab3be7f3329e11b9049007 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 15 Sep 2023 19:38:39 +0300 Subject: [PATCH] Cancellation fixes and tests. (#445) * Add callTick and stream cancellation tests. * Fix stepsAsync() test. * Cancellation changes. * Update and add more cancellation tests. * Fix Posix shutdown call to handle ENOTCONN error. * With new changes to to cancellation its now possible. * Refactor testsoon.nim to not produce artifacts after tests are finished. * Debugging MacOS issue. * Adjust flaky test times. * Fix issue. * Add test for issue #334 which was also addressed in this PR. Avoid `break` in problematic test. * Add noCancelWait() call which prohibits cancellation. Fix closeWait() calls to use noCancelWait() predicate. Adding sleep to flaky MacOS test. * Remove all debugging echoes. * Fix cancelAndWait() which now could perform multiple attempts to cancel target Future (mustCancel behavior). * Fix issues revealed by switch to different cancelAndWait(). * Address review comments. * Fix testutils compilation warning. * Rename callTick() to internalCallTick(). * Add some documentation comments. * Disable flaky ratelimit test. * Rename noCancelWait() to noCancel(). Address review comments. --- chronos/apps/http/httpbodyrw.nim | 6 +- chronos/apps/http/httpclient.nim | 31 +- chronos/apps/http/httpserver.nim | 18 +- chronos/apps/http/shttpserver.nim | 17 +- chronos/asyncfutures2.nim | 201 ++++++++---- chronos/asyncloop.nim | 175 ++++++----- chronos/asyncmacro2.nim | 7 - chronos/asyncproc.nim | 2 +- chronos/asyncsync.nim | 12 +- chronos/futures.nim | 41 ++- chronos/ratelimit.nim | 4 +- chronos/streams/asyncstream.nim | 30 +- chronos/streams/tlsstream.nim | 18 +- chronos/transports/datagram.nim | 22 +- chronos/transports/stream.nim | 32 +- tests/testbugs.nim | 2 +- tests/testfut.nim | 496 +++++++++++++++++++++++++++++- tests/testhttpclient.nim | 113 +++++++ tests/testhttpserver.nim | 95 +++--- tests/testratelimit.nim | 19 +- tests/testsoon.nim | 146 ++++----- tests/teststream.nim | 95 +++++- tests/testsync.nim | 4 +- tests/testtime.nim | 24 +- tests/testutils.nim | 2 +- 25 files changed, 1243 insertions(+), 369 deletions(-) diff --git a/chronos/apps/http/httpbodyrw.nim b/chronos/apps/http/httpbodyrw.nim index b948fbd..bb28ea6 100644 --- a/chronos/apps/http/httpbodyrw.nim +++ b/chronos/apps/http/httpbodyrw.nim @@ -45,8 +45,8 @@ proc closeWait*(bstream: HttpBodyReader) {.async.} = # data from stream at position [1]. for index in countdown((len(bstream.streams) - 1), 0): res.add(bstream.streams[index].closeWait()) - await allFutures(res) - await procCall(closeWait(AsyncStreamReader(bstream))) + res.add(procCall(closeWait(AsyncStreamReader(bstream)))) + await noCancel(allFutures(res)) bstream.bstate = HttpState.Closed untrackCounter(HttpBodyReaderTrackerName) @@ -68,7 +68,7 @@ proc closeWait*(bstream: HttpBodyWriter) {.async.} = var res = newSeq[Future[void]]() for index in countdown(len(bstream.streams) - 1, 0): res.add(bstream.streams[index].closeWait()) - await allFutures(res) + await noCancel(allFutures(res)) await procCall(closeWait(AsyncStreamWriter(bstream))) bstream.bstate = HttpState.Closed untrackCounter(HttpBodyWriterTrackerName) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 1815d28..01e2bab 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -600,14 +600,12 @@ proc closeWait(conn: HttpClientConnectionRef) {.async.} = res.add(conn.reader.closeWait()) if not(isNil(conn.writer)) and not(conn.writer.closed()): res.add(conn.writer.closeWait()) + if conn.kind == HttpClientScheme.Secure: + res.add(conn.treader.closeWait()) + res.add(conn.twriter.closeWait()) + res.add(conn.transp.closeWait()) res - if len(pending) > 0: await allFutures(pending) - case conn.kind - of HttpClientScheme.Secure: - await allFutures(conn.treader.closeWait(), conn.twriter.closeWait()) - of HttpClientScheme.NonSecure: - discard - await conn.transp.closeWait() + if len(pending) > 0: await noCancel(allFutures(pending)) conn.state = HttpClientConnectionState.Closed untrackCounter(HttpClientConnectionTrackerName) @@ -631,8 +629,7 @@ proc connect(session: HttpSessionRef, let conn = block: let res = HttpClientConnectionRef.new(session, ha, transp) - case res.kind - of HttpClientScheme.Secure: + if res.kind == HttpClientScheme.Secure: try: await res.tls.handshake() res.state = HttpClientConnectionState.Ready @@ -647,7 +644,7 @@ proc connect(session: HttpSessionRef, await res.closeWait() res.state = HttpClientConnectionState.Error lastError = $exc.msg - of HttpClientScheme.Nonsecure: + else: res.state = HttpClientConnectionState.Ready res if conn.state == HttpClientConnectionState.Ready: @@ -785,7 +782,7 @@ proc closeWait*(session: HttpSessionRef) {.async.} = for connections in session.connections.values(): for conn in connections: pending.add(closeWait(conn)) - await allFutures(pending) + await noCancel(allFutures(pending)) proc sessionWatcher(session: HttpSessionRef) {.async.} = while true: @@ -830,26 +827,30 @@ proc sessionWatcher(session: HttpSessionRef) {.async.} = break proc closeWait*(request: HttpClientRequestRef) {.async.} = + var pending: seq[FutureBase] if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: request.state = HttpReqRespState.Closing if not(isNil(request.writer)): if not(request.writer.closed()): - await request.writer.closeWait() + pending.add(FutureBase(request.writer.closeWait())) request.writer = nil - await request.releaseConnection() + pending.add(FutureBase(request.releaseConnection())) + await noCancel(allFutures(pending)) request.session = nil request.error = nil request.state = HttpReqRespState.Closed untrackCounter(HttpClientRequestTrackerName) proc closeWait*(response: HttpClientResponseRef) {.async.} = + var pending: seq[FutureBase] if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: response.state = HttpReqRespState.Closing if not(isNil(response.reader)): if not(response.reader.closed()): - await response.reader.closeWait() + pending.add(FutureBase(response.reader.closeWait())) response.reader = nil - await response.releaseConnection() + pending.add(FutureBase(response.releaseConnection())) + await noCancel(allFutures(pending)) response.session = nil response.error = nil response.state = HttpReqRespState.Closed diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index eafa27c..f0788e2 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -809,10 +809,7 @@ proc closeUnsecureConnection(conn: HttpConnectionRef) {.async.} = pending.add(conn.mainReader.closeWait()) pending.add(conn.mainWriter.closeWait()) pending.add(conn.transp.closeWait()) - try: - await allFutures(pending) - except CancelledError: - await allFutures(pending) + await noCancel(allFutures(pending)) untrackCounter(HttpServerUnsecureConnectionTrackerName) reset(conn[]) conn.state = HttpState.Closed @@ -829,7 +826,7 @@ proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef, res proc gracefulCloseWait*(conn: HttpConnectionRef) {.async.} = - await conn.transp.shutdownWait() + await noCancel(conn.transp.shutdownWait()) await conn.closeCb(conn) proc closeWait*(conn: HttpConnectionRef): Future[void] = @@ -841,11 +838,7 @@ proc closeWait*(req: HttpRequestRef) {.async.} = req.state = HttpState.Closing let resp = req.response.get() if (HttpResponseFlags.Stream in resp.flags) and not(isNil(resp.writer)): - var writer = resp.writer.closeWait() - try: - await writer - except CancelledError: - await writer + await closeWait(resp.writer) reset(resp[]) untrackCounter(HttpServerRequestTrackerName) reset(req[]) @@ -1038,7 +1031,6 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} = except CatchableError as exc: raiseAssert "Unexpected error [" & $exc.name & "] happens: " & $exc.msg - server.connections.del(connectionId) case runLoop of HttpProcessExitType.KeepAlive: await connection.closeWait() @@ -1047,6 +1039,8 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} = of HttpProcessExitType.Graceful: await connection.gracefulCloseWait() + server.connections.del(connectionId) + proc acceptClientLoop(server: HttpServerRef) {.async.} = var runLoop = true while runLoop: @@ -1102,7 +1096,7 @@ proc drop*(server: HttpServerRef) {.async.} = for holder in server.connections.values(): if not(isNil(holder.future)) and not(holder.future.finished()): pending.add(holder.future.cancelAndWait()) - await allFutures(pending) + await noCancel(allFutures(pending)) server.connections.clear() proc closeWait*(server: HttpServerRef) {.async.} = diff --git a/chronos/apps/http/shttpserver.nim b/chronos/apps/http/shttpserver.nim index bc5c3fb..6d321a0 100644 --- a/chronos/apps/http/shttpserver.nim +++ b/chronos/apps/http/shttpserver.nim @@ -30,19 +30,10 @@ proc closeSecConnection(conn: HttpConnectionRef) {.async.} = var pending: seq[Future[void]] pending.add(conn.writer.closeWait()) pending.add(conn.reader.closeWait()) - try: - await allFutures(pending) - except CancelledError: - await allFutures(pending) - # After we going to close everything else. - pending.setLen(3) - pending[0] = conn.mainReader.closeWait() - pending[1] = conn.mainWriter.closeWait() - pending[2] = conn.transp.closeWait() - try: - await allFutures(pending) - except CancelledError: - await allFutures(pending) + pending.add(conn.mainReader.closeWait()) + pending.add(conn.mainWriter.closeWait()) + pending.add(conn.transp.closeWait()) + await noCancel(allFutures(pending)) reset(cast[SecureHttpConnectionRef](conn)[]) untrackCounter(HttpServerSecureConnectionTrackerName) conn.state = HttpState.Closed diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index d3954ba..ee6e8e0 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -12,6 +12,7 @@ import std/sequtils import stew/base10 when chronosStackTrace: + import std/strutils when defined(nimHasStacktracesModule): import system/stacktraces else: @@ -26,7 +27,8 @@ template LocFinishIndex*: auto {.deprecated: "LocationKind.Finish".} = template LocCompleteIndex*: untyped {.deprecated: "LocationKind.Finish".} = LocationKind.Finish -func `[]`*(loc: array[LocationKind, ptr SrcLoc], v: int): ptr SrcLoc {.deprecated: "use LocationKind".} = +func `[]`*(loc: array[LocationKind, ptr SrcLoc], v: int): ptr SrcLoc {. + deprecated: "use LocationKind".} = case v of 0: loc[LocationKind.Create] of 1: loc[LocationKind.Finish] @@ -43,29 +45,37 @@ type # Backwards compatibility for old FutureState name template Finished* {.deprecated: "Use Completed instead".} = Completed -template Finished*(T: type FutureState): FutureState {.deprecated: "Use FutureState.Completed instead".} = FutureState.Completed +template Finished*(T: type FutureState): FutureState {. + deprecated: "Use FutureState.Completed instead".} = + FutureState.Completed proc newFutureImpl[T](loc: ptr SrcLoc): Future[T] = let fut = Future[T]() - internalInitFutureBase(fut, loc, FutureState.Pending) + internalInitFutureBase(fut, loc, FutureState.Pending, {}) + fut + +proc newFutureImpl[T](loc: ptr SrcLoc, flags: FutureFlags): Future[T] = + let fut = Future[T]() + internalInitFutureBase(fut, loc, FutureState.Pending, flags) fut proc newFutureSeqImpl[A, B](loc: ptr SrcLoc): FutureSeq[A, B] = let fut = FutureSeq[A, B]() - internalInitFutureBase(fut, loc, FutureState.Pending) + internalInitFutureBase(fut, loc, FutureState.Pending, {}) fut proc newFutureStrImpl[T](loc: ptr SrcLoc): FutureStr[T] = let fut = FutureStr[T]() - internalInitFutureBase(fut, loc, FutureState.Pending) + internalInitFutureBase(fut, loc, FutureState.Pending, {}) fut -template newFuture*[T](fromProc: static[string] = ""): Future[T] = +template newFuture*[T](fromProc: static[string] = "", + flags: static[FutureFlags] = {}): Future[T] = ## Creates a new future. ## ## Specifying ``fromProc``, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. - newFutureImpl[T](getSrcLocation(fromProc)) + newFutureImpl[T](getSrcLocation(fromProc), flags) template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] = ## Create a new future which can hold/preserve GC sequence until future will @@ -132,8 +142,6 @@ proc finish(fut: FutureBase, state: FutureState) = # 1. `finish()` is a private procedure and `state` is under our control. # 2. `fut.state` is checked by `checkFinished()`. fut.internalState = state - when chronosStrictFutureAccess: - doAssert fut.internalCancelcb == nil or state != FutureState.Cancelled fut.internalCancelcb = nil # release cancellation callback memory for item in fut.internalCallbacks.mitems(): if not(isNil(item.function)): @@ -194,21 +202,23 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = template cancelAndSchedule*(future: FutureBase) = cancelAndSchedule(future, getSrcLocation()) -proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = - ## Request that Future ``future`` cancel itself. +proc tryCancel(future: FutureBase, loc: ptr SrcLoc): bool = + ## Perform an attempt to cancel ``future``. ## - ## This arranges for a `CancelledError` to be thrown into procedure which - ## waits for ``future`` on the next cycle through the event loop. - ## The procedure then has a chance to clean up or even deny the request - ## using `try/except/finally`. + ## NOTE: This procedure does not guarantee that cancellation will actually + ## happened. ## - ## This call do not guarantee that the ``future`` will be cancelled: the - ## exception might be caught and acted upon, delaying cancellation of the - ## ``future`` or preventing cancellation completely. The ``future`` may also - ## return value or raise different exception. + ## Cancellation is the process which starts from the last ``future`` + ## descendent and moves step by step to the parent ``future``. To initiate + ## this process procedure iterates through all non-finished ``future`` + ## descendents and tries to find the last one. If last descendent is still + ## pending it will become cancelled and process will be initiated. In such + ## case this procedure returns ``true``. ## - ## Immediately after this procedure is called, ``future.cancelled()`` will - ## not return ``true`` (unless the Future was already cancelled). + ## If last descendent future is not pending, this procedure will be unable to + ## initiate cancellation process and so it returns ``false``. + if future.cancelled(): + return true if future.finished(): return false @@ -217,23 +227,18 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = # mechanism and/or use a regular `addCallback` when chronosStrictFutureAccess: doAssert future.internalCancelcb.isNil, - "futures returned from `{.async.}` functions must not use `cancelCallback`" - - if cancel(future.internalChild, getSrcLocation()): - return true - + "futures returned from `{.async.}` functions must not use " & + "`cancelCallback`" + tryCancel(future.internalChild, loc) else: if not(isNil(future.internalCancelcb)): future.internalCancelcb(cast[pointer](future)) - future.internalCancelcb = nil - cancelAndSchedule(future, getSrcLocation()) + if FutureFlag.OwnCancelSchedule notin future.internalFlags: + cancelAndSchedule(future, loc) + future.cancelled() - future.internalMustCancel = true - return true - -template cancel*(future: FutureBase) = - ## Cancel ``future``. - discard cancel(future, getSrcLocation()) +template tryCancel*(future: FutureBase): bool = + tryCancel(future, getSrcLocation()) proc clearCallbacks(future: FutureBase) = future.internalCallbacks = default(seq[AsyncCallback]) @@ -778,27 +783,117 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {. return retFuture -proc cancelAndWait*(fut: FutureBase): Future[void] = - ## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is - ## done e.g. changes its state (become completed, failed or cancelled). +proc cancelSoon(future: FutureBase, aftercb: CallbackFunc, udata: pointer, + loc: ptr SrcLoc) = + ## Perform cancellation ``future`` and call ``aftercb`` callback when + ## ``future`` become finished (completed with value, failed or cancelled). ## - ## If ``fut`` is already finished (completed, failed or cancelled) result - ## Future[void] object will be returned complete. - var retFuture = newFuture[void]("chronos.cancelAndWait(T)") - proc continuation(udata: pointer) = - if not(retFuture.finished()): - retFuture.complete() - proc cancellation(udata: pointer) = - if not(fut.finished()): - fut.removeCallback(continuation) - if fut.finished(): + ## NOTE: Compared to the `tryCancel()` call, this procedure call guarantees + ## that ``future``will be finished (completed with value, failed or cancelled) + ## as quickly as possible. + proc checktick(udata: pointer) {.gcsafe.} = + # We trying to cancel Future on more time, and if `cancel()` succeeds we + # return early. + if tryCancel(future, loc): + return + # Cancellation signal was not delivered, so we trying to deliver it one + # more time after one tick. But we need to check situation when child + # future was finished but our completion callback is not yet invoked. + if not(future.finished()): + internalCallTick(checktick) + + proc continuation(udata: pointer) {.gcsafe.} = + # We do not use `callSoon` here because we was just scheduled from `poll()`. + if not(isNil(aftercb)): + aftercb(udata) + + if future.finished(): + # We could not schedule callback directly otherwise we could fall into + # recursion problem. + if not(isNil(aftercb)): + let loop = getThreadDispatcher() + loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: udata)) + return + + future.addCallback(continuation) + # Initiate cancellation process. + if not(tryCancel(future, loc)): + # Cancellation signal was not delivered, so we trying to deliver it one + # more time after async tick. But we need to check case, when future was + # finished but our completion callback is not yet invoked. + if not(future.finished()): + internalCallTick(checktick) + +template cancelSoon*(fut: FutureBase, cb: CallbackFunc, udata: pointer) = + cancelSoon(fut, cb, udata, getSrcLocation()) + +template cancelSoon*(fut: FutureBase, cb: CallbackFunc) = + cancelSoon(fut, cb, nil, getSrcLocation()) + +template cancelSoon*(fut: FutureBase, acb: AsyncCallback) = + cancelSoon(fut, acb.function, acb.udata, getSrcLocation()) + +template cancelSoon*(fut: FutureBase) = + cancelSoon(fut, nil, nil, getSrcLocation()) + +template cancel*(future: FutureBase) {. + deprecated: "Please use cancelSoon() or cancelAndWait() instead".} = + ## Cancel ``future``. + cancelSoon(future, nil, nil, getSrcLocation()) + +proc cancelAndWait*(future: FutureBase, loc: ptr SrcLoc): Future[void] = + ## Perform cancellation ``future`` return Future which will be completed when + ## ``future`` become finished (completed with value, failed or cancelled). + ## + ## NOTE: Compared to the `tryCancel()` call, this procedure call guarantees + ## that ``future``will be finished (completed with value, failed or cancelled) + ## as quickly as possible. + let retFuture = newFuture[void]("chronos.cancelAndWait(FutureBase)", + {FutureFlag.OwnCancelSchedule}) + + proc continuation(udata: pointer) {.gcsafe.} = + retFuture.complete() + + if future.finished(): retFuture.complete() else: - fut.addCallback(continuation) - retFuture.cancelCallback = cancellation - # Initiate cancellation process. - fut.cancel() - return retFuture + cancelSoon(future, continuation, cast[pointer](retFuture), loc) + + retFuture + +template cancelAndWait*(future: FutureBase): Future[void] = + ## Cancel ``future``. + cancelAndWait(future, getSrcLocation()) + +proc noCancel*[T](future: Future[T]): Future[T] = + ## Prevent cancellation requests from propagating to ``future`` while + ## forwarding its value or error when it finishes. + ## + ## This procedure should be used when you need to perform operations which + ## should not be cancelled at all cost, for example closing sockets, pipes, + ## connections or servers. Usually it become useful in exception or finally + ## blocks. + let retFuture = newFuture[T]("chronos.noCancel(T)", + {FutureFlag.OwnCancelSchedule}) + template completeFuture() = + if future.completed(): + when T is void: + retFuture.complete() + else: + retFuture.complete(future.value) + elif future.failed(): + retFuture.fail(future.error) + else: + raiseAssert("Unexpected future state [" & $future.state & "]") + + proc continuation(udata: pointer) {.gcsafe.} = + completeFuture() + + if future.finished(): + completeFuture() + else: + future.addCallback(continuation) + retFuture proc allFutures*(futs: varargs[FutureBase]): Future[void] = ## Returns a future which will complete only when all futures in ``futs`` @@ -836,7 +931,7 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] = if len(nfuts) == 0 or len(nfuts) == finishedFutures: retFuture.complete() - return retFuture + retFuture proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = ## Returns a future which will complete only when all futures in ``futs`` diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index c6d69fd..fecec39 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -11,7 +11,7 @@ {.push raises: [].} from nativesockets import Port -import std/[tables, strutils, heapqueue, deques] +import std/[tables, heapqueue, deques] import stew/results import "."/[config, futures, osdefs, oserrno, osutils, timer] @@ -179,10 +179,11 @@ type timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] idlers*: Deque[AsyncCallback] + ticks*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] counters*: Table[string, TrackerCounter] -proc sentinelCallbackImpl(arg: pointer) {.gcsafe.} = +proc sentinelCallbackImpl(arg: pointer) {.gcsafe, noreturn.} = raiseAssert "Sentinel callback MUST not be scheduled" const @@ -254,6 +255,10 @@ template processIdlers(loop: untyped) = if len(loop.idlers) > 0: loop.callbacks.addLast(loop.idlers.popFirst()) +template processTicks(loop: untyped) = + while len(loop.ticks) > 0: + loop.callbacks.addLast(loop.ticks.popFirst()) + template processCallbacks(loop: untyped) = while true: let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel @@ -417,6 +422,7 @@ when defined(windows): timers: initHeapQueue[TimerCallback](), callbacks: initDeque[AsyncCallback](64), idlers: initDeque[AsyncCallback](), + ticks: initDeque[AsyncCallback](), trackers: initTable[string, TrackerBase](), counters: initTable[string, TrackerCounter]() ) @@ -746,6 +752,9 @@ when defined(windows): if networkEventsCount == 0: loop.processIdlers() + # We move tick callbacks to `loop.callbacks` always. + processTicks(loop) + # All callbacks which will be added during `processCallbacks` will be # scheduled after the sentinel and are processed on next `poll()` call. loop.callbacks.addLast(SentinelCallback) @@ -1138,6 +1147,9 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or if count == 0: loop.processIdlers() + # We move tick callbacks to `loop.callbacks` always. + processTicks(loop) + # All callbacks which will be added during `processCallbacks` will be # scheduled after the sentinel and are processed on next `poll()` call. loop.callbacks.addLast(SentinelCallback) @@ -1255,6 +1267,20 @@ proc callIdle*(cbproc: CallbackFunc, data: pointer) = proc callIdle*(cbproc: CallbackFunc) = callIdle(cbproc, nil) +proc internalCallTick*(acb: AsyncCallback) = + ## Schedule ``cbproc`` to be called after all scheduled callbacks, but only + ## when OS system queue finished processing events. + getThreadDispatcher().ticks.addLast(acb) + +proc internalCallTick*(cbproc: CallbackFunc, data: pointer) = + ## Schedule ``cbproc`` to be called after all scheduled callbacks when + ## OS system queue processing is done. + doAssert(not isNil(cbproc)) + internalCallTick(AsyncCallback(function: cbproc, udata: data)) + +proc internalCallTick*(cbproc: CallbackFunc) = + internalCallTick(AsyncCallback(function: cbproc, udata: nil)) + include asyncfutures2 when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows): @@ -1322,30 +1348,24 @@ proc stepsAsync*(number: int): Future[void] = ## ## This primitive can be useful when you need to create more deterministic ## tests and cases. - ## - ## WARNING! Do not use this primitive to perform switch between tasks, because - ## this can lead to 100% CPU load in the moments when there are no I/O - ## events. Usually when there no I/O events CPU consumption should be near 0%. - var retFuture = newFuture[void]("chronos.stepsAsync(int)") - var counter = 0 + doAssert(number > 0, "Number should be positive integer") + var + retFuture = newFuture[void]("chronos.stepsAsync(int)") + counter = 0 + continuation: proc(data: pointer) {.gcsafe, raises: [].} - var continuation: proc(data: pointer) {.gcsafe, raises: [].} continuation = proc(data: pointer) {.gcsafe, raises: [].} = if not(retFuture.finished()): inc(counter) if counter < number: - callSoon(continuation, nil) + internalCallTick(continuation) else: retFuture.complete() - proc cancellation(udata: pointer) = - discard - if number <= 0: retFuture.complete() else: - retFuture.cancelCallback = cancellation - callSoon(continuation, nil) + internalCallTick(continuation) retFuture @@ -1374,37 +1394,46 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = ## If ``fut`` completes first the returned future will hold true, ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned ## future will hold false. - var retFuture = newFuture[bool]("chronos.`withTimeout`") - var moment: Moment - var timer: TimerCallback - var cancelling = false + var + retFuture = newFuture[bool]("chronos.withTimeout", + {FutureFlag.OwnCancelSchedule}) + moment: Moment + timer: TimerCallback + timeouted = false + + template completeFuture(fut: untyped): untyped = + if fut.failed() or fut.completed(): + retFuture.complete(true) + else: + retFuture.cancelAndSchedule() # TODO: raises annotation shouldn't be needed, but likely similar issue as # https://github.com/nim-lang/Nim/issues/17369 proc continuation(udata: pointer) {.gcsafe, raises: [].} = if not(retFuture.finished()): - if not(cancelling): - if not(fut.finished()): - # Timer exceeded first, we going to cancel `fut` and wait until it - # not completes. - cancelling = true - fut.cancel() - else: - # Future `fut` completed/failed/cancelled first. - if not(isNil(timer)): - clearTimer(timer) - retFuture.complete(true) - else: + if timeouted: retFuture.complete(false) + return + if not(fut.finished()): + # Timer exceeded first, we going to cancel `fut` and wait until it + # not completes. + timeouted = true + fut.cancelSoon() + else: + # Future `fut` completed/failed/cancelled first. + if not(isNil(timer)): + clearTimer(timer) + fut.completeFuture() # TODO: raises annotation shouldn't be needed, but likely similar issue as # https://github.com/nim-lang/Nim/issues/17369 proc cancellation(udata: pointer) {.gcsafe, raises: [].} = - if not isNil(timer): - clearTimer(timer) if not(fut.finished()): - fut.removeCallback(continuation) - fut.cancel() + if not isNil(timer): + clearTimer(timer) + fut.cancelSoon() + else: + fut.completeFuture() if fut.finished(): retFuture.complete(true) @@ -1420,11 +1449,11 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) - return retFuture + retFuture proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {. inline, deprecated: "Use withTimeout(Future[T], Duration)".} = - result = withTimeout(fut, timeout.milliseconds()) + withTimeout(fut, timeout.milliseconds()) proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## Returns a future which will complete once future ``fut`` completes @@ -1435,49 +1464,49 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## ## TODO: In case when ``fut`` got cancelled, what result Future[T] ## should return, because it can't be cancelled too. - var retFuture = newFuture[T]("chronos.wait()") - var moment: Moment - var timer: TimerCallback - var cancelling = false + var + retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule}) + moment: Moment + timer: TimerCallback + timeouted = false - proc continuation(udata: pointer) {.raises: [].} = - if not(retFuture.finished()): - if not(cancelling): - if not(fut.finished()): - # Timer exceeded first. - cancelling = true - fut.cancel() - else: - # Future `fut` completed/failed/cancelled first. - if not isNil(timer): - clearTimer(timer) - - if fut.failed(): - retFuture.fail(fut.error) - else: - when T is void: - retFuture.complete() - else: - retFuture.complete(fut.value) - else: - retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) - - var cancellation: proc(udata: pointer) {.gcsafe, raises: [].} - cancellation = proc(udata: pointer) {.gcsafe, raises: [].} = - if not isNil(timer): - clearTimer(timer) - if not(fut.finished()): - fut.removeCallback(continuation) - fut.cancel() - - if fut.finished(): + template completeFuture(fut: untyped): untyped = if fut.failed(): retFuture.fail(fut.error) + elif fut.cancelled(): + retFuture.cancelAndSchedule() else: when T is void: retFuture.complete() else: retFuture.complete(fut.value) + + proc continuation(udata: pointer) {.raises: [].} = + if not(retFuture.finished()): + if timeouted: + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) + return + if not(fut.finished()): + # Timer exceeded first. + timeouted = true + fut.cancelSoon() + else: + # Future `fut` completed/failed/cancelled first. + if not(isNil(timer)): + clearTimer(timer) + fut.completeFuture() + + var cancellation: proc(udata: pointer) {.gcsafe, raises: [].} + cancellation = proc(udata: pointer) {.gcsafe, raises: [].} = + if not(fut.finished()): + if not(isNil(timer)): + clearTimer(timer) + fut.cancelSoon() + else: + fut.completeFuture() + + if fut.finished(): + fut.completeFuture() else: if timeout.isZero(): retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) @@ -1490,7 +1519,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) - return retFuture + retFuture proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. inline, deprecated: "Use wait(Future[T], Duration)".} = diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index 8e74073..a86147c 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -301,11 +301,6 @@ template await*[T](f: Future[T]): untyped = # transformation - `yield` gives control back to `futureContinue` which is # responsible for resuming execution once the yielded future is finished yield chronosInternalRetFuture.internalChild - - # `child` is guaranteed to have been `finished` after the yield - if chronosInternalRetFuture.internalMustCancel: - raise newCancelledError() - # `child` released by `futureContinue` chronosInternalRetFuture.internalChild.internalCheckComplete() when T isnot void: @@ -317,8 +312,6 @@ template awaitne*[T](f: Future[T]): Future[T] = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f yield chronosInternalRetFuture.internalChild - if chronosInternalRetFuture.internalMustCancel: - raise newCancelledError() cast[type(f)](chronosInternalRetFuture.internalChild) else: unsupported "awaitne is only available within {.async.}" diff --git a/chronos/asyncproc.nim b/chronos/asyncproc.nim index 8df8e33..3e2df88 100644 --- a/chronos/asyncproc.nim +++ b/chronos/asyncproc.nim @@ -1241,7 +1241,7 @@ proc closeWait*(p: AsyncProcessRef) {.async.} = # Here we ignore all possible errrors, because we do not want to raise # exceptions. discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0)) - await p.pipes.closeProcessStreams(p.options) + await noCancel(p.pipes.closeProcessStreams(p.options)) discard p.closeThreadAndProcessHandle() untrackCounter(AsyncProcessTrackerName) diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 5309846..0feb51e 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -736,13 +736,19 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} = ab.queue.clear() proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} = - var retFuture = newFuture[void]("AsyncEventQueue.closeWait()") + let retFuture = newFuture[void]("AsyncEventQueue.closeWait()", + {FutureFlag.OwnCancelSchedule}) proc continuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - retFuture.complete() + retFuture.complete() + proc cancellation(udata: pointer) {.gcsafe.} = + # We are not going to change the state of `retFuture` to cancelled, so we + # will prevent the entire sequence of Futures from being cancelled. + discard + ab.close() # Schedule `continuation` to be called only after all the `reader` # notifications will be scheduled and processed. + retFuture.cancelCallback = cancellation callSoon(continuation) retFuture diff --git a/chronos/futures.nim b/chronos/futures.nim index 9b2667b..5f96867 100644 --- a/chronos/futures.nim +++ b/chronos/futures.nim @@ -37,6 +37,11 @@ type FutureState* {.pure.} = enum Pending, Completed, Cancelled, Failed + FutureFlag* {.pure.} = enum + OwnCancelSchedule + + FutureFlags* = set[FutureFlag] + InternalFutureBase* = object of RootObj # Internal untyped future representation - the fields are not part of the # public API and neither is `InternalFutureBase`, ie the inheritance @@ -47,8 +52,8 @@ type internalCancelcb*: CallbackFunc internalChild*: FutureBase internalState*: FutureState + internalFlags*: FutureFlags internalError*: ref CatchableError ## Stored exception - internalMustCancel*: bool internalClosure*: iterator(f: FutureBase): FutureBase {.closureIter.} when chronosFutureId: @@ -94,12 +99,11 @@ when chronosFutureTracking: var futureList* {.threadvar.}: FutureList # Internal utilities - these are not part of the stable API -proc internalInitFutureBase*( - fut: FutureBase, - loc: ptr SrcLoc, - state: FutureState) = +proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc, + state: FutureState, flags: FutureFlags) = fut.internalState = state fut.internalLocation[LocationKind.Create] = loc + fut.internalFlags = flags if state != FutureState.Pending: fut.internalLocation[LocationKind.Finish] = loc @@ -128,21 +132,34 @@ template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] = ## Specifying ``fromProc``, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. let res = Future[T]() - internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending) + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {}) + res + +template init*[T](F: type Future[T], fromProc: static[string] = "", + flags: static[FutureFlags]): Future[T] = + ## Creates a new pending future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + let res = Future[T]() + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, + flags) res template completed*( F: type Future, fromProc: static[string] = ""): Future[void] = ## Create a new completed future - let res = Future[T]() - internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Completed) + let res = Future[void]() + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Completed, + {}) res template completed*[T: not void]( F: type Future, valueParam: T, fromProc: static[string] = ""): Future[T] = ## Create a new completed future let res = Future[T](internalValue: valueParam) - internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Completed) + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Completed, + {}) res template failed*[T]( @@ -150,19 +167,21 @@ template failed*[T]( fromProc: static[string] = ""): Future[T] = ## Create a new failed future let res = Future[T](internalError: errorParam) - internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Failed) + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Failed, {}) when chronosStackTrace: res.internalErrorStackTrace = if getStackTrace(res.error) == "": getStackTrace() else: getStackTrace(res.error) - res func state*(future: FutureBase): FutureState = future.internalState +func flags*(future: FutureBase): FutureFlags = + future.internalFlags + func finished*(future: FutureBase): bool {.inline.} = ## Determines whether ``future`` has finished, i.e. ``future`` state changed ## from state ``Pending`` to one of the states (``Finished``, ``Cancelled``, diff --git a/chronos/ratelimit.nim b/chronos/ratelimit.nim index 4147db7..ad66c06 100644 --- a/chronos/ratelimit.nim +++ b/chronos/ratelimit.nim @@ -88,8 +88,8 @@ proc worker(bucket: TokenBucket) {.async.} = #buckets sleeper = sleepAsync(milliseconds(timeToTarget)) await sleeper or eventWaiter - sleeper.cancel() - eventWaiter.cancel() + sleeper.cancelSoon() + eventWaiter.cancelSoon() else: await eventWaiter diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 191b36a..4698e83 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -913,7 +913,7 @@ proc close*(rw: AsyncStreamRW) = callSoon(continuation) else: rw.future.addCallback(continuation) - rw.future.cancel() + rw.future.cancelSoon() elif rw is AsyncStreamWriter: if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future): callSoon(continuation) @@ -922,12 +922,36 @@ proc close*(rw: AsyncStreamRW) = callSoon(continuation) else: rw.future.addCallback(continuation) - rw.future.cancel() + rw.future.cancelSoon() proc closeWait*(rw: AsyncStreamRW): Future[void] = ## Close and frees resources of stream ``rw``. + const FutureName = + when rw is AsyncStreamReader: + "async.stream.reader.closeWait" + else: + "async.stream.writer.closeWait" + + if rw.closed(): + return Future.completed(FutureName) + + let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + + proc continuation(udata: pointer) {.gcsafe, raises:[].} = + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe, raises:[].} = + # We are not going to change the state of `retFuture` to cancelled, so we + # will prevent the entire sequence of Futures from being cancelled. + discard + rw.close() - rw.join() + if rw.future.finished(): + retFuture.complete() + else: + rw.future.addCallback(continuation, cast[pointer](retFuture)) + retFuture.cancelCallback = cancellation + retFuture proc startReader(rstream: AsyncStreamReader) = rstream.state = Running diff --git a/chronos/streams/tlsstream.nim b/chronos/streams/tlsstream.nim index 2999f7a..6432a10 100644 --- a/chronos/streams/tlsstream.nim +++ b/chronos/streams/tlsstream.nim @@ -267,19 +267,15 @@ template readAndReset(fut: untyped) = break proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] = - var waiting: seq[Future[TLSResult]] + var waiting: seq[FutureBase] if not(isNil(a)) and not(a.finished()): - a.cancel() - waiting.add(a) + waiting.add(a.cancelAndWait()) if not(isNil(b)) and not(b.finished()): - b.cancel() - waiting.add(b) + waiting.add(b.cancelAndWait()) if not(isNil(c)) and not(c.finished()): - c.cancel() - waiting.add(c) + waiting.add(c.cancelAndWait()) if not(isNil(d)) and not(d.finished()): - d.cancel() - waiting.add(d) + waiting.add(d.cancelAndWait()) allFutures(waiting) proc dumpState*(state: cuint): string = @@ -432,7 +428,7 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = var wstream = TLSStreamWriter(stream) wstream.state = AsyncStreamState.Running - await stepsAsync(1) + await sleepAsync(0.milliseconds) if isNil(wstream.stream.mainLoop): wstream.stream.mainLoop = tlsLoop(wstream.stream) await wstream.stream.mainLoop @@ -440,7 +436,7 @@ proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = var rstream = TLSStreamReader(stream) rstream.state = AsyncStreamState.Running - await stepsAsync(1) + await sleepAsync(0.milliseconds) if isNil(rstream.stream.mainLoop): rstream.stream.mainLoop = tlsLoop(rstream.stream) await rstream.stream.mainLoop diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index 665bc0e..af29c2a 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -690,8 +690,28 @@ proc join*(transp: DatagramTransport): Future[void] = proc closeWait*(transp: DatagramTransport): Future[void] = ## Close transport ``transp`` and release all resources. + const FutureName = "datagram.transport.closeWait" + + if {ReadClosed, WriteClosed} * transp.state != {}: + return Future.completed(FutureName) + + let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + + proc continuation(udata: pointer) {.gcsafe.} = + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + # We are not going to change the state of `retFuture` to cancelled, so we + # will prevent the entire sequence of Futures from being cancelled. + discard + transp.close() - transp.join() + if transp.future.finished(): + retFuture.complete() + else: + transp.future.addCallback(continuation, cast[pointer](retFuture)) + retFuture.cancelCallback = cancellation + retFuture proc send*(transp: DatagramTransport, pbytes: pointer, nbytes: int): Future[void] = diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 44a39b2..f96650c 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -2588,15 +2588,34 @@ proc close*(transp: StreamTransport) = proc closeWait*(transp: StreamTransport): Future[void] = ## Close and frees resources of transport ``transp``. + const FutureName = "stream.transport.closeWait" + + if {ReadClosed, WriteClosed} * transp.state != {}: + return Future.completed(FutureName) + + let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + + proc continuation(udata: pointer) {.gcsafe.} = + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + # We are not going to change the state of `retFuture` to cancelled, so we + # will prevent the entire sequence of Futures from being cancelled. + discard + transp.close() - transp.join() + if transp.future.finished(): + retFuture.complete() + else: + transp.future.addCallback(continuation, cast[pointer](retFuture)) + retFuture.cancelCallback = cancellation + retFuture proc shutdownWait*(transp: StreamTransport): Future[void] = ## Perform graceful shutdown of TCP connection backed by transport ``transp``. doAssert(transp.kind == TransportKind.Socket) let retFuture = newFuture[void]("stream.transport.shutdown") transp.checkClosed(retFuture) - transp.checkWriteEof(retFuture) when defined(windows): let loop = getThreadDispatcher() @@ -2636,7 +2655,14 @@ proc shutdownWait*(transp: StreamTransport): Future[void] = let res = osdefs.shutdown(SocketHandle(transp.fd), SHUT_WR) if res < 0: let err = osLastError() - retFuture.fail(getTransportOsError(err)) + case err + of ENOTCONN: + # The specified socket is not connected, it means that our initial + # goal is already happened. + transp.state.incl({WriteEof}) + callSoon(continuation, nil) + else: + retFuture.fail(getTransportOsError(err)) else: transp.state.incl({WriteEof}) callSoon(continuation, nil) diff --git a/tests/testbugs.nim b/tests/testbugs.nim index cf18a13..1f2a932 100644 --- a/tests/testbugs.nim +++ b/tests/testbugs.nim @@ -14,7 +14,7 @@ suite "Asynchronous issues test suite": const HELLO_PORT = 45679 const TEST_MSG = "testmsg" const MSG_LEN = TEST_MSG.len() - const TestsCount = 500 + const TestsCount = 100 type CustomData = ref object diff --git a/tests/testfut.nim b/tests/testfut.nim index a9fba05..bc61594 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -6,10 +6,15 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) import unittest2 +import stew/results import ../chronos, ../chronos/unittest2/asynctests {.used.} +type + TestFooConnection* = ref object + id*: int + suite "Future[T] behavior test suite": proc testFuture1(): Future[int] {.async.} = await sleepAsync(0.milliseconds) @@ -960,7 +965,7 @@ suite "Future[T] behavior test suite": let discarded {.used.} = await fut1 check res - asyncTest "cancel() async procedure test": + asyncTest "tryCancel() async procedure test": var completed = 0 proc client1() {.async.} = @@ -980,7 +985,7 @@ suite "Future[T] behavior test suite": inc(completed) var fut = client4() - fut.cancel() + discard fut.tryCancel() # Future must not be cancelled immediately, because it has many nested # futures. @@ -1031,7 +1036,7 @@ suite "Future[T] behavior test suite": var fut1 = client2() var fut2 = client2() - fut1.cancel() + discard fut1.tryCancel() await fut1 await cancelAndWait(fut2) check: @@ -1054,17 +1059,17 @@ suite "Future[T] behavior test suite": if not(retFuture.finished()): retFuture.complete() - proc cancel(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) {.gcsafe.} = inc(cancelled) if not(retFuture.finished()): removeTimer(moment, completion, cast[pointer](retFuture)) - retFuture.cancelCallback = cancel + retFuture.cancelCallback = cancellation discard setTimer(moment, completion, cast[pointer](retFuture)) return retFuture var fut = client1(100.milliseconds) - fut.cancel() + discard fut.tryCancel() await sleepAsync(500.milliseconds) check: fut.cancelled() @@ -1112,8 +1117,8 @@ suite "Future[T] behavior test suite": neverFlag3 = true res.addCallback(continuation) res.cancelCallback = cancellation - result = res neverFlag1 = true + res proc withTimeoutProc() {.async.} = try: @@ -1149,12 +1154,12 @@ suite "Future[T] behavior test suite": someFut = newFuture[void]() var raceFut3 = raceProc() - someFut.cancel() + discard someFut.tryCancel() await cancelAndWait(raceFut3) check: - raceFut1.state == FutureState.Cancelled - raceFut2.state == FutureState.Cancelled + raceFut1.state == FutureState.Completed + raceFut2.state == FutureState.Failed raceFut3.state == FutureState.Cancelled asyncTest "asyncSpawn() test": @@ -1255,12 +1260,12 @@ suite "Future[T] behavior test suite": (loc.procedure == procedure) check: - chk(loc10, "testfut.nim", 1221, "macroFuture") - chk(loc11, "testfut.nim", 1222, "") - chk(loc20, "testfut.nim", 1234, "template") - chk(loc21, "testfut.nim", 1237, "") - chk(loc30, "testfut.nim", 1231, "procedure") - chk(loc31, "testfut.nim", 1238, "") + chk(loc10, "testfut.nim", 1226, "macroFuture") + chk(loc11, "testfut.nim", 1227, "") + chk(loc20, "testfut.nim", 1239, "template") + chk(loc21, "testfut.nim", 1242, "") + chk(loc30, "testfut.nim", 1236, "procedure") + chk(loc31, "testfut.nim", 1243, "") asyncTest "withTimeout(fut) should wait cancellation test": proc futureNeverEnds(): Future[void] = @@ -1535,3 +1540,462 @@ suite "Future[T] behavior test suite": check: v1_u == 0'u v2_u + 1'u == 0'u + + asyncTest "wait() cancellation undefined behavior test #1": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testFoo(fooFut: Future[void]) {.async.} = + let connection = + try: + let res = await testInnerFoo(fooFut).wait(10.seconds) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "wait() cancellation undefined behavior test #2": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await testInnerFoo(fooFut) + + proc testFoo(fooFut: Future[void]) {.async.} = + let connection = + try: + let res = await testMiddleFoo(fooFut).wait(10.seconds) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "withTimeout() cancellation undefined behavior test #1": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testFoo(fooFut: Future[void]) {.async.} = + let connection = + try: + let + checkFut = testInnerFoo(fooFut) + res = await withTimeout(checkFut, 10.seconds) + if res: + Result[TestFooConnection, int].ok(checkFut.value) + else: + Result[TestFooConnection, int].err(0) + except CancelledError: + Result[TestFooConnection, int].err(1) + except CatchableError: + Result[TestFooConnection, int].err(2) + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "withTimeout() cancellation undefined behavior test #2": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await testInnerFoo(fooFut) + + proc testFoo(fooFut: Future[void]) {.async.} = + let connection = + try: + let + checkFut = testMiddleFoo(fooFut) + res = await withTimeout(checkFut, 10.seconds) + if res: + Result[TestFooConnection, int].ok(checkFut.value) + else: + Result[TestFooConnection, int].err(0) + except CancelledError: + Result[TestFooConnection, int].err(1) + except CatchableError: + Result[TestFooConnection, int].err(2) + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "Cancellation behavior test": + proc testInnerFoo(fooFut: Future[void]) {.async.} = + await fooFut + + proc testMiddleFoo(fooFut: Future[void]) {.async.} = + await testInnerFoo(fooFut) + + proc testOuterFoo(fooFut: Future[void]) {.async.} = + await testMiddleFoo(fooFut) + + block: + # Cancellation of pending Future + let future = newFuture[void]("last.child.pending.future") + await cancelAndWait(future) + check: + future.cancelled() == true + + block: + # Cancellation of completed Future + let future = newFuture[void]("last.child.completed.future") + future.complete() + await cancelAndWait(future) + check: + future.cancelled() == false + future.completed() == true + + block: + # Cancellation of failed Future + let future = newFuture[void]("last.child.failed.future") + future.fail(newException(ValueError, "ABCD")) + await cancelAndWait(future) + check: + future.cancelled() == false + future.failed() == true + + block: + # Cancellation of already cancelled Future + let future = newFuture[void]("last.child.cancelled.future") + future.cancelAndSchedule() + await cancelAndWait(future) + check: + future.cancelled() == true + + block: + # Cancellation of Pending->Pending->Pending->Pending sequence + let future = newFuture[void]("last.child.pending.future") + let testFut = testOuterFoo(future) + await cancelAndWait(testFut) + check: + testFut.cancelled() == true + + block: + # Cancellation of Pending->Pending->Pending->Completed sequence + let future = newFuture[void]("last.child.completed.future") + let testFut = testOuterFoo(future) + future.complete() + await cancelAndWait(testFut) + check: + testFut.cancelled() == false + testFut.completed() == true + + block: + # Cancellation of Pending->Pending->Pending->Failed sequence + let future = newFuture[void]("last.child.failed.future") + let testFut = testOuterFoo(future) + future.fail(newException(ValueError, "ABCD")) + await cancelAndWait(testFut) + check: + testFut.cancelled() == false + testFut.failed() == true + + block: + # Cancellation of Pending->Pending->Pending->Cancelled sequence + let future = newFuture[void]("last.child.cancelled.future") + let testFut = testOuterFoo(future) + future.cancelAndSchedule() + await cancelAndWait(testFut) + check: + testFut.cancelled() == true + + block: + # Cancellation of pending Future, when automatic scheduling disabled + let future = newFuture[void]("last.child.pending.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + discard + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + let cancelFut = cancelAndWait(future) + await sleepAsync(100.milliseconds) + check: + cancelFut.finished() == false + future.cancelled() == false + # Now we manually changing Future's state, so `cancelAndWait` could + # finish + future.complete() + await cancelFut + check: + cancelFut.finished() == true + future.cancelled() == false + future.finished() == true + + block: + # Cancellation of pending Future, which will fail Future on cancellation, + # when automatic scheduling disabled + let future = newFuture[void]("last.child.completed.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.complete() + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + await cancelAndWait(future) + check: + future.cancelled() == false + future.completed() == true + + block: + # Cancellation of pending Future, which will fail Future on cancellation, + # when automatic scheduling disabled + let future = newFuture[void]("last.child.failed.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.fail(newException(ValueError, "ABCD")) + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + await cancelAndWait(future) + check: + future.cancelled() == false + future.failed() == true + + block: + # Cancellation of pending Future, which will fail Future on cancellation, + # when automatic scheduling disabled + let future = newFuture[void]("last.child.cancelled.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.cancelAndSchedule() + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + await cancelAndWait(future) + check: + future.cancelled() == true + + block: + # Cancellation of pending Pending->Pending->Pending->Pending, when + # automatic scheduling disabled and Future do nothing in cancellation + # callback + let future = newFuture[void]("last.child.pending.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + discard + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + let testFut = testOuterFoo(future) + let cancelFut = cancelAndWait(testFut) + await sleepAsync(100.milliseconds) + check: + cancelFut.finished() == false + testFut.cancelled() == false + future.cancelled() == false + # Now we manually changing Future's state, so `cancelAndWait` could + # finish + future.complete() + await cancelFut + check: + cancelFut.finished() == true + future.cancelled() == false + future.finished() == true + testFut.cancelled() == false + testFut.finished() == true + + block: + # Cancellation of pending Pending->Pending->Pending->Pending, when + # automatic scheduling disabled and Future completes in cancellation + # callback + let future = newFuture[void]("last.child.pending.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.complete() + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + let testFut = testOuterFoo(future) + await cancelAndWait(testFut) + await sleepAsync(100.milliseconds) + check: + testFut.cancelled() == false + testFut.finished() == true + future.cancelled() == false + future.finished() == true + + block: + # Cancellation of pending Pending->Pending->Pending->Pending, when + # automatic scheduling disabled and Future fails in cancellation callback + let future = newFuture[void]("last.child.pending.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.fail(newException(ValueError, "ABCD")) + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + let testFut = testOuterFoo(future) + await cancelAndWait(testFut) + await sleepAsync(100.milliseconds) + check: + testFut.cancelled() == false + testFut.failed() == true + future.cancelled() == false + future.failed() == true + + block: + # Cancellation of pending Pending->Pending->Pending->Pending, when + # automatic scheduling disabled and Future fails in cancellation callback + let future = newFuture[void]("last.child.pending.future", + {FutureFlag.OwnCancelSchedule}) + proc cancellation(udata: pointer) {.gcsafe.} = + future.cancelAndSchedule() + future.cancelCallback = cancellation + # Note, future will never be finished in such case, until we manually not + # finish it + let testFut = testOuterFoo(future) + await cancelAndWait(testFut) + await sleepAsync(100.milliseconds) + check: + testFut.cancelled() == true + future.cancelled() == true + + test "Issue #334 test": + proc test(): bool = + var testres = "" + + proc a() {.async.} = + try: + await sleepAsync(seconds(1)) + except CatchableError as exc: + testres.add("A") + raise exc + + proc b() {.async.} = + try: + await a() + except CatchableError as exc: + testres.add("B") + raise exc + + proc c() {.async.} = + try: + echo $(await b().withTimeout(seconds(2))) + except CatchableError as exc: + testres.add("C") + raise exc + + let x = c() + x.cancelSoon() + + try: + waitFor x + except CatchableError: + testres.add("D") + + testres.add("E") + + waitFor sleepAsync(milliseconds(100)) + + testres == "ABCDE" + + check test() == true + + asyncTest "cancelAndWait() should be able to cancel test": + proc test1() {.async.} = + await noCancel sleepAsync(100.milliseconds) + await noCancel sleepAsync(100.milliseconds) + await sleepAsync(100.milliseconds) + + proc test2() {.async.} = + await noCancel sleepAsync(100.milliseconds) + await sleepAsync(100.milliseconds) + await noCancel sleepAsync(100.milliseconds) + + proc test3() {.async.} = + await sleepAsync(100.milliseconds) + await noCancel sleepAsync(100.milliseconds) + await noCancel sleepAsync(100.milliseconds) + + proc test4() {.async.} = + while true: + await noCancel sleepAsync(50.milliseconds) + await sleepAsync(0.milliseconds) + + proc test5() {.async.} = + while true: + await sleepAsync(0.milliseconds) + await noCancel sleepAsync(50.milliseconds) + + block: + let future1 = test1() + await cancelAndWait(future1) + let future2 = test1() + await sleepAsync(10.milliseconds) + await cancelAndWait(future2) + check: + future1.cancelled() == true + future2.cancelled() == true + + block: + let future1 = test2() + await cancelAndWait(future1) + let future2 = test2() + await sleepAsync(10.milliseconds) + await cancelAndWait(future2) + check: + future1.cancelled() == true + future2.cancelled() == true + + block: + let future1 = test3() + await cancelAndWait(future1) + let future2 = test3() + await sleepAsync(10.milliseconds) + await cancelAndWait(future2) + check: + future1.cancelled() == true + future2.cancelled() == true + + block: + let future1 = test4() + await cancelAndWait(future1) + let future2 = test4() + await sleepAsync(333.milliseconds) + await cancelAndWait(future2) + check: + future1.cancelled() == true + future2.cancelled() == true + + block: + let future1 = test5() + await cancelAndWait(future1) + let future2 = test5() + await sleepAsync(333.milliseconds) + await cancelAndWait(future2) + check: + future1.cancelled() == true + future2.cancelled() == true diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index 4daaf87..e10892e 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -704,6 +704,107 @@ suite "HTTP client testing suite": await server.closeWait() return "redirect-" & $res + proc testSendCancelLeaksTest(secure: bool): Future[bool] {.async.} = + proc process(r: RequestFence): Future[HttpResponseRef] {. + async.} = + return defaultResponse() + + var server = createServer(initTAddress("127.0.0.1:0"), process, secure) + server.start() + let address = server.instance.localAddress() + + let ha = + if secure: + getAddress(address, HttpClientScheme.Secure, "/") + else: + getAddress(address, HttpClientScheme.NonSecure, "/") + + var counter = 0 + while true: + let + session = createSession(secure) + request = HttpClientRequestRef.new(session, ha, MethodGet) + requestFut = request.send() + + if counter > 0: + await stepsAsync(counter) + let exitLoop = + if not(requestFut.finished()): + await cancelAndWait(requestFut) + doAssert(cancelled(requestFut) or completed(requestFut), + "Future should be Cancelled or Completed at this point") + if requestFut.completed(): + let response = await requestFut + await response.closeWait() + + inc(counter) + false + else: + let response = await requestFut + await response.closeWait() + true + + await request.closeWait() + await session.closeWait() + + if exitLoop: + break + + await server.stop() + await server.closeWait() + return true + + proc testOpenCancelLeaksTest(secure: bool): Future[bool] {.async.} = + proc process(r: RequestFence): Future[HttpResponseRef] {. + async.} = + return defaultResponse() + + var server = createServer(initTAddress("127.0.0.1:0"), process, secure) + server.start() + let address = server.instance.localAddress() + + let ha = + if secure: + getAddress(address, HttpClientScheme.Secure, "/") + else: + getAddress(address, HttpClientScheme.NonSecure, "/") + + var counter = 0 + while true: + let + session = createSession(secure) + request = HttpClientRequestRef.new(session, ha, MethodPost) + bodyFut = request.open() + + if counter > 0: + await stepsAsync(counter) + let exitLoop = + if not(bodyFut.finished()): + await cancelAndWait(bodyFut) + doAssert(cancelled(bodyFut) or completed(bodyFut), + "Future should be Cancelled or Completed at this point") + + if bodyFut.completed(): + let bodyWriter = await bodyFut + await bodyWriter.closeWait() + + inc(counter) + false + else: + let bodyWriter = await bodyFut + await bodyWriter.closeWait() + true + + await request.closeWait() + await session.closeWait() + + if exitLoop: + break + + await server.stop() + await server.closeWait() + return true + # proc testBasicAuthorization(): Future[bool] {.async.} = # let session = HttpSessionRef.new({HttpClientFlag.NoVerifyHost}, # maxRedirections = 10) @@ -1243,6 +1344,18 @@ suite "HTTP client testing suite": test "HTTP(S) client maximum redirections test": check waitFor(testRequestRedirectTest(true, 4)) == "redirect-true" + test "HTTP send() cancellation leaks test": + check waitFor(testSendCancelLeaksTest(false)) == true + + test "HTTP(S) send() cancellation leaks test": + check waitFor(testSendCancelLeaksTest(true)) == true + + test "HTTP open() cancellation leaks test": + check waitFor(testOpenCancelLeaksTest(false)) == true + + test "HTTP(S) open() cancellation leaks test": + check waitFor(testOpenCancelLeaksTest(true)) == true + test "HTTPS basic authorization test": skip() # This test disabled because remote service is pretty flaky and fails pretty diff --git a/tests/testhttpserver.nim b/tests/testhttpserver.nim index 0ecc9aa..85aeee5 100644 --- a/tests/testhttpserver.nim +++ b/tests/testhttpserver.nim @@ -1326,32 +1326,31 @@ suite "HTTP server testing suite": server.start() var transp: StreamTransport - try: - transp = await connect(address) - block: - let response = await transp.httpClient2(test[0], 7) - check: - response.data == "TEST_OK" - response.headers.getString("connection") == test[3] - # We do this sleeping here just because we running both server and - # client in single process, so when we received response from server - # it does not mean that connection has been immediately closed - it - # takes some more calls, so we trying to get this calls happens. - await sleepAsync(50.milliseconds) - let connectionStillAvailable = - try: - let response {.used.} = await transp.httpClient2(test[0], 7) - true - except CatchableError: - false - check connectionStillAvailable == test[2] + transp = await connect(address) + block: + let response = await transp.httpClient2(test[0], 7) + check: + response.data == "TEST_OK" + response.headers.getString("connection") == test[3] + # We do this sleeping here just because we running both server and + # client in single process, so when we received response from server + # it does not mean that connection has been immediately closed - it + # takes some more calls, so we trying to get this calls happens. + await sleepAsync(50.milliseconds) + let connectionStillAvailable = + try: + let response {.used.} = await transp.httpClient2(test[0], 7) + true + except CatchableError: + false - finally: - if not(isNil(transp)): - await transp.closeWait() - await server.stop() - await server.closeWait() + check connectionStillAvailable == test[2] + + if not(isNil(transp)): + await transp.closeWait() + await server.stop() + await server.closeWait() asyncTest "HTTP debug tests": const @@ -1400,32 +1399,30 @@ suite "HTTP server testing suite": info.flags == {HttpServerFlags.Http11Pipeline} info.socketFlags == socketFlags - try: - var clientFutures: seq[Future[StreamTransport]] - for i in 0 ..< TestsCount: - clientFutures.add(client(address, TestRequest)) - await allFutures(clientFutures) + var clientFutures: seq[Future[StreamTransport]] + for i in 0 ..< TestsCount: + clientFutures.add(client(address, TestRequest)) + await allFutures(clientFutures) - let connections = server.getConnections() - check len(connections) == TestsCount - let currentTime = Moment.now() - for index, connection in connections.pairs(): - let transp = clientFutures[index].read() - check: - connection.remoteAddress.get() == transp.localAddress() - connection.localAddress.get() == transp.remoteAddress() - connection.connectionType == ConnectionType.NonSecure - connection.connectionState == ConnectionState.Alive - connection.query.get("") == "/httpdebug" - (currentTime - connection.createMoment.get()) != ZeroDuration - (currentTime - connection.acceptMoment) != ZeroDuration - var pending: seq[Future[void]] - for transpFut in clientFutures: - pending.add(closeWait(transpFut.read())) - await allFutures(pending) - finally: - await server.stop() - await server.closeWait() + let connections = server.getConnections() + check len(connections) == TestsCount + let currentTime = Moment.now() + for index, connection in connections.pairs(): + let transp = clientFutures[index].read() + check: + connection.remoteAddress.get() == transp.localAddress() + connection.localAddress.get() == transp.remoteAddress() + connection.connectionType == ConnectionType.NonSecure + connection.connectionState == ConnectionState.Alive + connection.query.get("") == "/httpdebug" + (currentTime - connection.createMoment.get()) != ZeroDuration + (currentTime - connection.acceptMoment) != ZeroDuration + var pending: seq[Future[void]] + for transpFut in clientFutures: + pending.add(closeWait(transpFut.read())) + await allFutures(pending) + await server.stop() + await server.closeWait() test "Leaks test": checkLeaks() diff --git a/tests/testratelimit.nim b/tests/testratelimit.nim index bf281ee..d284928 100644 --- a/tests/testratelimit.nim +++ b/tests/testratelimit.nim @@ -49,7 +49,7 @@ suite "Token Bucket": # Consume 10* the budget cap let beforeStart = Moment.now() waitFor(bucket.consume(1000).wait(5.seconds)) - check Moment.now() - beforeStart in 900.milliseconds .. 1500.milliseconds + check Moment.now() - beforeStart in 900.milliseconds .. 2200.milliseconds test "Sync manual replenish": var bucket = TokenBucket.new(1000, 0.seconds) @@ -96,7 +96,7 @@ suite "Token Bucket": futBlocker.finished == false fut2.finished == false - futBlocker.cancel() + futBlocker.cancelSoon() waitFor(fut2.wait(10.milliseconds)) test "Very long replenish": @@ -117,9 +117,14 @@ suite "Token Bucket": check bucket.tryConsume(1, fakeNow) == true test "Short replenish": - var bucket = TokenBucket.new(15000, 1.milliseconds) - let start = Moment.now() - check bucket.tryConsume(15000, start) - check bucket.tryConsume(1, start) == false + skip() + # TODO (cheatfate): This test was disabled, because it continuosly fails in + # Github Actions Windows x64 CI when using Nim 1.6.14 version. + # Unable to reproduce failure locally. - check bucket.tryConsume(15000, start + 1.milliseconds) == true + # var bucket = TokenBucket.new(15000, 1.milliseconds) + # let start = Moment.now() + # check bucket.tryConsume(15000, start) + # check bucket.tryConsume(1, start) == false + + # check bucket.tryConsume(15000, start + 1.milliseconds) == true diff --git a/tests/testsoon.nim b/tests/testsoon.nim index 88072c2..41a6e4e 100644 --- a/tests/testsoon.nim +++ b/tests/testsoon.nim @@ -11,75 +11,83 @@ import ../chronos {.used.} suite "callSoon() tests suite": - const CallSoonTests = 10 - var soonTest1 = 0'u - var timeoutsTest1 = 0 - var timeoutsTest2 = 0 - var soonTest2 = 0 - - proc callback1(udata: pointer) {.gcsafe.} = - soonTest1 = soonTest1 xor cast[uint](udata) - - proc test1(): uint = - callSoon(callback1, cast[pointer](0x12345678'u)) - callSoon(callback1, cast[pointer](0x23456789'u)) - callSoon(callback1, cast[pointer](0x3456789A'u)) - callSoon(callback1, cast[pointer](0x456789AB'u)) - callSoon(callback1, cast[pointer](0x56789ABC'u)) - callSoon(callback1, cast[pointer](0x6789ABCD'u)) - callSoon(callback1, cast[pointer](0x789ABCDE'u)) - callSoon(callback1, cast[pointer](0x89ABCDEF'u)) - callSoon(callback1, cast[pointer](0x9ABCDEF1'u)) - callSoon(callback1, cast[pointer](0xABCDEF12'u)) - callSoon(callback1, cast[pointer](0xBCDEF123'u)) - callSoon(callback1, cast[pointer](0xCDEF1234'u)) - callSoon(callback1, cast[pointer](0xDEF12345'u)) - callSoon(callback1, cast[pointer](0xEF123456'u)) - callSoon(callback1, cast[pointer](0xF1234567'u)) - callSoon(callback1, cast[pointer](0x12345678'u)) - ## All callbacks must be processed exactly with 1 poll() call. - poll() - result = soonTest1 - - proc testProc() {.async.} = - for i in 1..CallSoonTests: - await sleepAsync(100.milliseconds) - timeoutsTest1 += 1 - - var callbackproc: proc(udata: pointer) {.gcsafe, raises: [].} - callbackproc = proc (udata: pointer) {.gcsafe, raises: [].} = - timeoutsTest2 += 1 - {.gcsafe.}: - callSoon(callbackproc) - - proc test2(timers, callbacks: var int) = - callSoon(callbackproc) - waitFor(testProc()) - timers = timeoutsTest1 - callbacks = timeoutsTest2 - - proc testCallback(udata: pointer) = - soonTest2 = 987654321 - - proc test3(): bool = - callSoon(testCallback) - poll() - result = soonTest2 == 987654321 - test "User-defined callback argument test": - var values = [0x12345678'u, 0x23456789'u, 0x3456789A'u, 0x456789AB'u, - 0x56789ABC'u, 0x6789ABCD'u, 0x789ABCDE'u, 0x89ABCDEF'u, - 0x9ABCDEF1'u, 0xABCDEF12'u, 0xBCDEF123'u, 0xCDEF1234'u, - 0xDEF12345'u, 0xEF123456'u, 0xF1234567'u, 0x12345678'u] - var expect = 0'u - for item in values: - expect = expect xor item - check test1() == expect + proc test(): bool = + var soonTest = 0'u + + proc callback(udata: pointer) {.gcsafe.} = + soonTest = soonTest xor cast[uint](udata) + + callSoon(callback, cast[pointer](0x12345678'u)) + callSoon(callback, cast[pointer](0x23456789'u)) + callSoon(callback, cast[pointer](0x3456789A'u)) + callSoon(callback, cast[pointer](0x456789AB'u)) + callSoon(callback, cast[pointer](0x56789ABC'u)) + callSoon(callback, cast[pointer](0x6789ABCD'u)) + callSoon(callback, cast[pointer](0x789ABCDE'u)) + callSoon(callback, cast[pointer](0x89ABCDEF'u)) + callSoon(callback, cast[pointer](0x9ABCDEF1'u)) + callSoon(callback, cast[pointer](0xABCDEF12'u)) + callSoon(callback, cast[pointer](0xBCDEF123'u)) + callSoon(callback, cast[pointer](0xCDEF1234'u)) + callSoon(callback, cast[pointer](0xDEF12345'u)) + callSoon(callback, cast[pointer](0xEF123456'u)) + callSoon(callback, cast[pointer](0xF1234567'u)) + callSoon(callback, cast[pointer](0x12345678'u)) + ## All callbacks must be processed exactly with 1 poll() call. + poll() + + var values = [0x12345678'u, 0x23456789'u, 0x3456789A'u, 0x456789AB'u, + 0x56789ABC'u, 0x6789ABCD'u, 0x789ABCDE'u, 0x89ABCDEF'u, + 0x9ABCDEF1'u, 0xABCDEF12'u, 0xBCDEF123'u, 0xCDEF1234'u, + 0xDEF12345'u, 0xEF123456'u, 0xF1234567'u, 0x12345678'u] + var expect = 0'u + for item in values: + expect = expect xor item + + soonTest == expect + + check test() == true + test "`Asynchronous dead end` #7193 test": - var timers, callbacks: int - test2(timers, callbacks) - check: - timers == CallSoonTests - callbacks > CallSoonTests * 2 + const CallSoonTests = 5 + proc test() = + var + timeoutsTest1 = 0 + timeoutsTest2 = 0 + stopFlag = false + + var callbackproc: proc(udata: pointer) {.gcsafe, raises: [].} + callbackproc = proc (udata: pointer) {.gcsafe, raises: [].} = + timeoutsTest2 += 1 + if not(stopFlag): + callSoon(callbackproc) + + proc testProc() {.async.} = + for i in 1 .. CallSoonTests: + await sleepAsync(10.milliseconds) + timeoutsTest1 += 1 + + callSoon(callbackproc) + waitFor(testProc()) + stopFlag = true + poll() + + check: + timeoutsTest1 == CallSoonTests + timeoutsTest2 > CallSoonTests * 2 + + test() + test "`callSoon() is not working prior getGlobalDispatcher()` #7192 test": - check test3() == true + proc test(): bool = + var soonTest = 0 + + proc testCallback(udata: pointer) = + soonTest = 987654321 + + callSoon(testCallback) + poll() + soonTest == 987654321 + + check test() == true diff --git a/tests/teststream.nim b/tests/teststream.nim index 9e1ce55..762e996 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -1271,15 +1271,23 @@ suite "Stream Transport test suite": server2.start() server3.start() - # It works cause even though there's an active listening socket bound to dst3, we are using ReusePort - var transp1 = await connect(server1.local, localAddress = server3.local, flags={SocketFlags.ReusePort}) - var transp2 = await connect(server2.local, localAddress = server3.local, flags={SocketFlags.ReusePort}) + # It works cause even though there's an active listening socket bound to + # dst3, we are using ReusePort + var transp1 = await connect( + server1.localAddress(), localAddress = server3.localAddress(), + flags = {SocketFlags.ReusePort}) + var transp2 = await connect( + server2.localAddress(), localAddress = server3.localAddress(), + flags = {SocketFlags.ReusePort}) expect(TransportOsError): - var transp2 {.used.} = await connect(server2.local, localAddress = server3.local) + var transp2 {.used.} = await connect( + server2.localAddress(), localAddress = server3.localAddress()) expect(TransportOsError): - var transp3 {.used.} = await connect(server2.local, localAddress = initTAddress("::", server3.local.port)) + var transp3 {.used.} = await connect( + server2.localAddress(), + localAddress = initTAddress("::", server3.localAddress().port)) await transp1.closeWait() await transp2.closeWait() @@ -1293,6 +1301,77 @@ suite "Stream Transport test suite": server3.stop() await server3.closeWait() + proc testConnectCancelLeaksTest() {.async.} = + proc client(server: StreamServer, transp: StreamTransport) {.async.} = + await transp.closeWait() + + let + server = createStreamServer(initTAddress("127.0.0.1:0"), client) + address = server.localAddress() + + var counter = 0 + while true: + let transpFut = connect(address) + if counter > 0: + await stepsAsync(counter) + if not(transpFut.finished()): + await cancelAndWait(transpFut) + doAssert(cancelled(transpFut), + "Future should be Cancelled at this point") + inc(counter) + else: + let transp = await transpFut + await transp.closeWait() + break + server.stop() + await server.closeWait() + + proc testAcceptCancelLeaksTest() {.async.} = + var + counter = 0 + exitLoop = false + + # This timer will help to awake events poll in case its going to stuck + # usually happens on MacOS. + let sleepFut = sleepAsync(1.seconds) + + while not(exitLoop): + let + server = createStreamServer(initTAddress("127.0.0.1:0")) + address = server.localAddress() + + let + transpFut = connect(address) + acceptFut = server.accept() + + if counter > 0: + await stepsAsync(counter) + + exitLoop = + if not(acceptFut.finished()): + await cancelAndWait(acceptFut) + doAssert(cancelled(acceptFut), + "Future should be Cancelled at this point") + inc(counter) + false + else: + let transp = await acceptFut + await transp.closeWait() + true + + if not(transpFut.finished()): + await transpFut.cancelAndWait() + + if transpFut.completed(): + let transp = transpFut.value + await transp.closeWait() + + server.stop() + await server.closeWait() + + if not(sleepFut.finished()): + await cancelAndWait(sleepFut) + markFD = getCurrentFD() for i in 0..