mirror of
https://github.com/logos-storage/nim-chronos.git
synced 2026-01-05 23:13:08 +00:00
Add join() operation to wait for future completion. (#525)
* Add `join()` operation to wait for future completion without cancelling it when `join()` got cancelled. * Start using join() operation.
This commit is contained in:
parent
402914f4cf
commit
2d85229dce
@ -1187,23 +1187,7 @@ proc closeWait*(server: HttpServerRef) {.async: (raises: []).} =
|
|||||||
proc join*(server: HttpServerRef): Future[void] {.
|
proc join*(server: HttpServerRef): Future[void] {.
|
||||||
async: (raw: true, raises: [CancelledError]).} =
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
## Wait until HTTP server will not be closed.
|
## Wait until HTTP server will not be closed.
|
||||||
var retFuture = newFuture[void]("http.server.join")
|
server.lifetime.join()
|
||||||
|
|
||||||
proc continuation(udata: pointer) {.gcsafe.} =
|
|
||||||
if not(retFuture.finished()):
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) {.gcsafe.} =
|
|
||||||
if not(retFuture.finished()):
|
|
||||||
server.lifetime.removeCallback(continuation, cast[pointer](retFuture))
|
|
||||||
|
|
||||||
if server.state == ServerClosed:
|
|
||||||
retFuture.complete()
|
|
||||||
else:
|
|
||||||
server.lifetime.addCallback(continuation, cast[pointer](retFuture))
|
|
||||||
retFuture.cancelCallback = cancellation
|
|
||||||
|
|
||||||
retFuture
|
|
||||||
|
|
||||||
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
|
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
|
||||||
## Create new MultiPartReader interface for specific request.
|
## Create new MultiPartReader interface for specific request.
|
||||||
|
|||||||
@ -1607,6 +1607,39 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
|
|||||||
else:
|
else:
|
||||||
wait(fut, timeout.milliseconds())
|
wait(fut, timeout.milliseconds())
|
||||||
|
|
||||||
|
proc join*(future: FutureBase): Future[void] {.
|
||||||
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
|
## Returns a future which will complete once future ``future`` completes.
|
||||||
|
##
|
||||||
|
## This primitive helps to carefully monitor ``future`` state, in case of
|
||||||
|
## cancellation ``join`` operation it will not going to cancel ``future``.
|
||||||
|
##
|
||||||
|
## If ``future`` is already completed - ``join`` will return completed
|
||||||
|
## future immediately.
|
||||||
|
let retFuture = newFuture[void]("chronos.join()")
|
||||||
|
|
||||||
|
proc continuation(udata: pointer) {.gcsafe.} =
|
||||||
|
retFuture.complete()
|
||||||
|
|
||||||
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
||||||
|
future.removeCallback(continuation, cast[pointer](retFuture))
|
||||||
|
|
||||||
|
if not(future.finished()):
|
||||||
|
future.addCallback(continuation, cast[pointer](retFuture))
|
||||||
|
retFuture.cancelCallback = cancellation
|
||||||
|
else:
|
||||||
|
retFuture.complete()
|
||||||
|
|
||||||
|
retFuture
|
||||||
|
|
||||||
|
proc join*(future: SomeFuture): Future[void] {.
|
||||||
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
|
## Returns a future which will complete once future ``future`` completes.
|
||||||
|
##
|
||||||
|
## This primitive helps to carefully monitor ``future`` state, in case of
|
||||||
|
## cancellation ``join`` operation it will not going to cancel ``future``.
|
||||||
|
join(FutureBase(future))
|
||||||
|
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
import ../osdefs
|
import ../osdefs
|
||||||
|
|
||||||
|
|||||||
@ -836,24 +836,7 @@ proc join*(rw: AsyncStreamRW): Future[void] {.
|
|||||||
async: (raw: true, raises: [CancelledError]).} =
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
## Get Future[void] which will be completed when stream become finished or
|
## Get Future[void] which will be completed when stream become finished or
|
||||||
## closed.
|
## closed.
|
||||||
when rw is AsyncStreamReader:
|
rw.future.join()
|
||||||
var retFuture = newFuture[void]("async.stream.reader.join")
|
|
||||||
else:
|
|
||||||
var retFuture = newFuture[void]("async.stream.writer.join")
|
|
||||||
|
|
||||||
proc continuation(udata: pointer) {.gcsafe, raises:[].} =
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) {.gcsafe, raises:[].} =
|
|
||||||
rw.future.removeCallback(continuation, cast[pointer](retFuture))
|
|
||||||
|
|
||||||
if not(rw.future.finished()):
|
|
||||||
rw.future.addCallback(continuation, cast[pointer](retFuture))
|
|
||||||
retFuture.cancelCallback = cancellation
|
|
||||||
else:
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
return retFuture
|
|
||||||
|
|
||||||
proc close*(rw: AsyncStreamRW) =
|
proc close*(rw: AsyncStreamRW) =
|
||||||
## Close and frees resources of stream ``rw``.
|
## Close and frees resources of stream ``rw``.
|
||||||
|
|||||||
@ -827,21 +827,7 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback,
|
|||||||
proc join*(transp: DatagramTransport): Future[void] {.
|
proc join*(transp: DatagramTransport): Future[void] {.
|
||||||
async: (raw: true, raises: [CancelledError]).} =
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
## Wait until the transport ``transp`` will be closed.
|
## Wait until the transport ``transp`` will be closed.
|
||||||
let retFuture = newFuture[void]("datagram.transport.join")
|
transp.future.join()
|
||||||
|
|
||||||
proc continuation(udata: pointer) {.gcsafe.} =
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
proc cancel(udata: pointer) {.gcsafe.} =
|
|
||||||
transp.future.removeCallback(continuation, cast[pointer](retFuture))
|
|
||||||
|
|
||||||
if not(transp.future.finished()):
|
|
||||||
transp.future.addCallback(continuation, cast[pointer](retFuture))
|
|
||||||
retFuture.cancelCallback = cancel
|
|
||||||
else:
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
return retFuture
|
|
||||||
|
|
||||||
proc closed*(transp: DatagramTransport): bool {.inline.} =
|
proc closed*(transp: DatagramTransport): bool {.inline.} =
|
||||||
## Returns ``true`` if transport in closed state.
|
## Returns ``true`` if transport in closed state.
|
||||||
|
|||||||
@ -1780,20 +1780,7 @@ proc stop*(server: StreamServer) {.raises: [TransportOsError].} =
|
|||||||
proc join*(server: StreamServer): Future[void] {.
|
proc join*(server: StreamServer): Future[void] {.
|
||||||
async: (raw: true, raises: [CancelledError]).} =
|
async: (raw: true, raises: [CancelledError]).} =
|
||||||
## Waits until ``server`` is not closed.
|
## Waits until ``server`` is not closed.
|
||||||
var retFuture = newFuture[void]("stream.transport.server.join")
|
server.loopFuture.join()
|
||||||
|
|
||||||
proc continuation(udata: pointer) =
|
|
||||||
retFuture.complete()
|
|
||||||
|
|
||||||
proc cancel(udata: pointer) =
|
|
||||||
server.loopFuture.removeCallback(continuation, cast[pointer](retFuture))
|
|
||||||
|
|
||||||
if not(server.loopFuture.finished()):
|
|
||||||
server.loopFuture.addCallback(continuation, cast[pointer](retFuture))
|
|
||||||
retFuture.cancelCallback = cancel
|
|
||||||
else:
|
|
||||||
retFuture.complete()
|
|
||||||
return retFuture
|
|
||||||
|
|
||||||
proc connect*(address: TransportAddress,
|
proc connect*(address: TransportAddress,
|
||||||
bufferSize = DefaultStreamBufferSize,
|
bufferSize = DefaultStreamBufferSize,
|
||||||
|
|||||||
@ -2048,6 +2048,112 @@ suite "Future[T] behavior test suite":
|
|||||||
future1.cancelled() == true
|
future1.cancelled() == true
|
||||||
future2.cancelled() == true
|
future2.cancelled() == true
|
||||||
|
|
||||||
|
asyncTest "join() test":
|
||||||
|
proc joinFoo0(future: FutureBase) {.async.} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
proc joinFoo1(future: Future[void]) {.async.} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
proc joinFoo2(future: Future[void]) {.
|
||||||
|
async: (raises: [CancelledError]).} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
let
|
||||||
|
future0 = newFuture[void]()
|
||||||
|
future1 = newFuture[void]()
|
||||||
|
future2 = Future[void].Raising([CancelledError]).init()
|
||||||
|
|
||||||
|
let
|
||||||
|
resfut0 = joinFoo0(future0)
|
||||||
|
resfut1 = joinFoo1(future1)
|
||||||
|
resfut2 = joinFoo2(future2)
|
||||||
|
|
||||||
|
check:
|
||||||
|
resfut0.finished() == false
|
||||||
|
resfut1.finished() == false
|
||||||
|
resfut2.finished() == false
|
||||||
|
|
||||||
|
future0.complete()
|
||||||
|
future1.complete()
|
||||||
|
future2.complete()
|
||||||
|
|
||||||
|
let res =
|
||||||
|
try:
|
||||||
|
await noCancel allFutures(resfut0, resfut1, resfut2).wait(1.seconds)
|
||||||
|
true
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
false
|
||||||
|
|
||||||
|
check:
|
||||||
|
res == true
|
||||||
|
resfut0.finished() == true
|
||||||
|
resfut1.finished() == true
|
||||||
|
resfut2.finished() == true
|
||||||
|
future0.finished() == true
|
||||||
|
future1.finished() == true
|
||||||
|
future2.finished() == true
|
||||||
|
|
||||||
|
asyncTest "join() cancellation test":
|
||||||
|
proc joinFoo0(future: FutureBase) {.async.} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
proc joinFoo1(future: Future[void]) {.async.} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
proc joinFoo2(future: Future[void]) {.
|
||||||
|
async: (raises: [CancelledError]).} =
|
||||||
|
await join(future)
|
||||||
|
|
||||||
|
let
|
||||||
|
future0 = newFuture[void]()
|
||||||
|
future1 = newFuture[void]()
|
||||||
|
future2 = Future[void].Raising([CancelledError]).init()
|
||||||
|
|
||||||
|
let
|
||||||
|
resfut0 = joinFoo0(future0)
|
||||||
|
resfut1 = joinFoo1(future1)
|
||||||
|
resfut2 = joinFoo2(future2)
|
||||||
|
|
||||||
|
check:
|
||||||
|
resfut0.finished() == false
|
||||||
|
resfut1.finished() == false
|
||||||
|
resfut2.finished() == false
|
||||||
|
|
||||||
|
let
|
||||||
|
cancelfut0 = cancelAndWait(resfut0)
|
||||||
|
cancelfut1 = cancelAndWait(resfut1)
|
||||||
|
cancelfut2 = cancelAndWait(resfut2)
|
||||||
|
|
||||||
|
let res =
|
||||||
|
try:
|
||||||
|
await noCancel allFutures(cancelfut0, cancelfut1,
|
||||||
|
cancelfut2).wait(1.seconds)
|
||||||
|
true
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
false
|
||||||
|
|
||||||
|
check:
|
||||||
|
res == true
|
||||||
|
cancelfut0.finished() == true
|
||||||
|
cancelfut1.finished() == true
|
||||||
|
cancelfut2.finished() == true
|
||||||
|
resfut0.cancelled() == true
|
||||||
|
resfut1.cancelled() == true
|
||||||
|
resfut2.cancelled() == true
|
||||||
|
future0.finished() == false
|
||||||
|
future1.finished() == false
|
||||||
|
future2.finished() == false
|
||||||
|
|
||||||
|
future0.complete()
|
||||||
|
future1.complete()
|
||||||
|
future2.complete()
|
||||||
|
|
||||||
|
check:
|
||||||
|
future0.finished() == true
|
||||||
|
future1.finished() == true
|
||||||
|
future2.finished() == true
|
||||||
|
|
||||||
test "Sink with literals":
|
test "Sink with literals":
|
||||||
# https://github.com/nim-lang/Nim/issues/22175
|
# https://github.com/nim-lang/Nim/issues/22175
|
||||||
let fut = newFuture[string]()
|
let fut = newFuture[string]()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user