Fix rare cancellation race issue on timeout for wait/withTimeout. (#536)
Add tests.
This commit is contained in:
parent
7a3eaffa4f
commit
d184a92227
|
@ -1466,18 +1466,25 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
|
|||
timer: TimerCallback
|
||||
timeouted = false
|
||||
|
||||
template completeFuture(fut: untyped): untyped =
|
||||
template completeFuture(fut: untyped, timeout: bool): untyped =
|
||||
if fut.failed() or fut.completed():
|
||||
retFuture.complete(true)
|
||||
else:
|
||||
retFuture.cancelAndSchedule()
|
||||
if timeout:
|
||||
retFuture.complete(false)
|
||||
else:
|
||||
retFuture.cancelAndSchedule()
|
||||
|
||||
# TODO: raises annotation shouldn't be needed, but likely similar issue as
|
||||
# https://github.com/nim-lang/Nim/issues/17369
|
||||
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
if timeouted:
|
||||
retFuture.complete(false)
|
||||
# We should not unconditionally complete result future with `false`.
|
||||
# Initiated by timeout handler cancellation could fail, in this case
|
||||
# we could get `fut` in complete or in failed state, so we should
|
||||
# complete result future with `true` instead of `false` here.
|
||||
fut.completeFuture(timeouted)
|
||||
return
|
||||
if not(fut.finished()):
|
||||
# Timer exceeded first, we going to cancel `fut` and wait until it
|
||||
|
@ -1488,7 +1495,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
|
|||
# Future `fut` completed/failed/cancelled first.
|
||||
if not(isNil(timer)):
|
||||
clearTimer(timer)
|
||||
fut.completeFuture()
|
||||
fut.completeFuture(false)
|
||||
timer = nil
|
||||
|
||||
# TODO: raises annotation shouldn't be needed, but likely similar issue as
|
||||
|
@ -1499,7 +1506,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.
|
|||
clearTimer(timer)
|
||||
fut.cancelSoon()
|
||||
else:
|
||||
fut.completeFuture()
|
||||
fut.completeFuture(false)
|
||||
timer = nil
|
||||
|
||||
if fut.finished():
|
||||
|
@ -1528,11 +1535,14 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
|
|||
timer: TimerCallback
|
||||
timeouted = false
|
||||
|
||||
template completeFuture(fut: untyped): untyped =
|
||||
template completeFuture(fut: untyped, timeout: bool): untyped =
|
||||
if fut.failed():
|
||||
retFuture.fail(fut.error(), warn = false)
|
||||
elif fut.cancelled():
|
||||
retFuture.cancelAndSchedule()
|
||||
if timeout:
|
||||
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
||||
else:
|
||||
retFuture.cancelAndSchedule()
|
||||
else:
|
||||
when type(fut).T is void:
|
||||
retFuture.complete()
|
||||
|
@ -1542,7 +1552,11 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
|
|||
proc continuation(udata: pointer) {.raises: [].} =
|
||||
if not(retFuture.finished()):
|
||||
if timeouted:
|
||||
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
||||
# We should not unconditionally fail `retFuture` with
|
||||
# `AsyncTimeoutError`. Initiated by timeout handler cancellation
|
||||
# could fail, in this case we could get `fut` in complete or in failed
|
||||
# state, so we should return error/value instead of `AsyncTimeoutError`.
|
||||
fut.completeFuture(timeouted)
|
||||
return
|
||||
if not(fut.finished()):
|
||||
# Timer exceeded first.
|
||||
|
@ -1552,7 +1566,7 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
|
|||
# Future `fut` completed/failed/cancelled first.
|
||||
if not(isNil(timer)):
|
||||
clearTimer(timer)
|
||||
fut.completeFuture()
|
||||
fut.completeFuture(false)
|
||||
timer = nil
|
||||
|
||||
var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
|
||||
|
@ -1562,12 +1576,12 @@ proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
|
|||
clearTimer(timer)
|
||||
fut.cancelSoon()
|
||||
else:
|
||||
fut.completeFuture()
|
||||
fut.completeFuture(false)
|
||||
|
||||
timer = nil
|
||||
|
||||
if fut.finished():
|
||||
fut.completeFuture()
|
||||
fut.completeFuture(false)
|
||||
else:
|
||||
if timeout.isZero():
|
||||
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
||||
|
|
|
@ -2177,3 +2177,141 @@ suite "Future[T] behavior test suite":
|
|||
check:
|
||||
not compiles(Future[void].Raising([42]))
|
||||
not compiles(Future[void].Raising(42))
|
||||
|
||||
asyncTest "Timeout/cancellation race wait() test":
|
||||
proc raceTest(T: typedesc, itype: int) {.async.} =
|
||||
let monitorFuture = newFuture[T]("monitor",
|
||||
{FutureFlag.OwnCancelSchedule})
|
||||
|
||||
proc raceProc0(future: Future[T]): Future[T] {.async.} =
|
||||
await future
|
||||
proc raceProc1(future: Future[T]): Future[T] {.async.} =
|
||||
await raceProc0(future)
|
||||
proc raceProc2(future: Future[T]): Future[T] {.async.} =
|
||||
await raceProc1(future)
|
||||
|
||||
proc activation(udata: pointer) {.gcsafe.} =
|
||||
if itype == 0:
|
||||
when T is void:
|
||||
monitorFuture.complete()
|
||||
elif T is int:
|
||||
monitorFuture.complete(100)
|
||||
elif itype == 1:
|
||||
monitorFuture.fail(newException(ValueError, "test"))
|
||||
else:
|
||||
monitorFuture.cancelAndSchedule()
|
||||
|
||||
monitorFuture.cancelCallback = activation
|
||||
let
|
||||
testFut = raceProc2(monitorFuture)
|
||||
waitFut = wait(testFut, 10.milliseconds)
|
||||
|
||||
when T is void:
|
||||
let waitRes =
|
||||
try:
|
||||
await waitFut
|
||||
if itype == 0:
|
||||
true
|
||||
else:
|
||||
false
|
||||
except CancelledError:
|
||||
false
|
||||
except CatchableError:
|
||||
if itype != 0:
|
||||
true
|
||||
else:
|
||||
false
|
||||
check waitRes == true
|
||||
elif T is int:
|
||||
let waitRes =
|
||||
try:
|
||||
let res = await waitFut
|
||||
if itype == 0:
|
||||
(true, res)
|
||||
else:
|
||||
(false, -1)
|
||||
except CancelledError:
|
||||
(false, -1)
|
||||
except CatchableError:
|
||||
if itype != 0:
|
||||
(true, 0)
|
||||
else:
|
||||
(false, -1)
|
||||
if itype == 0:
|
||||
check:
|
||||
waitRes[0] == true
|
||||
waitRes[1] == 100
|
||||
else:
|
||||
check:
|
||||
waitRes[0] == true
|
||||
|
||||
await raceTest(void, 0)
|
||||
await raceTest(void, 1)
|
||||
await raceTest(void, 2)
|
||||
await raceTest(int, 0)
|
||||
await raceTest(int, 1)
|
||||
await raceTest(int, 2)
|
||||
|
||||
asyncTest "Timeout/cancellation race withTimeout() test":
|
||||
proc raceTest(T: typedesc, itype: int) {.async.} =
|
||||
let monitorFuture = newFuture[T]("monitor",
|
||||
{FutureFlag.OwnCancelSchedule})
|
||||
|
||||
proc raceProc0(future: Future[T]): Future[T] {.async.} =
|
||||
await future
|
||||
proc raceProc1(future: Future[T]): Future[T] {.async.} =
|
||||
await raceProc0(future)
|
||||
proc raceProc2(future: Future[T]): Future[T] {.async.} =
|
||||
await raceProc1(future)
|
||||
|
||||
proc activation(udata: pointer) {.gcsafe.} =
|
||||
if itype == 0:
|
||||
when T is void:
|
||||
monitorFuture.complete()
|
||||
elif T is int:
|
||||
monitorFuture.complete(100)
|
||||
elif itype == 1:
|
||||
monitorFuture.fail(newException(ValueError, "test"))
|
||||
else:
|
||||
monitorFuture.cancelAndSchedule()
|
||||
|
||||
monitorFuture.cancelCallback = activation
|
||||
let
|
||||
testFut = raceProc2(monitorFuture)
|
||||
waitFut = withTimeout(testFut, 10.milliseconds)
|
||||
|
||||
when T is void:
|
||||
let waitRes =
|
||||
try:
|
||||
await waitFut
|
||||
except CancelledError:
|
||||
false
|
||||
except CatchableError:
|
||||
false
|
||||
if itype == 0:
|
||||
check waitRes == true
|
||||
elif itype == 1:
|
||||
check waitRes == true
|
||||
else:
|
||||
check waitRes == false
|
||||
elif T is int:
|
||||
let waitRes =
|
||||
try:
|
||||
await waitFut
|
||||
except CancelledError:
|
||||
false
|
||||
except CatchableError:
|
||||
false
|
||||
if itype == 0:
|
||||
check waitRes == true
|
||||
elif itype == 1:
|
||||
check waitRes == true
|
||||
else:
|
||||
check waitRes == false
|
||||
|
||||
await raceTest(void, 0)
|
||||
await raceTest(void, 1)
|
||||
await raceTest(void, 2)
|
||||
await raceTest(int, 0)
|
||||
await raceTest(int, 1)
|
||||
await raceTest(int, 2)
|
||||
|
|
Loading…
Reference in New Issue