Fix bug cancellation handlers not called in wait() and withTimeout().

Fix double completion bug because of callback race.
Fix deprecation warnings.
Rename some internal procedures.
Bump version to 2.3.5.
This commit is contained in:
cheatfate 2020-01-27 20:28:44 +02:00 committed by zah
parent f02e748f18
commit 74700fdcab
6 changed files with 142 additions and 64 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos"
version = "2.3.4"
version = "2.3.5"
author = "Status Research & Development GmbH"
description = "Chronos"
license = "Apache License 2.0 or MIT"

View File

@ -482,13 +482,12 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
fut1.callback = cb
fut2.callback = cb
proc cancel(udata: pointer) {.gcsafe.} =
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
fut1.removeCallback(cb)
fut2.removeCallback(cb)
fut1.removeCallback(cb)
fut2.removeCallback(cb)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
return retFuture
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
@ -510,13 +509,12 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
fut1.callback = cb
fut2.callback = cb
proc cancel(udata: pointer) {.gcsafe.} =
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
fut1.removeCallback(cb)
fut2.removeCallback(cb)
fut1.removeCallback(cb)
fut2.removeCallback(cb)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
return retFuture
proc all*[T](futs: varargs[Future[T]]): auto {.
@ -688,17 +686,16 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
if completedFutures == totalFutures:
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
if len(nfuts) == 0:
retFuture.complete()
@ -729,12 +726,11 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
res = nfuts[i]
retFuture.complete(res)
proc cancel(udata: pointer) {.gcsafe.} =
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb)
@ -742,5 +738,5 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
return retFuture

View File

