Avoid `ValueError` effect in varargs `race`/`one` (#520)

We can check at compile-time that at least one parameter is passed

* clean up closure environment explicitly in some callbacks to release
memory earlier
This commit is contained in:
Jacek Sieka 2024-03-06 06:42:22 +01:00 committed by GitHub
parent f6c7ecfa0a
commit 03d82475d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 136 additions and 69 deletions

View File

@ -734,8 +734,8 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
retFuture.fail(fut2.error) retFuture.fail(fut2.error)
else: else:
retFuture.complete() retFuture.complete()
fut1.callback = cb fut1.addCallback(cb)
fut2.callback = cb fut2.addCallback(cb)
proc cancellation(udata: pointer) = proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only. # On cancel we remove all our callbacks only.
@ -1086,12 +1086,14 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] {.
inc(finishedFutures) inc(finishedFutures)
if finishedFutures == totalFutures: if finishedFutures == totalFutures:
retFuture.complete() retFuture.complete()
reset(nfuts)
proc cancellation(udata: pointer) = proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only. # On cancel we remove all our callbacks only.
for i in 0..<len(nfuts): for i in 0..<len(nfuts):
if not(nfuts[i].finished()): if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb) nfuts[i].removeCallback(cb)
reset(nfuts)
for fut in nfuts: for fut in nfuts:
if not(fut.finished()): if not(fut.finished()):
@ -1148,13 +1150,14 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
if not(retFuture.finished()): if not(retFuture.finished()):
inc(finishedFutures) inc(finishedFutures)
if finishedFutures == totalFutures: if finishedFutures == totalFutures:
retFuture.complete(nfuts) retFuture.complete(move(nfuts))
proc cancellation(udata: pointer) = proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only. # On cancel we remove all our callbacks only.
for fut in nfuts.mitems(): for fut in nfuts.mitems():
if not(fut.finished()): if not(fut.finished()):
fut.removeCallback(cb) fut.removeCallback(cb)
reset(nfuts)
for fut in nfuts: for fut in nfuts:
if not(fut.finished()): if not(fut.finished()):
@ -1168,7 +1171,65 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
return retFuture return retFuture
proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {. template oneImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: F
for i in 0..<len(nfuts):
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)
when declared(fut0):
fut0.addCallback(cb)
for fut in futs:
fut.addCallback(cb)
retFuture.cancelCallback = cancellation
return retFuture
proc one*[F: SomeFuture](fut0: F, futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[F]("chronos.one()")
if fut0.finished():
retFuture.complete(fut0)
return retFuture
oneImpl
proc one*[F: SomeFuture](futs: openArray[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} = async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside, ## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled. ## when one of the futures in ``futs`` will be completed, failed or canceled.
@ -1178,48 +1239,76 @@ proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
## On success returned Future will hold finished Future[T]. ## On success returned Future will hold finished Future[T].
## ##
## On cancel futures in ``futs`` WILL NOT BE cancelled. ## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[F]("chronos.one()") let retFuture = newFuture[F]("chronos.one()")
if len(futs) == 0: if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list")) retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture return retFuture
oneImpl
template raceImpl =
# If one of the Future[T] already finished we return it as result # If one of the Future[T] already finished we return it as result
for fut in futs: for fut in futs:
if fut.finished(): if fut.finished():
retFuture.complete(fut) retFuture.complete(fut)
return retFuture return retFuture
# Because we can't capture varargs[T] in closures we need to create copy. # Because we can't capture openArray/varargs in closures we need to create copy.
var nfuts = @futs var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].} var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} = cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()): if not(retFuture.finished()):
var res: F var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts): for i in 0..<len(nfuts):
if cast[FutureBase](nfuts[i]) != rfut: if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb) nfuts[i].removeCallback(cb)
else: else:
res = nfuts[i] res = move(nfuts[i])
retFuture.complete(res) retFuture.complete(res)
reset(nfuts)
reset(cb)
proc cancellation(udata: pointer) = proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only. # On cancel we remove all our callbacks only.
for i in 0..<len(nfuts): for i in 0..<len(nfuts):
if not(nfuts[i].finished()): if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb) nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)
for fut in nfuts: when declared(fut0):
fut.addCallback(cb) fut0.addCallback(cb, cast[pointer](fut0))
for fut in futs:
fut.addCallback(cb, cast[pointer](fut))
retFuture.cancelCallback = cancellation retFuture.cancelCallback = cancellation
return retFuture return retFuture
proc race*(futs: varargs[FutureBase]): Future[FutureBase] {. proc race*(fut0: FutureBase, futs: varargs[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return finished FutureBase,
## when one of the given futures will be completed, failed or canceled.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
if fut0.finished:
retFuture.complete(fut0)
return retFuture
raceImpl
proc race*(futs: openArray[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} = async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed FutureBase, ## Returns a future which will complete and return finished FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled. ## when one of the futures in ``futs`` will be completed, failed or canceled.
## ##
## If the argument is empty, the returned future FAILS immediately. ## If the argument is empty, the returned future FAILS immediately.
@ -1233,59 +1322,18 @@ proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
retFuture.fail(newException(ValueError, "Empty Future[T] list")) retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture return retFuture
# If one of the Future[T] already finished we return it as result raceImpl
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture
# Because we can't capture varargs[T] in closures we need to create copy. proc race*(futs: openArray[SomeFuture]): Future[FutureBase] {.
var nfuts = @futs
var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
retFuture.complete(res)
proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb, cast[pointer](fut))
retFuture.cancelCallback = cancellation
return retFuture
proc race*[T](futs: varargs[Future[T]]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} = async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs`` ## Returns a future which will complete and return completed FutureBase,
## will be completed, failed or canceled. ## when one of the futures in ``futs`` will be completed, failed or canceled.
## ##
## If the argument is empty, the returned future COMPLETES immediately. ## If the argument is empty, the returned future FAILS immediately.
## ##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled. ## On success returned Future will hold finished FutureBase.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))
proc race*[T, E](futs: varargs[InternalRaisesFuture[T, E]]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## ##
## If the argument is empty, the returned future COMPLETES immediately. ## On cancel futures in ``futs`` WILL NOT BE cancelled.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy. # Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it))) race(futs.mapIt(FutureBase(it)))

View File

@ -704,6 +704,14 @@ suite "Future[T] behavior test suite":
not(fut2.failed()) not(fut2.failed())
fut2.read() == f21 fut2.read() == f21
asyncTest "one() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
one(f).cancelSoon()
await checkraises()
asyncTest "or() test": asyncTest "or() test":
proc client1() {.async.} = proc client1() {.async.} =
await sleepAsync(200.milliseconds) await sleepAsync(200.milliseconds)
@ -1220,7 +1228,10 @@ suite "Future[T] behavior test suite":
test "location test": test "location test":
# WARNING: This test is very sensitive to line numbers and module name. # WARNING: This test is very sensitive to line numbers and module name.
template start(): int =
instantiationInfo().line
const first = start()
proc macroFuture() {.async.} = proc macroFuture() {.async.} =
let someVar {.used.} = 5 # LINE POSITION 1 let someVar {.used.} = 5 # LINE POSITION 1
let someOtherVar {.used.} = 4 let someOtherVar {.used.} = 4
@ -1258,12 +1269,12 @@ suite "Future[T] behavior test suite":
(loc.procedure == procedure) (loc.procedure == procedure)
check: check:
chk(loc10, "testfut.nim", 1225, "macroFuture") chk(loc10, "testfut.nim", first + 2, "macroFuture")
chk(loc11, "testfut.nim", 1228, "") chk(loc11, "testfut.nim", first + 5, "")
chk(loc20, "testfut.nim", 1237, "template") chk(loc20, "testfut.nim", first + 14, "template")
chk(loc21, "testfut.nim", 1240, "") chk(loc21, "testfut.nim", first + 17, "")
chk(loc30, "testfut.nim", 1234, "procedure") chk(loc30, "testfut.nim", first + 11, "procedure")
chk(loc31, "testfut.nim", 1241, "") chk(loc31, "testfut.nim", first + 18, "")
asyncTest "withTimeout(fut) should wait cancellation test": asyncTest "withTimeout(fut) should wait cancellation test":
proc futureNeverEnds(): Future[void] = proc futureNeverEnds(): Future[void] =
@ -1507,6 +1518,14 @@ suite "Future[T] behavior test suite":
f2.finished() f2.finished()
f3.finished() f3.finished()
asyncTest "race() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
race(f).cancelSoon()
await checkraises()
test "Unsigned integer overflow test": test "Unsigned integer overflow test":
check: check:
0xFFFF_FFFF_FFFF_FFFF'u64 + 1'u64 == 0'u64 0xFFFF_FFFF_FFFF_FFFF'u64 + 1'u64 == 0'u64