Add `join()` operation to wait for future completion without cancelling it when `join()` got cancelled.
This commit is contained in:
parent
d5bc90fef2
commit
aab6e8085e
|
@ -1607,6 +1607,39 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
|
|||
else:
|
||||
wait(fut, timeout.milliseconds())
|
||||
|
||||
proc join*(future: FutureBase): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
## Returns a future which will complete once future ``future`` completes.
|
||||
##
|
||||
## This primitive helps to carefully monitor ``future`` state, in case of
|
||||
## cancellation ``join`` operation it will not going to cancel ``future``.
|
||||
##
|
||||
## If ``future`` is already completed - ``join`` will return completed
|
||||
## future immediately.
|
||||
let retFuture = newFuture[void]("chronos.join()")
|
||||
|
||||
proc continuation(udata: pointer) {.gcsafe.} =
|
||||
retFuture.complete()
|
||||
|
||||
proc cancellation(udata: pointer) {.gcsafe.} =
|
||||
future.removeCallback(continuation, cast[pointer](retFuture))
|
||||
|
||||
if not(future.finished()):
|
||||
future.addCallback(continuation, cast[pointer](retFuture))
|
||||
retFuture.cancelCallback = cancellation
|
||||
else:
|
||||
retFuture.complete()
|
||||
|
||||
retFuture
|
||||
|
||||
proc join*(future: SomeFuture): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
## Returns a future which will complete once future ``future`` completes.
|
||||
##
|
||||
## This primitive helps to carefully monitor ``future`` state, in case of
|
||||
## cancellation ``join`` operation it will not going to cancel ``future``.
|
||||
join(FutureBase(future))
|
||||
|
||||
when defined(windows):
|
||||
import ../osdefs
|
||||
|
||||
|
|
|
@ -2047,6 +2047,113 @@ suite "Future[T] behavior test suite":
|
|||
check:
|
||||
future1.cancelled() == true
|
||||
future2.cancelled() == true
|
||||
|
||||
asyncTest "join() test":
|
||||
proc joinFoo0(future: FutureBase) {.async.} =
|
||||
await join(future)
|
||||
|
||||
proc joinFoo1(future: Future[void]) {.async.} =
|
||||
await join(future)
|
||||
|
||||
proc joinFoo2(future: Future[void]) {.
|
||||
async: (raises: [CancelledError]).} =
|
||||
await join(future)
|
||||
|
||||
let
|
||||
future0 = newFuture[void]()
|
||||
future1 = newFuture[void]()
|
||||
future2 = Future[void].Raising([CancelledError]).init()
|
||||
|
||||
let
|
||||
resfut0 = joinFoo0(future0)
|
||||
resfut1 = joinFoo1(future1)
|
||||
resfut2 = joinFoo2(future2)
|
||||
|
||||
check:
|
||||
resfut0.finished() == false
|
||||
resfut1.finished() == false
|
||||
resfut2.finished() == false
|
||||
|
||||
future0.complete()
|
||||
future1.complete()
|
||||
future2.complete()
|
||||
|
||||
let res =
|
||||
try:
|
||||
await noCancel allFutures(resfut0, resfut1, resfut2).wait(1.seconds)
|
||||
true
|
||||
except AsyncTimeoutError:
|
||||
false
|
||||
|
||||
check:
|
||||
res == true
|
||||
resfut0.finished() == true
|
||||
resfut1.finished() == true
|
||||
resfut2.finished() == true
|
||||
future0.finished() == true
|
||||
future1.finished() == true
|
||||
future2.finished() == true
|
||||
|
||||
asyncTest "join() cancellation test":
|
||||
proc joinFoo0(future: FutureBase) {.async.} =
|
||||
await join(future)
|
||||
|
||||
proc joinFoo1(future: Future[void]) {.async.} =
|
||||
await join(future)
|
||||
|
||||
proc joinFoo2(future: Future[void]) {.
|
||||
async: (raises: [CancelledError]).} =
|
||||
await join(future)
|
||||
|
||||
let
|
||||
future0 = newFuture[void]()
|
||||
future1 = newFuture[void]()
|
||||
future2 = Future[void].Raising([CancelledError]).init()
|
||||
|
||||
let
|
||||
resfut0 = joinFoo0(future0)
|
||||
resfut1 = joinFoo1(future1)
|
||||
resfut2 = joinFoo2(future2)
|
||||
|
||||
check:
|
||||
resfut0.finished() == false
|
||||
resfut1.finished() == false
|
||||
resfut2.finished() == false
|
||||
|
||||
let
|
||||
cancelfut0 = cancelAndWait(resfut0)
|
||||
cancelfut1 = cancelAndWait(resfut1)
|
||||
cancelfut2 = cancelAndWait(resfut2)
|
||||
|
||||
let res =
|
||||
try:
|
||||
await noCancel allFutures(cancelfut0, cancelfut1,
|
||||
cancelfut2).wait(1.seconds)
|
||||
true
|
||||
except AsyncTimeoutError:
|
||||
false
|
||||
|
||||
check:
|
||||
res == true
|
||||
cancelfut0.finished() == true
|
||||
cancelfut1.finished() == true
|
||||
cancelfut2.finished() == true
|
||||
resfut0.cancelled() == true
|
||||
resfut1.cancelled() == true
|
||||
resfut2.cancelled() == true
|
||||
future0.finished() == false
|
||||
future1.finished() == false
|
||||
future2.finished() == false
|
||||
|
||||
future0.complete()
|
||||
future1.complete()
|
||||
future2.complete()
|
||||
|
||||
check:
|
||||
future0.finished() == true
|
||||
future1.finished() == true
|
||||
future2.finished() == true
|
||||
|
||||
test "Sink with literals":
|
||||
# https://github.com/nim-lang/Nim/issues/22175
|
||||
let fut = newFuture[string]()
|
||||
|
|
Loading…
Reference in New Issue