From 2d85229dce6a2c0229d5c1985c6dce211ed9e8ee Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Thu, 4 Apr 2024 00:30:01 +0300 Subject: [PATCH] Add `join()` operation to wait for future completion. (#525) * Add `join()` operation to wait for future completion without cancelling it when `join()` got cancelled. * Start using join() operation. --- chronos/apps/http/httpserver.nim | 18 +---- chronos/internal/asyncfutures.nim | 33 ++++++++++ chronos/streams/asyncstream.nim | 19 +----- chronos/transports/datagram.nim | 16 +---- chronos/transports/stream.nim | 15 +---- tests/testfut.nim | 106 ++++++++++++++++++++++++++++++ 6 files changed, 143 insertions(+), 64 deletions(-) diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index c1e52793..1adb8fc8 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -1187,23 +1187,7 @@ proc closeWait*(server: HttpServerRef) {.async: (raises: []).} = proc join*(server: HttpServerRef): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Wait until HTTP server will not be closed. - var retFuture = newFuture[void]("http.server.join") - - proc continuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - retFuture.complete() - - proc cancellation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - server.lifetime.removeCallback(continuation, cast[pointer](retFuture)) - - if server.state == ServerClosed: - retFuture.complete() - else: - server.lifetime.addCallback(continuation, cast[pointer](retFuture)) - retFuture.cancelCallback = cancellation - - retFuture + server.lifetime.join() proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] = ## Create new MultiPartReader interface for specific request. diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index 49c6acd7..7f93b0e1 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -1607,6 +1607,39 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. else: wait(fut, timeout.milliseconds()) +proc join*(future: FutureBase): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = + ## Returns a future which will complete once future ``future`` completes. + ## + ## This primitive helps to carefully monitor ``future`` state, in case of + ## cancellation ``join`` operation it will not going to cancel ``future``. + ## + ## If ``future`` is already completed - ``join`` will return completed + ## future immediately. + let retFuture = newFuture[void]("chronos.join()") + + proc continuation(udata: pointer) {.gcsafe.} = + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + future.removeCallback(continuation, cast[pointer](retFuture)) + + if not(future.finished()): + future.addCallback(continuation, cast[pointer](retFuture)) + retFuture.cancelCallback = cancellation + else: + retFuture.complete() + + retFuture + +proc join*(future: SomeFuture): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = + ## Returns a future which will complete once future ``future`` completes. + ## + ## This primitive helps to carefully monitor ``future`` state, in case of + ## cancellation ``join`` operation it will not going to cancel ``future``. + join(FutureBase(future)) + when defined(windows): import ../osdefs diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 473cc38b..bb878dbc 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -836,24 +836,7 @@ proc join*(rw: AsyncStreamRW): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Get Future[void] which will be completed when stream become finished or ## closed. - when rw is AsyncStreamReader: - var retFuture = newFuture[void]("async.stream.reader.join") - else: - var retFuture = newFuture[void]("async.stream.writer.join") - - proc continuation(udata: pointer) {.gcsafe, raises:[].} = - retFuture.complete() - - proc cancellation(udata: pointer) {.gcsafe, raises:[].} = - rw.future.removeCallback(continuation, cast[pointer](retFuture)) - - if not(rw.future.finished()): - rw.future.addCallback(continuation, cast[pointer](retFuture)) - retFuture.cancelCallback = cancellation - else: - retFuture.complete() - - return retFuture + rw.future.join() proc close*(rw: AsyncStreamRW) = ## Close and frees resources of stream ``rw``. diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index d6391219..7f471424 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -827,21 +827,7 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback, proc join*(transp: DatagramTransport): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Wait until the transport ``transp`` will be closed. - let retFuture = newFuture[void]("datagram.transport.join") - - proc continuation(udata: pointer) {.gcsafe.} = - retFuture.complete() - - proc cancel(udata: pointer) {.gcsafe.} = - transp.future.removeCallback(continuation, cast[pointer](retFuture)) - - if not(transp.future.finished()): - transp.future.addCallback(continuation, cast[pointer](retFuture)) - retFuture.cancelCallback = cancel - else: - retFuture.complete() - - return retFuture + transp.future.join() proc closed*(transp: DatagramTransport): bool {.inline.} = ## Returns ``true`` if transport in closed state. diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index b81a512d..7b5925b7 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -1780,20 +1780,7 @@ proc stop*(server: StreamServer) {.raises: [TransportOsError].} = proc join*(server: StreamServer): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Waits until ``server`` is not closed. - var retFuture = newFuture[void]("stream.transport.server.join") - - proc continuation(udata: pointer) = - retFuture.complete() - - proc cancel(udata: pointer) = - server.loopFuture.removeCallback(continuation, cast[pointer](retFuture)) - - if not(server.loopFuture.finished()): - server.loopFuture.addCallback(continuation, cast[pointer](retFuture)) - retFuture.cancelCallback = cancel - else: - retFuture.complete() - return retFuture + server.loopFuture.join() proc connect*(address: TransportAddress, bufferSize = DefaultStreamBufferSize, diff --git a/tests/testfut.nim b/tests/testfut.nim index c2231f12..1cf0aed5 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -2048,6 +2048,112 @@ suite "Future[T] behavior test suite": future1.cancelled() == true future2.cancelled() == true + asyncTest "join() test": + proc joinFoo0(future: FutureBase) {.async.} = + await join(future) + + proc joinFoo1(future: Future[void]) {.async.} = + await join(future) + + proc joinFoo2(future: Future[void]) {. + async: (raises: [CancelledError]).} = + await join(future) + + let + future0 = newFuture[void]() + future1 = newFuture[void]() + future2 = Future[void].Raising([CancelledError]).init() + + let + resfut0 = joinFoo0(future0) + resfut1 = joinFoo1(future1) + resfut2 = joinFoo2(future2) + + check: + resfut0.finished() == false + resfut1.finished() == false + resfut2.finished() == false + + future0.complete() + future1.complete() + future2.complete() + + let res = + try: + await noCancel allFutures(resfut0, resfut1, resfut2).wait(1.seconds) + true + except AsyncTimeoutError: + false + + check: + res == true + resfut0.finished() == true + resfut1.finished() == true + resfut2.finished() == true + future0.finished() == true + future1.finished() == true + future2.finished() == true + + asyncTest "join() cancellation test": + proc joinFoo0(future: FutureBase) {.async.} = + await join(future) + + proc joinFoo1(future: Future[void]) {.async.} = + await join(future) + + proc joinFoo2(future: Future[void]) {. + async: (raises: [CancelledError]).} = + await join(future) + + let + future0 = newFuture[void]() + future1 = newFuture[void]() + future2 = Future[void].Raising([CancelledError]).init() + + let + resfut0 = joinFoo0(future0) + resfut1 = joinFoo1(future1) + resfut2 = joinFoo2(future2) + + check: + resfut0.finished() == false + resfut1.finished() == false + resfut2.finished() == false + + let + cancelfut0 = cancelAndWait(resfut0) + cancelfut1 = cancelAndWait(resfut1) + cancelfut2 = cancelAndWait(resfut2) + + let res = + try: + await noCancel allFutures(cancelfut0, cancelfut1, + cancelfut2).wait(1.seconds) + true + except AsyncTimeoutError: + false + + check: + res == true + cancelfut0.finished() == true + cancelfut1.finished() == true + cancelfut2.finished() == true + resfut0.cancelled() == true + resfut1.cancelled() == true + resfut2.cancelled() == true + future0.finished() == false + future1.finished() == false + future2.finished() == false + + future0.complete() + future1.complete() + future2.complete() + + check: + future0.finished() == true + future1.finished() == true + future2.finished() == true + test "Sink with literals": # https://github.com/nim-lang/Nim/issues/22175 let fut = newFuture[string]()