From 0f0ed1d654aa2f2bdd792eb9ab55b227156fa544 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Sat, 20 Apr 2024 03:49:07 +0300 Subject: [PATCH] Add wait(deadline future) implementation. (#535) * Add waitUntil(deadline) implementation. * Add one more test. * Fix rare race condition and tests for it. * Rename waitUntil() to wait(). --- chronos/internal/asyncfutures.nim | 94 ++++++- tests/testfut.nim | 397 +++++++++++++++++++++++++++++- 2 files changed, 481 insertions(+), 10 deletions(-) diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index c3396bf..2b92e74 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -1529,6 +1529,60 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {. inline, deprecated: "Use withTimeout(Future[T], Duration)".} = withTimeout(fut, timeout.milliseconds()) +proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto, + deadline: auto): auto = + var timeouted = false + + template completeFuture(fut: untyped, timeout: bool): untyped = + if fut.failed(): + retFuture.fail(fut.error(), warn = false) + elif fut.cancelled(): + if timeout: + # Its possible that `future` could be cancelled in some other place. In + # such case we can't detect if it was our cancellation due to timeout, + # or some other cancellation. + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) + else: + retFuture.cancelAndSchedule() + else: + when type(fut).T is void: + retFuture.complete() + else: + retFuture.complete(fut.value) + + proc continuation(udata: pointer) {.raises: [].} = + if not(retFuture.finished()): + if timeouted: + # When timeout is exceeded and we cancelled future via cancelSoon(), + # its possible that future at this moment already has value + # and/or error. + fut.completeFuture(timeouted) + return + if not(fut.finished()): + timeouted = true + fut.cancelSoon() + else: + fut.completeFuture(false) + + var cancellation: proc(udata: pointer) {.gcsafe, raises: [].} + cancellation = proc(udata: pointer) {.gcsafe, raises: [].} = + deadline.removeCallback(continuation) + if not(fut.finished()): + fut.cancelSoon() + else: + fut.completeFuture(false) + + if fut.finished(): + fut.completeFuture(false) + else: + if deadline.finished(): + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) + else: + retFuture.cancelCallback = cancellation + fut.addCallback(continuation) + deadline.addCallback(continuation) + retFuture + proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto = var moment: Moment @@ -1606,7 +1660,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## TODO: In case when ``fut`` got cancelled, what result Future[T] ## should return, because it can't be cancelled too. var - retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule}) + retFuture = newFuture[T]("chronos.wait(duration)", + {FutureFlag.OwnCancelSchedule}) # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` # manually at proper time. @@ -1621,6 +1676,28 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. else: wait(fut, timeout.milliseconds()) +proc wait*[T](fut: Future[T], deadline: SomeFuture): Future[T] = + ## Returns a future which will complete once future ``fut`` completes + ## or if ``deadline`` future completes. + ## + ## If `deadline` future completes before future `fut` - + ## `AsyncTimeoutError` exception will be raised. + ## + ## Note: `deadline` future will not be cancelled and/or failed. + ## + ## Note: While `waitUntil(future)` operation is pending, please avoid any + ## attempts to cancel future `fut`. If it happens `waitUntil()` could + ## introduce undefined behavior - it could raise`CancelledError` or + ## `AsyncTimeoutError`. + ## + ## If you need to cancel `future` - cancel `waitUntil(future)` instead. + var + retFuture = newFuture[T]("chronos.wait(future)", + {FutureFlag.OwnCancelSchedule}) + # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` + # manually at proper time. + waitUntilImpl(fut, retFuture, deadline) + proc join*(future: FutureBase): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Returns a future which will complete once future ``future`` completes. @@ -1783,8 +1860,21 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto = InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError) let - retFuture = newFuture[T]("chronos.wait()", {OwnCancelSchedule}) + retFuture = newFuture[T]("chronos.wait(duration)", {OwnCancelSchedule}) # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` # manually at proper time. waitImpl(fut, retFuture, timeout) + +proc wait*(fut: InternalRaisesFuture, deadline: InternalRaisesFuture): auto = + type + T = type(fut).T + E = type(fut).E + InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError) + + let + retFuture = newFuture[T]("chronos.wait(future)", {OwnCancelSchedule}) + # We set `OwnCancelSchedule` flag, because we going to cancel `retFuture` + # manually at proper time. + + waitUntilImpl(fut, retFuture, deadline) diff --git a/tests/testfut.nim b/tests/testfut.nim index 46e9c2a..9737439 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -83,7 +83,7 @@ suite "Future[T] behavior test suite": fut.finished testResult == "1245" - asyncTest "wait[T]() test": + asyncTest "wait(duration) test": block: ## Test for not immediately completed future and timeout = -1 let res = @@ -146,6 +146,183 @@ suite "Future[T] behavior test suite": false check res + asyncTest "wait(future) test": + block: + ## Test for not immediately completed future and deadline which is not + ## going to be finished + let + deadline = newFuture[void]() + future1 = testFuture1() + let res = + try: + discard await wait(future1, deadline) + true + except CatchableError: + false + check: + deadline.finished() == false + future1.finished() == true + res == true + + await deadline.cancelAndWait() + + check deadline.finished() == true + block: + ## Test for immediately completed future and timeout = -1 + let + deadline = newFuture[void]() + future2 = testFuture2() + let res = + try: + discard await wait(future2, deadline) + true + except CatchableError: + false + check: + deadline.finished() == false + future2.finished() == true + res + + await deadline.cancelAndWait() + + check deadline.finished() == true + block: + ## Test for not immediately completed future and timeout = 0 + let + deadline = newFuture[void]() + future1 = testFuture1() + deadline.complete() + let res = + try: + discard await wait(future1, deadline) + false + except AsyncTimeoutError: + true + except CatchableError: + false + check: + future1.finished() == false + deadline.finished() == true + res + + block: + ## Test for immediately completed future and timeout = 0 + let + deadline = newFuture[void]() + future2 = testFuture2() + deadline.complete() + let (res1, res2) = + try: + let res = await wait(future2, deadline) + (true, res) + except CatchableError: + (false, -1) + check: + future2.finished() == true + deadline.finished() == true + res1 == true + res2 == 1 + + block: + ## Test for future which cannot be completed in timeout period + let + deadline = sleepAsync(50.milliseconds) + future100 = testFuture100() + let res = + try: + discard await wait(future100, deadline) + false + except AsyncTimeoutError: + true + except CatchableError: + false + check: + deadline.finished() == true + res + await future100.cancelAndWait() + check: + future100.finished() == true + + block: + ## Test for future which will be completed before timeout exceeded. + let + deadline = sleepAsync(500.milliseconds) + future100 = testFuture100() + let (res1, res2) = + try: + let res = await wait(future100, deadline) + (true, res) + except CatchableError: + (false, -1) + check: + future100.finished() == true + deadline.finished() == false + res1 == true + res2 == 0 + await deadline.cancelAndWait() + check: + deadline.finished() == true + + asyncTest "wait(future) cancellation behavior test": + proc deepTest3(future: Future[void]) {.async.} = + await future + + proc deepTest2(future: Future[void]) {.async.} = + await deepTest3(future) + + proc deepTest1(future: Future[void]) {.async.} = + await deepTest2(future) + + let + + deadlineFuture = newFuture[void]() + + block: + # Cancellation should affect `testFuture` because it is in pending state. + let monitorFuture = newFuture[void]() + var testFuture = deepTest1(monitorFuture) + let waitFut = wait(testFuture, deadlineFuture) + await cancelAndWait(waitFut) + check: + monitorFuture.cancelled() == true + testFuture.cancelled() == true + waitFut.cancelled() == true + deadlineFuture.finished() == false + + block: + # Cancellation should not affect `testFuture` because it is completed. + let monitorFuture = newFuture[void]() + var testFuture = deepTest1(monitorFuture) + let waitFut = wait(testFuture, deadlineFuture) + monitorFuture.complete() + await cancelAndWait(waitFut) + check: + monitorFuture.completed() == true + monitorFuture.cancelled() == false + testFuture.completed() == true + waitFut.completed() == true + deadlineFuture.finished() == false + + block: + # Cancellation should not affect `testFuture` because it is failed. + let monitorFuture = newFuture[void]() + var testFuture = deepTest1(monitorFuture) + let waitFut = wait(testFuture, deadlineFuture) + monitorFuture.fail(newException(ValueError, "TEST")) + await cancelAndWait(waitFut) + check: + monitorFuture.failed() == true + monitorFuture.cancelled() == false + testFuture.failed() == true + testFuture.cancelled() == false + waitFut.failed() == true + testFuture.cancelled() == false + deadlineFuture.finished() == false + + await cancelAndWait(deadlineFuture) + + check deadlineFuture.finished() == true + asyncTest "Discarded result Future[T] test": var completedFutures = 0 @@ -1082,7 +1259,7 @@ suite "Future[T] behavior test suite": completed == 0 cancelled == 1 - asyncTest "Cancellation wait() test": + asyncTest "Cancellation wait(duration) test": var neverFlag1, neverFlag2, neverFlag3: bool var waitProc1, waitProc2: bool proc neverEndingProc(): Future[void] = @@ -1143,7 +1320,39 @@ suite "Future[T] behavior test suite": fut.state == FutureState.Completed neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2 - asyncTest "Cancellation race test": + asyncTest "Cancellation wait(future) test": + var neverFlag1, neverFlag2, neverFlag3: bool + var waitProc1, waitProc2: bool + proc neverEndingProc(): Future[void] = + var res = newFuture[void]() + proc continuation(udata: pointer) {.gcsafe.} = + neverFlag2 = true + proc cancellation(udata: pointer) {.gcsafe.} = + neverFlag3 = true + res.addCallback(continuation) + res.cancelCallback = cancellation + result = res + neverFlag1 = true + + proc waitProc() {.async.} = + let deadline = sleepAsync(100.milliseconds) + try: + await wait(neverEndingProc(), deadline) + except CancelledError: + waitProc1 = true + except CatchableError: + doAssert(false) + finally: + await cancelAndWait(deadline) + waitProc2 = true + + var fut = waitProc() + await cancelAndWait(fut) + check: + fut.state == FutureState.Completed + neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2 + + asyncTest "Cancellation race() test": var someFut = newFuture[void]() proc raceProc(): Future[void] {.async.} = @@ -1298,7 +1507,7 @@ suite "Future[T] behavior test suite": false check res - asyncTest "wait(fut) should wait cancellation test": + asyncTest "wait(future) should wait cancellation test": proc futureNeverEnds(): Future[void] = newFuture[void]("neverending.future") @@ -1322,6 +1531,29 @@ suite "Future[T] behavior test suite": check res + asyncTest "wait(future) should wait cancellation test": + proc futureNeverEnds(): Future[void] = + newFuture[void]("neverending.future") + + proc futureOneLevelMore() {.async.} = + await futureNeverEnds() + + var fut = futureOneLevelMore() + let res = + try: + await wait(fut, sleepAsync(100.milliseconds)) + false + except AsyncTimeoutError: + # Because `fut` is never-ending Future[T], `wait` should raise + # `AsyncTimeoutError`, but only after `fut` is cancelled. + if fut.cancelled(): + true + else: + false + except CatchableError: + false + check res + test "race(zero) test": var tseq = newSeq[FutureBase]() var fut1 = race(tseq) @@ -1563,7 +1795,7 @@ suite "Future[T] behavior test suite": v1_u == 0'u v2_u + 1'u == 0'u - asyncTest "wait() cancellation undefined behavior test #1": + asyncTest "wait(duration) cancellation undefined behavior test #1": proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. async.} = await fooFut @@ -1586,7 +1818,7 @@ suite "Future[T] behavior test suite": discard someFut.tryCancel() await someFut - asyncTest "wait() cancellation undefined behavior test #2": + asyncTest "wait(duration) cancellation undefined behavior test #2": proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. async.} = await fooFut @@ -1613,7 +1845,7 @@ suite "Future[T] behavior test suite": discard someFut.tryCancel() await someFut - asyncTest "wait() should allow cancellation test (depends on race())": + asyncTest "wait(duration) should allow cancellation test (depends on race())": proc testFoo(): Future[bool] {.async.} = let resFut = sleepAsync(2.seconds).wait(3.seconds) @@ -1699,6 +1931,78 @@ suite "Future[T] behavior test suite": check (await testFoo()) == true + asyncTest "wait(future) cancellation undefined behavior test #1": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testFoo(fooFut: Future[void]) {.async.} = + let deadline = sleepAsync(10.seconds) + let connection = + try: + let res = await testInnerFoo(fooFut).wait(deadline) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + finally: + await deadline.cancelAndWait() + + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "wait(future) cancellation undefined behavior test #2": + proc testInnerFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await fooFut + return TestFooConnection() + + proc testMiddleFoo(fooFut: Future[void]): Future[TestFooConnection] {. + async.} = + await testInnerFoo(fooFut) + + proc testFoo(fooFut: Future[void]) {.async.} = + let deadline = sleepAsync(10.seconds) + let connection = + try: + let res = await testMiddleFoo(fooFut).wait(deadline) + Result[TestFooConnection, int].ok(res) + except CancelledError: + Result[TestFooConnection, int].err(0) + except CatchableError: + Result[TestFooConnection, int].err(1) + finally: + await deadline.cancelAndWait() + check connection.isOk() + + var future = newFuture[void]("last.child.future") + var someFut = testFoo(future) + future.complete() + discard someFut.tryCancel() + await someFut + + asyncTest "wait(future) should allow cancellation test (depends on race())": + proc testFoo(): Future[bool] {.async.} = + let + deadline = sleepAsync(3.seconds) + resFut = sleepAsync(2.seconds).wait(deadline) + timeFut = sleepAsync(1.seconds) + cancelFut = cancelAndWait(resFut) + discard await race(cancelFut, timeFut) + await deadline.cancelAndWait() + if cancelFut.finished(): + return (resFut.cancelled() and cancelFut.completed()) + false + + check (await testFoo()) == true + asyncTest "Cancellation behavior test": proc testInnerFoo(fooFut: Future[void]) {.async.} = await fooFut @@ -2178,7 +2482,7 @@ suite "Future[T] behavior test suite": not compiles(Future[void].Raising([42])) not compiles(Future[void].Raising(42)) - asyncTest "Timeout/cancellation race wait() test": + asyncTest "Timeout/cancellation race wait(duration) test": proc raceTest(T: typedesc, itype: int) {.async.} = let monitorFuture = newFuture[T]("monitor", {FutureFlag.OwnCancelSchedule}) @@ -2252,6 +2556,83 @@ suite "Future[T] behavior test suite": await raceTest(int, 1) await raceTest(int, 2) + asyncTest "Timeout/cancellation race wait(future) test": + proc raceTest(T: typedesc, itype: int) {.async.} = + let monitorFuture = newFuture[T]() + + proc raceProc0(future: Future[T]): Future[T] {.async.} = + await future + proc raceProc1(future: Future[T]): Future[T] {.async.} = + await raceProc0(future) + proc raceProc2(future: Future[T]): Future[T] {.async.} = + await raceProc1(future) + + proc continuation(udata: pointer) {.gcsafe.} = + if itype == 0: + when T is void: + monitorFuture.complete() + elif T is int: + monitorFuture.complete(100) + elif itype == 1: + monitorFuture.fail(newException(ValueError, "test")) + else: + monitorFuture.cancelAndSchedule() + + let deadlineFuture = newFuture[void]() + deadlineFuture.addCallback continuation + + let + testFut = raceProc2(monitorFuture) + waitFut = wait(testFut, deadlineFuture) + + deadlineFuture.complete() + + when T is void: + let waitRes = + try: + await waitFut + if itype == 0: + true + else: + false + except CancelledError: + false + except CatchableError: + if itype != 0: + true + else: + false + check waitRes == true + elif T is int: + let waitRes = + try: + let res = await waitFut + if itype == 0: + (true, res) + else: + (false, -1) + except CancelledError: + (false, -1) + except CatchableError: + if itype != 0: + (true, 0) + else: + (false, -1) + if itype == 0: + check: + waitRes[0] == true + waitRes[1] == 100 + else: + check: + waitRes[0] == true + + await raceTest(void, 0) + await raceTest(void, 1) + await raceTest(void, 2) + await raceTest(int, 0) + await raceTest(int, 1) + await raceTest(int, 2) + asyncTest "Timeout/cancellation race withTimeout() test": proc raceTest(T: typedesc, itype: int) {.async.} = let monitorFuture = newFuture[T]("monitor",