From c27c564d471010f8240257189e0c1bcad44e39cd Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 4 Jun 2019 19:51:35 +0300 Subject: [PATCH] Add oneIndex(), oneValue() which are varargs `or` operation. Fix some deprecation warnings for Nim devel. Add tests for oneIndex(), oneValue(). --- chronos/asyncfutures2.nim | 74 ++++++++++++++- chronos/asyncloop.nim | 14 ++- chronos/sendfile.nim | 20 ++-- tests/testfut.nim | 188 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 280 insertions(+), 16 deletions(-) diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 78dc538..d37c64a 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -437,7 +437,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = # # We should investigate this further, before settling on the final design. # The same reasoning applies to `or` and `all`. - var retFuture = newFuture[void]("asyncdispatch.`and`") + var retFuture = newFuture[void]("chronos.`and`") proc cb(data: pointer) = if not retFuture.finished: if (fut1.failed or fut1.finished) and (fut2.failed or fut2.finished): @@ -454,7 +454,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once either ``fut1`` or ``fut2`` ## complete. - var retFuture = newFuture[void]("asyncdispatch.`or`") + var retFuture = newFuture[void]("chronos.`or`") proc cb(data: pointer) {.gcsafe.} = if not retFuture.finished: var fut = cast[FutureBase](data) @@ -487,7 +487,7 @@ proc all*[T](futs: varargs[Future[T]]): auto = var nfuts = @futs when T is void: - var retFuture = newFuture[void]("asyncdispatch.all(void)") + var retFuture = newFuture[void]("chronos.all(void)") for fut in nfuts: fut.addCallback proc (data: pointer) = inc(completedFutures) @@ -505,7 +505,7 @@ proc all*[T](futs: varargs[Future[T]]): auto = return retFuture else: - var retFuture = newFuture[seq[T]]("asyncdispatch.all(T)") + var retFuture = newFuture[seq[T]]("chronos.all(T)") var retValues = newSeq[T](totalFutures) for fut in nfuts: fut.addCallback proc (data: pointer) = @@ -525,3 +525,69 @@ proc all*[T](futs: varargs[Future[T]]): auto = retFuture.complete(retValues) return retFuture + +proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] = + ## Returns a future which will complete once one of the futures in ``futs`` + ## complete. + ## + ## If the argument is empty, the returned future FAILS immediately. + ## + ## Returned future will hold index of completed/failed future in ``futs`` + ## argument. + var nfuts = @futs + var retFuture = newFuture[int]("chronos.oneIndex(T)") + + proc cb(data: pointer) {.gcsafe.} = + var res = -1 + if not retFuture.finished: + var rfut = cast[FutureBase](data) + for i in 0.. - #include - #include """, - pure, final.} + SendfileHeader* {.importc: "sf_hdtr", + header: """#include + #include + #include """, + pure, final.} = object proc osSendFile*(outfd, infd: cint, offset: uint, size: uint, hdtr: ptr SendfileHeader, sbytes: ptr uint, @@ -83,11 +83,11 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or elif defined(macosx): import posix, os type - SendfileHeader* = object {.importc: "sf_hdtr", - header: """#include - #include - #include """, - pure, final.} + SendfileHeader* {.importc: "sf_hdtr", + header: """#include + #include + #include """, + pure, final.} = object proc osSendFile*(fd, s: cint, offset: int, size: ptr int, hdtr: ptr SendfileHeader, diff --git a/tests/testfut.nim b/tests/testfut.nim index 1eb0b4c..50ebc65 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -537,6 +537,182 @@ suite "Future[T] behavior test suite": var fut = all(tseq) result = fut.finished + proc testOneIndexZero(): bool = + var tseq = newSeq[Future[int]]() + var fut = oneIndex(tseq) + result = fut.finished and fut.failed + + proc testOneValueZero(): bool = + var tseq = newSeq[Future[int]]() + var fut = oneValue(tseq) + result = fut.finished and fut.failed + + proc testOneIndexVarargs(): bool = + proc vlient1() {.async.} = + await sleepAsync(100.milliseconds) + + proc vlient2() {.async.} = + await sleepAsync(200.milliseconds) + + proc vlient3() {.async.} = + await sleepAsync(300.milliseconds) + + proc client1(): Future[int] {.async.} = + await sleepAsync(100.milliseconds) + result = 10 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200.milliseconds) + result = 20 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300.milliseconds) + result = 30 + + var res10 = waitFor(oneIndex(vlient1(), vlient2(), vlient3())) + var res11 = waitFor(oneIndex(vlient2(), vlient1(), vlient3())) + var res12 = waitFor(oneIndex(vlient3(), vlient2(), vlient1())) + if res10 != 0 or res11 != 1 or res12 != 2: + return + + var res20 = waitFor(oneIndex(client1(), client2(), client3())) + var res21 = waitFor(oneIndex(client2(), client1(), client3())) + var res22 = waitFor(oneIndex(client3(), client2(), client1())) + if res20 != 0 or res21 != 1 or res22 != 2: + return + + result = true + + proc testOneValueVarargs(): bool = + proc vlient1() {.async.} = + await sleepAsync(100.milliseconds) + + proc vlient2() {.async.} = + await sleepAsync(200.milliseconds) + + proc vlient3() {.async.} = + await sleepAsync(300.milliseconds) + + proc client1(): Future[int] {.async.} = + await sleepAsync(100.milliseconds) + result = 10 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200.milliseconds) + result = 20 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300.milliseconds) + result = 30 + + var v10 = vlient1() + var v11 = vlient2() + var v12 = vlient3() + var v20 = vlient2() + var v21 = vlient1() + var v22 = vlient3() + var v30 = vlient3() + var v31 = vlient2() + var v32 = vlient1() + + waitFor(oneValue(v10, v11, v12)) + waitFor(oneValue(v20, v21, v22)) + waitFor(oneValue(v30, v31, v32)) + + if (not v10.finished) or (not v21.finished) or (not v32.finished): + return + + var res30 = waitFor(oneValue(client1(), client2(), client3())) + var res31 = waitFor(oneValue(client2(), client1(), client3())) + var res32 = waitFor(oneValue(client3(), client2(), client1())) + if res30 != 10 or res31 != 10 or res32 != 10: + return + + result = true + + proc testOneIndexSeq(): bool = + proc vlient1() {.async.} = + await sleepAsync(100.milliseconds) + + proc vlient2() {.async.} = + await sleepAsync(200.milliseconds) + + proc vlient3() {.async.} = + await sleepAsync(300.milliseconds) + + proc client1(): Future[int] {.async.} = + await sleepAsync(100.milliseconds) + result = 10 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200.milliseconds) + result = 20 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300.milliseconds) + result = 30 + + var res10 = waitFor(oneIndex(@[vlient1(), vlient2(), vlient3()])) + var res11 = waitFor(oneIndex(@[vlient2(), vlient1(), vlient3()])) + var res12 = waitFor(oneIndex(@[vlient3(), vlient2(), vlient1()])) + if res10 != 0 or res11 != 1 or res12 != 2: + return + + var res20 = waitFor(oneIndex(@[client1(), client2(), client3()])) + var res21 = waitFor(oneIndex(@[client2(), client1(), client3()])) + var res22 = waitFor(oneIndex(@[client3(), client2(), client1()])) + if res20 != 0 or res21 != 1 or res22 != 2: + return + + result = true + + proc testOneValueSeq(): bool = + proc vlient1() {.async.} = + await sleepAsync(100.milliseconds) + + proc vlient2() {.async.} = + await sleepAsync(200.milliseconds) + + proc vlient3() {.async.} = + await sleepAsync(300.milliseconds) + + proc client1(): Future[int] {.async.} = + await sleepAsync(100.milliseconds) + result = 10 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200.milliseconds) + result = 20 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300.milliseconds) + result = 30 + + var v10 = vlient1() + var v11 = vlient2() + var v12 = vlient3() + var v20 = vlient2() + var v21 = vlient1() + var v22 = vlient3() + var v30 = vlient3() + var v31 = vlient2() + var v32 = vlient1() + + waitFor(oneValue(@[v10, v11, v12])) + waitFor(oneValue(@[v20, v21, v22])) + waitFor(oneValue(@[v30, v31, v32])) + + if (not v10.finished) or (not v21.finished) or (not v32.finished): + return + + var res30 = waitFor(oneValue(@[client1(), client2(), client3()])) + var res31 = waitFor(oneValue(@[client2(), client1(), client3()])) + var res32 = waitFor(oneValue(@[client3(), client2(), client1()])) + if res30 != 10 or res31 != 10 or res32 != 10: + return + + result = true + test "Async undefined behavior (#7758) test": check test1() == true test "Immediately completed asynchronous procedure test": @@ -555,3 +731,15 @@ suite "Future[T] behavior test suite": check testAllZero() == true test "asyncDiscard() test": check testAsyncDiscard() == 10 + test "oneIndex[T](zero) test": + check testOneIndexZero() == true + test "oneValue[T](zero) test": + check testOneValueZero() == true + test "oneIndex[T](varargs) test": + check testOneIndexVarargs() == true + test "oneValue[T](varargs) test": + check testOneValueVarargs() == true + test "oneIndex[T](seq) test": + check testOneIndexSeq() == true + test "oneValue[T](seq) test": + check testOneValueSeq() == true