Deprecate oneIndex(), oneValue(), all(), or(), and().

Add allFutures() and one().
Fix deprecation in chunkedstream.
Add tests.
This commit is contained in:
cheatfate 2019-07-04 15:04:59 +03:00
parent 992cc57377
commit fcfb87d2a0
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
3 changed files with 266 additions and 295 deletions

View File

@ -469,12 +469,12 @@ proc asyncCheck*[T](future: Future[T]) =
proc asyncDiscard*[T](future: Future[T]) = discard
## This is async workaround for discard ``Future[T]``.
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
deprecated: "Use allFutures[T](varargs[Future[T]])".} =
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
##
## TODO: In case when `fut1` or `fut2` got cancelled, what result Future[void]
## should return?
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.`and`")
proc cb(data: pointer) =
if not(retFuture.finished()):
@ -493,20 +493,20 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb
proc cancel(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
fut1.removeCallback(cb)
fut2.removeCallback(cb)
if not(fut1.finished()):
fut1.cancel()
if not(fut2.finished()):
fut2.cancel()
retFuture.cancelCallback = cancel
return retFuture
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
deprecated: "Use one[T](varargs[Future[T]])".} =
## Returns a future which will complete once either ``fut1`` or ``fut2``
## complete.
##
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.`or`")
proc cb(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
@ -521,18 +521,16 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb
proc cancel(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
fut1.removeCallback(cb)
fut2.removeCallback(cb)
if not(fut1.finished()):
fut1.cancel()
if not(fut2.finished()):
fut2.cancel()
retFuture.cancelCallback = cancel
return retFuture
proc all*[T](futs: varargs[Future[T]]): auto =
proc all*[T](futs: varargs[Future[T]]): auto {.
deprecated: "Use allFutures(varargs[Future[T]])".} =
## Returns a future which will complete once all futures in ``futs`` complete.
## If the argument is empty, the returned future completes immediately.
##
@ -568,20 +566,12 @@ proc all*[T](futs: varargs[Future[T]]): auto =
if not(retFuture.failed()):
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
nfuts[i].cancel()
for fut in nfuts:
fut.addCallback(cb)
if len(nfuts) == 0:
retFuture.complete()
retFuture.cancelCallback = cancel
return retFuture
else:
var retFuture = newFuture[seq[T]]("chronos.all(T)")
@ -600,23 +590,16 @@ proc all*[T](futs: varargs[Future[T]]): auto =
if not(retFuture.failed()):
retFuture.complete(retValues)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
nfuts[i].cancel()
for fut in nfuts:
fut.addCallback(cb)
if len(nfuts) == 0:
retFuture.complete(retValues)
retFuture.cancelCallback = cancel
return retFuture
proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] =
proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] {.
deprecated: "Use one[T](varargs[Future[T]])".} =
## Returns a future which will complete once one of the futures in ``futs``
## complete.
##
@ -638,23 +621,16 @@ proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] =
res = i
retFuture.complete(res)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
nfuts[i].cancel()
for fut in nfuts:
fut.addCallback(cb)
if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
retFuture.cancelCallback = cancel
return retFuture
proc oneValue*[T](futs: varargs[Future[T]]): Future[T] =
proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {.
deprecated: "Use one[T](varargs[Future[T]])".} =
## Returns a future which will complete once one of the futures in ``futs``
## complete.
##
@ -662,11 +638,6 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] =
##
## Returned future will hold value of completed ``futs`` future, or error
## if future was failed.
##
## TODO: This procedure has bug on handling cancelled futures from ``futs``.
## So if future from ``futs`` list become cancelled, what must be returned?
## You can't cancel result ``retFuture`` because in such way infinite
## recursion will happen.
var nfuts = @futs
var retFuture = newFuture[T]("chronos.oneValue(T)")
@ -687,20 +658,12 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] =
else:
retFuture.complete(resFut.read())
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
nfuts[i].cancel()
for fut in nfuts:
fut.addCallback(cb)
if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
retFuture.cancelCallback = cancel
return retFuture
proc cancelAndWait*[T](future: Future[T]): Future[void] =
@ -714,3 +677,80 @@ proc cancelAndWait*[T](future: Future[T]): Future[void] =
future.addCallback(continuation)
future.cancel()
return retFuture
proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.allFutures()")
let totalFutures = len(futs)
var completedFutures = 0
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
proc cb(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
inc(completedFutures)
if completedFutures == totalFutures:
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb)
retFuture.cancelCallback = cancel
if len(nfuts) == 0:
retFuture.complete()
return retFuture
proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold index in ``futs`` array.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[Future[T]]("chronos.one()")
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
proc cb(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
var res: Future[T]
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if cast[FutureBase](nfuts[i]) != rfut:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
retFuture.complete(res)
proc cancel(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
if not(retFuture.finished()):
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
for fut in nfuts:
fut.addCallback(cb)
if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
retFuture.cancelCallback = cancel
return retFuture

View File

@ -128,7 +128,9 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
break
rstream.buffer.update(toRead)
await rstream.buffer.transfer() or exitFut
await oneOf(rstream.buffer.transfer(), exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
@ -169,7 +171,9 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
break
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer() or exitFut
await oneOf(rstream.buffer.transfer(), exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break

View File

@ -139,7 +139,79 @@ suite "Future[T] behavior test suite":
proc test5(): int =
result = waitFor(testFuture4())
proc testAllVarargs(): int =
proc testAsyncDiscard(): int =
var completedFutures = 0
proc client1() {.async.} =
await sleepAsync(100.milliseconds)
inc(completedFutures)
proc client2() {.async.} =
await sleepAsync(200.milliseconds)
inc(completedFutures)
proc client3() {.async.} =
await sleepAsync(300.milliseconds)
inc(completedFutures)
proc client4() {.async.} =
await sleepAsync(400.milliseconds)
inc(completedFutures)
proc client5() {.async.} =
await sleepAsync(500.milliseconds)
inc(completedFutures)
proc client1f() {.async.} =
await sleepAsync(100.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client2f() {.async.} =
await sleepAsync(200.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client3f() {.async.} =
await sleepAsync(300.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client4f() {.async.} =
await sleepAsync(400.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client5f() {.async.} =
await sleepAsync(500.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
asyncDiscard client1()
asyncDiscard client1f()
asyncDiscard client2()
asyncDiscard client2f()
asyncDiscard client3()
asyncDiscard client3f()
asyncDiscard client4()
asyncDiscard client4f()
asyncDiscard client5()
asyncDiscard client5f()
waitFor(sleepAsync(2000.milliseconds))
result = completedFutures
proc testAllFuturesZero(): bool =
var tseq = newSeq[Future[int]]()
var fut = allFutures(tseq)
result = fut.finished
proc testAllFuturesVarargs(): int =
var completedFutures = 0
proc vlient1() {.async.} =
@ -247,43 +319,34 @@ suite "Future[T] behavior test suite":
if true:
raise newException(ValueError, "")
waitFor(all(vlient1(), vlient2(), vlient3(), vlient4(), vlient5()))
waitFor(allFutures(vlient1(), vlient2(), vlient3(), vlient4(), vlient5()))
# 5 completed futures = 5
result += completedFutures
completedFutures = 0
try:
waitFor(all(vlient1(), vlient1f(),
vlient2(), vlient2f(),
vlient3(), vlient3f(),
vlient4(), vlient4f(),
vlient5(), vlient5f()))
result -= 10000
except:
discard
waitFor(allFutures(vlient1(), vlient1f(),
vlient2(), vlient2f(),
vlient3(), vlient3f(),
vlient4(), vlient4f(),
vlient5(), vlient5f()))
# 10 completed futures = 10
result += completedFutures
completedFutures = 0
var res = waitFor(all(client1(), client2(), client3(), client4(), client5()))
for item in res:
result += item
# 5 completed futures + 5 values = 10
waitFor(allFutures(client1(), client2(), client3(), client4(), client5()))
# 5 completed futures
result += completedFutures
completedFutures = 0
try:
var res = waitFor(all(client1(), client1f(),
client2(), client2f(),
client3(), client3f(),
client4(), client4f(),
client5(), client5f()))
result -= 10000
except:
discard
waitFor(allFutures(client1(), client1f(),
client2(), client2f(),
client3(), client3f(),
client4(), client4f(),
client5(), client5f()))
# 10 completed futures = 10
result += completedFutures
proc testAllSeq(): int =
proc testAllFuturesSeq(): int =
var completedFutures = 0
var vfutures = newSeq[Future[void]]()
var nfutures = newSeq[Future[int]]()
@ -401,7 +464,7 @@ suite "Future[T] behavior test suite":
vfutures.add(vlient4())
vfutures.add(vlient5())
waitFor(all(vfutures))
waitFor(allFutures(vfutures))
# 5 * 10 completed futures = 50
result += completedFutures
@ -419,11 +482,7 @@ suite "Future[T] behavior test suite":
vfutures.add(vlient5())
vfutures.add(vlient5f())
try:
waitFor(all(vfutures))
result -= 10000
except:
discard
waitFor(allFutures(vfutures))
# 10 * 10 completed futures = 100
result += completedFutures
@ -436,10 +495,8 @@ suite "Future[T] behavior test suite":
nfutures.add(client4())
nfutures.add(client5())
var res = waitFor(all(nfutures))
for i in 0..<len(nfutures):
result += res[i]
# 5 * 10 completed futures + 5 * 10 results = 100
waitFor(allFutures(nfutures))
# 5 * 10 completed futures = 50
result += completedFutures
completedFutures = 0
@ -456,98 +513,17 @@ suite "Future[T] behavior test suite":
nfutures.add(client5())
nfutures.add(client5f())
try:
var results = waitFor(all(nfutures))
result -= 10000
except:
discard
waitFor(allFutures(nfutures))
# 10 * 10 completed futures + 0 * 10 results = 100
# 10 * 10 completed futures = 100
result += completedFutures
proc testAsyncDiscard(): int =
var completedFutures = 0
proc client1() {.async.} =
await sleepAsync(100.milliseconds)
inc(completedFutures)
proc client2() {.async.} =
await sleepAsync(200.milliseconds)
inc(completedFutures)
proc client3() {.async.} =
await sleepAsync(300.milliseconds)
inc(completedFutures)
proc client4() {.async.} =
await sleepAsync(400.milliseconds)
inc(completedFutures)
proc client5() {.async.} =
await sleepAsync(500.milliseconds)
inc(completedFutures)
proc client1f() {.async.} =
await sleepAsync(100.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client2f() {.async.} =
await sleepAsync(200.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client3f() {.async.} =
await sleepAsync(300.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client4f() {.async.} =
await sleepAsync(400.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
proc client5f() {.async.} =
await sleepAsync(500.milliseconds)
inc(completedFutures)
if true:
raise newException(ValueError, "")
asyncDiscard client1()
asyncDiscard client1f()
asyncDiscard client2()
asyncDiscard client2f()
asyncDiscard client3()
asyncDiscard client3f()
asyncDiscard client4()
asyncDiscard client4f()
asyncDiscard client5()
asyncDiscard client5f()
waitFor(sleepAsync(2000.milliseconds))
result = completedFutures
proc testAllZero(): bool =
proc testOneZero(): bool =
var tseq = newSeq[Future[int]]()
var fut = all(tseq)
result = fut.finished
proc testOneIndexZero(): bool =
var tseq = newSeq[Future[int]]()
var fut = oneIndex(tseq)
var fut = one(tseq)
result = fut.finished and fut.failed
proc testOneValueZero(): bool =
var tseq = newSeq[Future[int]]()
var fut = oneValue(tseq)
result = fut.finished and fut.failed
proc testOneIndexVarargs(): bool =
proc testOneVarargs(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
@ -569,21 +545,45 @@ suite "Future[T] behavior test suite":
await sleepAsync(300.milliseconds)
result = 30
var res10 = waitFor(oneIndex(vlient1(), vlient2(), vlient3()))
var res11 = waitFor(oneIndex(vlient2(), vlient1(), vlient3()))
var res12 = waitFor(oneIndex(vlient3(), vlient2(), vlient1()))
if res10 != 0 or res11 != 1 or res12 != 2:
return
var fut11 = vlient1()
var fut12 = vlient2()
var fut13 = vlient3()
var res1 = waitFor(one(fut11, fut12, fut13))
var res20 = waitFor(oneIndex(client1(), client2(), client3()))
var res21 = waitFor(oneIndex(client2(), client1(), client3()))
var res22 = waitFor(oneIndex(client3(), client2(), client1()))
if res20 != 0 or res21 != 1 or res22 != 2:
return
var fut21 = vlient2()
var fut22 = vlient1()
var fut23 = vlient3()
var res2 = waitFor(one(fut21, fut22, fut23))
var fut31 = vlient3()
var fut32 = vlient2()
var fut33 = vlient1()
var res3 = waitFor(one(fut31, fut32, fut33))
if fut11 != res1 or fut22 != res2 or fut33 != res3:
return false
var cut11 = client1()
var cut12 = client2()
var cut13 = client3()
var res4 = waitFor(one(cut11, cut12, cut13))
var cut21 = client2()
var cut22 = client1()
var cut23 = client3()
var res5 = waitFor(one(cut21, cut22, cut23))
var cut31 = client3()
var cut32 = client2()
var cut33 = client1()
var res6 = waitFor(one(cut31, cut32, cut33))
if cut11 != res4 or cut22 != res5 or cut33 != res6:
return false
result = true
proc testOneValueVarargs(): bool =
proc testOneSeq(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
@ -608,108 +608,38 @@ suite "Future[T] behavior test suite":
var v10 = vlient1()
var v11 = vlient2()
var v12 = vlient3()
var res1 = waitFor(one(@[v10, v11, v12]))
var v20 = vlient2()
var v21 = vlient1()
var v22 = vlient3()
var res2 = waitFor(one(@[v20, v21, v22]))
var v30 = vlient3()
var v31 = vlient2()
var v32 = vlient1()
var res3 = waitFor(one(@[v30, v31, v32]))
waitFor(oneValue(v10, v11, v12))
waitFor(oneValue(v20, v21, v22))
waitFor(oneValue(v30, v31, v32))
if res1 != v10 or res2 != v21 or res3 != v32:
return false
if (not v10.finished) or (not v21.finished) or (not v32.finished):
return
var c10 = client1()
var c11 = client2()
var c12 = client3()
var res4 = waitFor(one(@[c10, c11, c12]))
var res30 = waitFor(oneValue(client1(), client2(), client3()))
var res31 = waitFor(oneValue(client2(), client1(), client3()))
var res32 = waitFor(oneValue(client3(), client2(), client1()))
if res30 != 10 or res31 != 10 or res32 != 10:
return
var c20 = client2()
var c21 = client1()
var c22 = client3()
var res5 = waitFor(one(@[c20, c21, c22]))
result = true
var c30 = client3()
var c31 = client2()
var c32 = client1()
var res6 = waitFor(one(@[c30, c31, c32]))
proc testOneIndexSeq(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
proc vlient2() {.async.} =
await sleepAsync(200.milliseconds)
proc vlient3() {.async.} =
await sleepAsync(300.milliseconds)
proc client1(): Future[int] {.async.} =
await sleepAsync(100.milliseconds)
result = 10
proc client2(): Future[int] {.async.} =
await sleepAsync(200.milliseconds)
result = 20
proc client3(): Future[int] {.async.} =
await sleepAsync(300.milliseconds)
result = 30
var res10 = waitFor(oneIndex(@[vlient1(), vlient2(), vlient3()]))
var res11 = waitFor(oneIndex(@[vlient2(), vlient1(), vlient3()]))
var res12 = waitFor(oneIndex(@[vlient3(), vlient2(), vlient1()]))
if res10 != 0 or res11 != 1 or res12 != 2:
return
var res20 = waitFor(oneIndex(@[client1(), client2(), client3()]))
var res21 = waitFor(oneIndex(@[client2(), client1(), client3()]))
var res22 = waitFor(oneIndex(@[client3(), client2(), client1()]))
if res20 != 0 or res21 != 1 or res22 != 2:
return
result = true
proc testOneValueSeq(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
proc vlient2() {.async.} =
await sleepAsync(200.milliseconds)
proc vlient3() {.async.} =
await sleepAsync(300.milliseconds)
proc client1(): Future[int] {.async.} =
await sleepAsync(100.milliseconds)
result = 10
proc client2(): Future[int] {.async.} =
await sleepAsync(200.milliseconds)
result = 20
proc client3(): Future[int] {.async.} =
await sleepAsync(300.milliseconds)
result = 30
var v10 = vlient1()
var v11 = vlient2()
var v12 = vlient3()
var v20 = vlient2()
var v21 = vlient1()
var v22 = vlient3()
var v30 = vlient3()
var v31 = vlient2()
var v32 = vlient1()
waitFor(oneValue(@[v10, v11, v12]))
waitFor(oneValue(@[v20, v21, v22]))
waitFor(oneValue(@[v30, v31, v32]))
if (not v10.finished) or (not v21.finished) or (not v32.finished):
return
var res30 = waitFor(oneValue(@[client1(), client2(), client3()]))
var res31 = waitFor(oneValue(@[client2(), client1(), client3()]))
var res32 = waitFor(oneValue(@[client3(), client2(), client1()]))
if res30 != 10 or res31 != 10 or res32 != 10:
return
if res4 != c10 or res5 != c21 or res6 != c32:
return false
result = true
@ -848,26 +778,23 @@ suite "Future[T] behavior test suite":
check test4() == "1245"
test "wait[T]() test":
check test5() == 6
test "all[T](varargs) test":
check testAllVarargs() == 35
test "all[T](seq) test":
check testAllSeq() == 350
test "all[T](zero) test":
check testAllZero() == true
test "asyncDiscard() test":
check testAsyncDiscard() == 10
test "oneIndex[T](zero) test":
check testOneIndexZero() == true
test "oneValue[T](zero) test":
check testOneValueZero() == true
test "oneIndex[T](varargs) test":
check testOneIndexVarargs() == true
test "oneValue[T](varargs) test":
check testOneValueVarargs() == true
test "oneIndex[T](seq) test":
check testOneIndexSeq() == true
test "oneValue[T](seq) test":
check testOneValueSeq() == true
test "allFutures(zero) test":
check testAllFuturesZero() == true
test "allFutures(varargs) test":
check testAllFuturesVarargs() == 30
test "allFutures(varargs) test":
check testAllFuturesSeq() == 300
test "one(zero) test":
check testOneZero() == true
test "one(varargs) test":
check testOneVarargs() == true
test "one(seq) test":
check testOneSeq() == true
test "cancel() async procedure test":
check testCancelIter() == true