@ -733,33 +733,31 @@ else:
proc initAPI() = discard
proc globalInit() = discard
proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback =
proc setTimer*(at: Moment, cb: CallbackFunc,
udata: pointer = nil): TimerCallback =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher()
result = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(result)
proc clearTimer(timer: TimerCallback) {.inline.} =
proc clearTimer*(timer: TimerCallback) {.inline.} =
timer.deleted = true
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil)
{.deprecated: "Use setTimer/clearTimer instead".} =
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {.
inline, deprecated: "Use setTimer/clearTimer instead".} =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher()
var tcb = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(tcb)
discard setTimer(at, cb, udata)
proc addTimer*(at: int64, cb: CallbackFunc, udata: pointer = nil) {.
inline, deprecated: "Use addTimer(Duration, cb, udata)".} =
addTimer(Moment.init(at, Millisecond), cb, udata)
discard setTimer(Moment.init(at, Millisecond), cb, udata)
proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {.
inline, deprecated: "Use addTimer(Duration, cb, udata)".} =
addTimer(Moment.init(int64(at), Millisecond), cb, udata)
discard setTimer(Moment.init(int64(at), Millisecond), cb, udata)
proc removeTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =
## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting
@ -793,14 +791,12 @@ proc sleepAsync*(duration: Duration): Future[void] =
var timer: TimerCallback
proc completion(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
clearTimer(timer)
proc cancellation(udata: pointer) {.gcsafe.} =
clearTimer(timer)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
timer = setTimer(moment, completion, cast[pointer](retFuture))
return retFuture
@ -821,7 +817,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if isNil(udata):
if not(fut.finished()):
# Timer exceeded first.
fut.removeCallback(continuation)
fut.cancel()
@ -832,13 +828,12 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
clearTimer(timer)
retFuture.complete(true)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if not isNil(timer):
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
proc cancellation(udata: pointer) {.gcsafe.} =
if not isNil(timer):
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
if fut.finished():
retFuture.complete(true)
@ -846,11 +841,11 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
if timeout.isZero():
retFuture.complete(false)
elif timeout.isInfinite():
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
fut.addCallback(continuation)
else:
moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
timer = setTimer(moment, continuation, nil)
fut.addCallback(continuation)
@ -875,7 +870,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if isNil(udata):
if not(fut.finished()):
# Timer exceeded first.
fut.removeCallback(continuation)
fut.cancel()
@ -893,13 +888,12 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
else:
retFuture.complete(fut.read())
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if not isNil(timer):
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
proc cancellation(udata: pointer) {.gcsafe.} =
if not isNil(timer):
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
if fut.finished():
if fut.failed():
@ -913,11 +907,11 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
if timeout.isZero():
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
elif timeout.isInfinite():
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
fut.addCallback(continuation)
else:
moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel
retFuture.cancelCallback = cancellation
timer = setTimer(moment, continuation, nil)
fut.addCallback(continuation)

View File

@ -768,7 +768,8 @@ when defined(windows):
if pipeHandle == INVALID_HANDLE_VALUE:
let err = osLastError()
if int32(err) == ERROR_PIPE_BUSY:
addTimer(Moment.fromNow(50.milliseconds), pipeContinuation, nil)
discard setTimer(Moment.fromNow(50.milliseconds),
pipeContinuation, nil)
else:
retFuture.fail(getTransportOsError(err))
else:

View File

@ -14,6 +14,7 @@ suite "Asynchronous issues test suite":
const HELLO_PORT = 45679
const TEST_MSG = "testmsg"
const MSG_LEN = TEST_MSG.len()
const TestsCount = 500
type
CustomData = ref object
@ -42,6 +43,24 @@ suite "Asynchronous issues test suite":
if data.test == "OK":
result = true
proc testWait(): Future[bool] {.async.} =
for i in 0 ..< TestsCount:
try:
await wait(sleepAsync(4.milliseconds), 4.milliseconds)
except AsyncTimeoutError:
discard
result = true
proc testWithTimeout(): Future[bool] {.async.} =
for i in 0 ..< TestsCount:
discard await withTimeout(sleepAsync(4.milliseconds), 4.milliseconds)
result = true
test "Issue #6":
var res = waitFor(issue6())
check res == true
check waitFor(issue6()) == true
test "Callback-race double completion [wait()] test":
check waitFor(testWait()) == true
test "Callback-race double completion [withTimeout()] test":
check waitFor(testWithTimeout()) == true

View File

@ -757,7 +757,7 @@ suite "Future[T] behavior test suite":
removeTimer(moment, completion, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
addTimer(moment, completion, cast[pointer](retFuture))
discard setTimer(moment, completion, cast[pointer](retFuture))
return retFuture
var fut = client1(100.milliseconds)
@ -770,6 +770,70 @@ suite "Future[T] behavior test suite":
return false
return true
proc testWaitAsync(): Future[bool] {.async.} =
var neverFlag1, neverFlag2, neverFlag3: bool
var waitProc1, waitProc2: bool
proc neverEndingProc(): Future[void] =
var res = newFuture[void]()
proc continuation(udata: pointer) {.gcsafe.} =
neverFlag2 = true
proc cancellation(udata: pointer) {.gcsafe.} =
neverFlag3 = true
res.addCallback(continuation)
res.cancelCallback = cancellation
result = res
neverFlag1 = true
proc waitProc() {.async.} =
try:
await wait(neverEndingProc(), 100.milliseconds)
except CancelledError:
waitProc1 = true
finally:
waitProc2 = true
var fut = waitProc()
await cancelAndWait(fut)
result = (fut.state == FutureState.Finished) and
neverFlag1 and neverFlag2 and neverFlag3 and
waitProc1 and waitProc2
proc testWithTimeoutAsync(): Future[bool] {.async.} =
var neverFlag1, neverFlag2, neverFlag3: bool
var waitProc1, waitProc2: bool
proc neverEndingProc(): Future[void] =
var res = newFuture[void]()
proc continuation(udata: pointer) {.gcsafe.} =
neverFlag2 = true
proc cancellation(udata: pointer) {.gcsafe.} =
neverFlag3 = true
res.addCallback(continuation)
res.cancelCallback = cancellation
result = res
neverFlag1 = true
proc withTimeoutProc() {.async.} =
try:
discard await withTimeout(neverEndingProc(), 100.milliseconds)
doAssert(false)
except CancelledError:
waitProc1 = true
finally:
waitProc2 = true
var fut = withTimeoutProc()
await cancelAndWait(fut)
result = (fut.state == FutureState.Finished) and
neverFlag1 and neverFlag2 and neverFlag3 and
waitProc1 and waitProc2
proc testWait(): bool =
result = waitFor(testWaitAsync())
proc testWithTimeout(): bool =
result = waitFor(testWithTimeoutAsync())
test "Async undefined behavior (#7758) test":
check test1() == true
test "Immediately completed asynchronous procedure test":
@ -806,3 +870,7 @@ suite "Future[T] behavior test suite":
check testBreakCancellation() == true
test "Cancellation callback test":
check testCancelCallback() == true
test "Cancellation wait() test":
check testWait() == true
test "Cancellation withTimeout() test":
check testWithTimeout() == true