Race() call (#142)

* Add `race` procedure call which extends `one` with FutureBase.

* Fix race() and add test procedures.
This commit is contained in:
Eugene Kabanov 2020-11-27 00:50:55 +02:00 committed by GitHub
parent ac9b3e304f
commit bca5559c6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 243 additions and 0 deletions

View File

@ -932,3 +932,49 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
retFuture.cancelCallback = cancellation retFuture.cancelCallback = cancellation
return retFuture return retFuture
proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[FutureBase]("chronos.race()")
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
proc cb(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
retFuture.complete(res)
proc cancellation(udata: pointer) {.gcsafe.} =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
# If one of the Future[T] already finished we return it as result
for fut in nfuts:
if fut.finished():
retFuture.complete(fut)
return retFuture
for fut in nfuts:
fut.addCallback(cb, cast[pointer](fut))
if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
retFuture.cancelCallback = cancellation
return retFuture

View File

@ -1113,6 +1113,192 @@ suite "Future[T] behavior test suite":
waitFor testWait() waitFor testWait()
proc testRaceZero(): bool =
var tseq = newSeq[FutureBase]()
var fut1 = race(tseq)
var fut2 = race()
var fut3 = race([])
fut1.failed() and fut2.failed() and fut3.failed()
proc testRaceVarargs(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
proc vlient2() {.async.} =
await sleepAsync(200.milliseconds)
proc vlient3() {.async.} =
await sleepAsync(300.milliseconds)
proc ilient1(): Future[int] {.async.} =
await sleepAsync(100.milliseconds)
result = 10
proc ilient2(): Future[int] {.async.} =
await sleepAsync(200.milliseconds)
result = 20
proc ilient3(): Future[int] {.async.} =
await sleepAsync(300.milliseconds)
result = 30
proc slient1(): Future[string] {.async.} =
await sleepAsync(100.milliseconds)
result = "sclient1"
proc slient2(): Future[string] {.async.} =
await sleepAsync(200.milliseconds)
result = "sclient2"
proc slient3(): Future[string] {.async.} =
await sleepAsync(300.milliseconds)
result = "sclient3"
var fut11 = vlient1()
var fut12 = ilient2()
var fut13 = slient3()
var res1 = waitFor(race(fut11, fut12, fut13))
var fut21 = vlient2()
var fut22 = ilient1()
var fut23 = slient3()
var res2 = waitFor(race(fut21, fut22, fut23))
var fut31 = vlient3()
var fut32 = ilient2()
var fut33 = slient1()
var res3 = waitFor(race(fut31, fut32, fut33))
var fut41 = vlient1()
var fut42 = slient2()
var fut43 = ilient3()
var res4 = waitFor(race(fut41, fut42, fut43))
if (FutureBase(fut11) != res1) or
(FutureBase(fut22) != res2) or
(FutureBase(fut33) != res3) or
(FutureBase(fut41) != res4):
return false
result = true
proc testRaceSeq(): bool =
proc vlient1() {.async.} =
await sleepAsync(100.milliseconds)
proc vlient2() {.async.} =
await sleepAsync(200.milliseconds)
proc vlient3() {.async.} =
await sleepAsync(300.milliseconds)
proc ilient1(): Future[int] {.async.} =
await sleepAsync(100.milliseconds)
result = 10
proc ilient2(): Future[int] {.async.} =
await sleepAsync(200.milliseconds)
result = 20
proc ilient3(): Future[int] {.async.} =
await sleepAsync(300.milliseconds)
result = 30
proc slient1(): Future[string] {.async.} =
await sleepAsync(100.milliseconds)
result = "slient1"
proc slient2(): Future[string] {.async.} =
await sleepAsync(200.milliseconds)
result = "slient2"
proc slient3(): Future[string] {.async.} =
await sleepAsync(300.milliseconds)
result = "slient3"
var v10 = vlient1()
var v11 = ilient2()
var v12 = slient3()
var res1 = waitFor(race(@[FutureBase(v10), FutureBase(v11),
FutureBase(v12)]))
var v20 = vlient2()
var v21 = ilient1()
var v22 = slient3()
var res2 = waitFor(race(@[FutureBase(v20), FutureBase(v21),
FutureBase(v22)]))
var v30 = vlient3()
var v31 = ilient2()
var v32 = slient1()
var res3 = waitFor(race(@[FutureBase(v30), FutureBase(v31),
FutureBase(v32)]))
var v40 = vlient1()
var v41 = slient2()
var v42 = ilient3()
var res4 = waitFor(race(@[FutureBase(v40), FutureBase(v41),
FutureBase(v42)]))
if res1 != FutureBase(v10) or
res2 != FutureBase(v21) or
res3 != FutureBase(v32) or
res4 != FutureBase(v40):
return false
result = true
proc testRaceCompleted(): bool =
proc client1(): Future[int] {.async.} =
result = 1
proc client2() {.async.} =
if true:
raise newException(ValueError, "")
proc client3(): Future[string] {.async.} =
await sleepAsync(100.milliseconds)
result = "client3"
var f10 = client1()
var f20 = client2()
var f30 = client3()
var fut1 = race(f30, f10, f20)
var f11 = client1()
var f21 = client2()
var f31 = client3()
var fut2 = race(f31, f21, f11)
result = (fut1.done() and fut1.read() == FutureBase(f10)) and
(fut2.done() and fut2.read() == FutureBase(f21))
proc testRaceCancelled(): bool =
proc client1() {.async.} =
await sleepAsync(100.milliseconds)
proc client2(): Future[int] {.async.} =
await sleepAsync(200.milliseconds)
return 10
proc client3(): Future[string] {.async.} =
await sleepAsync(300.milliseconds)
return "client3"
var f1 = client1()
var f2 = client2()
var f3 = client3()
var fut = race(f1, f2, f3)
waitFor(cancelAndWait(fut))
if f1.finished() or f2.finished() or f3.finished():
return false
waitFor(sleepAsync(400.milliseconds))
if not(f1.finished()) or not(f2.finished()) or not(f3.finished()):
return false
return true
test "Async undefined behavior (#7758) test": test "Async undefined behavior (#7758) test":
check test1() == true check test1() == true
test "Immediately completed asynchronous procedure test": test "Immediately completed asynchronous procedure test":
@ -1152,6 +1338,17 @@ suite "Future[T] behavior test suite":
test "or() already completed test": test "or() already completed test":
check testOrCompleted() == true check testOrCompleted() == true
test "race(zero) test":
check testRaceZero() == true
test "race(varargs) test":
check testRaceVarargs() == true
test "race(seq) test":
check testRaceSeq() == true
test "race() already completed test":
check testRaceCompleted() == true
test "race() cancellation test":
check testRaceCancelled() == true
test "cancel() async procedure test": test "cancel() async procedure test":
check testCancelIter() == true check testCancelIter() == true
test "cancelAndWait() test": test "cancelAndWait() test":