Add oneIndex(), oneValue() which are varargs `or` operation.
Fix some deprecation warnings for Nim devel. Add tests for oneIndex(), oneValue().
This commit is contained in:
parent
38dd4cb6e7
commit
c27c564d47
|
@ -437,7 +437,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||||
#
|
#
|
||||||
# We should investigate this further, before settling on the final design.
|
# We should investigate this further, before settling on the final design.
|
||||||
# The same reasoning applies to `or` and `all`.
|
# The same reasoning applies to `or` and `all`.
|
||||||
var retFuture = newFuture[void]("asyncdispatch.`and`")
|
var retFuture = newFuture[void]("chronos.`and`")
|
||||||
proc cb(data: pointer) =
|
proc cb(data: pointer) =
|
||||||
if not retFuture.finished:
|
if not retFuture.finished:
|
||||||
if (fut1.failed or fut1.finished) and (fut2.failed or fut2.finished):
|
if (fut1.failed or fut1.finished) and (fut2.failed or fut2.finished):
|
||||||
|
@ -454,7 +454,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||||
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||||
## Returns a future which will complete once either ``fut1`` or ``fut2``
|
## Returns a future which will complete once either ``fut1`` or ``fut2``
|
||||||
## complete.
|
## complete.
|
||||||
var retFuture = newFuture[void]("asyncdispatch.`or`")
|
var retFuture = newFuture[void]("chronos.`or`")
|
||||||
proc cb(data: pointer) {.gcsafe.} =
|
proc cb(data: pointer) {.gcsafe.} =
|
||||||
if not retFuture.finished:
|
if not retFuture.finished:
|
||||||
var fut = cast[FutureBase](data)
|
var fut = cast[FutureBase](data)
|
||||||
|
@ -487,7 +487,7 @@ proc all*[T](futs: varargs[Future[T]]): auto =
|
||||||
var nfuts = @futs
|
var nfuts = @futs
|
||||||
|
|
||||||
when T is void:
|
when T is void:
|
||||||
var retFuture = newFuture[void]("asyncdispatch.all(void)")
|
var retFuture = newFuture[void]("chronos.all(void)")
|
||||||
for fut in nfuts:
|
for fut in nfuts:
|
||||||
fut.addCallback proc (data: pointer) =
|
fut.addCallback proc (data: pointer) =
|
||||||
inc(completedFutures)
|
inc(completedFutures)
|
||||||
|
@ -505,7 +505,7 @@ proc all*[T](futs: varargs[Future[T]]): auto =
|
||||||
|
|
||||||
return retFuture
|
return retFuture
|
||||||
else:
|
else:
|
||||||
var retFuture = newFuture[seq[T]]("asyncdispatch.all(T)")
|
var retFuture = newFuture[seq[T]]("chronos.all(T)")
|
||||||
var retValues = newSeq[T](totalFutures)
|
var retValues = newSeq[T](totalFutures)
|
||||||
for fut in nfuts:
|
for fut in nfuts:
|
||||||
fut.addCallback proc (data: pointer) =
|
fut.addCallback proc (data: pointer) =
|
||||||
|
@ -525,3 +525,69 @@ proc all*[T](futs: varargs[Future[T]]): auto =
|
||||||
retFuture.complete(retValues)
|
retFuture.complete(retValues)
|
||||||
|
|
||||||
return retFuture
|
return retFuture
|
||||||
|
|
||||||
|
proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] =
|
||||||
|
## Returns a future which will complete once one of the futures in ``futs``
|
||||||
|
## complete.
|
||||||
|
##
|
||||||
|
## If the argument is empty, the returned future FAILS immediately.
|
||||||
|
##
|
||||||
|
## Returned future will hold index of completed/failed future in ``futs``
|
||||||
|
## argument.
|
||||||
|
var nfuts = @futs
|
||||||
|
var retFuture = newFuture[int]("chronos.oneIndex(T)")
|
||||||
|
|
||||||
|
proc cb(data: pointer) {.gcsafe.} =
|
||||||
|
var res = -1
|
||||||
|
if not retFuture.finished:
|
||||||
|
var rfut = cast[FutureBase](data)
|
||||||
|
for i in 0..<len(nfuts):
|
||||||
|
if cast[FutureBase](nfuts[i]) != rfut:
|
||||||
|
nfuts[i].removeCallback(cb)
|
||||||
|
else:
|
||||||
|
res = i
|
||||||
|
retFuture.complete(res)
|
||||||
|
|
||||||
|
for fut in nfuts:
|
||||||
|
fut.addCallback(cb)
|
||||||
|
|
||||||
|
if len(nfuts) == 0:
|
||||||
|
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||||
|
|
||||||
|
return retFuture
|
||||||
|
|
||||||
|
proc oneValue*[T](futs: varargs[Future[T]]): Future[T] =
|
||||||
|
## Returns a future which will complete once one of the futures in ``futs``
|
||||||
|
## complete.
|
||||||
|
##
|
||||||
|
## If the argument is empty, returned future FAILS immediately.
|
||||||
|
##
|
||||||
|
## Returned future will hold value of completed ``futs`` future, or error
|
||||||
|
## if future was failed.
|
||||||
|
var nfuts = @futs
|
||||||
|
var retFuture = newFuture[T]("chronos.oneValue(T)")
|
||||||
|
|
||||||
|
proc cb(data: pointer) {.gcsafe.} =
|
||||||
|
var resFut: Future[T]
|
||||||
|
if not retFuture.finished:
|
||||||
|
var rfut = cast[FutureBase](data)
|
||||||
|
for i in 0..<len(nfuts):
|
||||||
|
if cast[FutureBase](nfuts[i]) != rfut:
|
||||||
|
nfuts[i].removeCallback(cb)
|
||||||
|
else:
|
||||||
|
resFut = nfuts[i]
|
||||||
|
if resFut.failed:
|
||||||
|
retFuture.fail(resFut.error)
|
||||||
|
else:
|
||||||
|
when T is void:
|
||||||
|
retFuture.complete()
|
||||||
|
else:
|
||||||
|
retFuture.complete(resFut.read())
|
||||||
|
|
||||||
|
for fut in nfuts:
|
||||||
|
fut.addCallback(cb)
|
||||||
|
|
||||||
|
if len(nfuts) == 0:
|
||||||
|
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||||
|
|
||||||
|
return retFuture
|
||||||
|
|
|
@ -308,8 +308,18 @@ when defined(windows) or defined(nimdoc):
|
||||||
## Creates a new Dispatcher instance.
|
## Creates a new Dispatcher instance.
|
||||||
new result
|
new result
|
||||||
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
|
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
|
||||||
|
when compiles(initHashSet):
|
||||||
|
# After 0.20.0 Nim's stdlib version
|
||||||
|
result.handles = initHashSet[AsyncFD]()
|
||||||
|
else:
|
||||||
|
# Pre 0.20.0 Nim's stdlib version
|
||||||
result.handles = initSet[AsyncFD]()
|
result.handles = initSet[AsyncFD]()
|
||||||
result.timers.newHeapQueue()
|
when compiles(initHeapQueue):
|
||||||
|
# After 0.20.0 Nim's stdlib version
|
||||||
|
result.timers = initHeapQueue[TimerCallback]()
|
||||||
|
else:
|
||||||
|
# Pre 0.20.0 Nim's stdlib version
|
||||||
|
result.timers = newHeapQueue[TimerCallback]()
|
||||||
result.callbacks = initDeque[AsyncCallback](64)
|
result.callbacks = initDeque[AsyncCallback](64)
|
||||||
result.trackers = initTable[string, TrackerBase]()
|
result.trackers = initTable[string, TrackerBase]()
|
||||||
|
|
||||||
|
|
|
@ -52,11 +52,11 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
|
||||||
defined(dragonflybsd):
|
defined(dragonflybsd):
|
||||||
import posix, os
|
import posix, os
|
||||||
type
|
type
|
||||||
SendfileHeader* = object {.importc: "sf_hdtr",
|
SendfileHeader* {.importc: "sf_hdtr",
|
||||||
header: """#include <sys/types.h>
|
header: """#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/uio.h>""",
|
#include <sys/uio.h>""",
|
||||||
pure, final.}
|
pure, final.} = object
|
||||||
|
|
||||||
proc osSendFile*(outfd, infd: cint, offset: uint, size: uint,
|
proc osSendFile*(outfd, infd: cint, offset: uint, size: uint,
|
||||||
hdtr: ptr SendfileHeader, sbytes: ptr uint,
|
hdtr: ptr SendfileHeader, sbytes: ptr uint,
|
||||||
|
@ -83,11 +83,11 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
|
||||||
elif defined(macosx):
|
elif defined(macosx):
|
||||||
import posix, os
|
import posix, os
|
||||||
type
|
type
|
||||||
SendfileHeader* = object {.importc: "sf_hdtr",
|
SendfileHeader* {.importc: "sf_hdtr",
|
||||||
header: """#include <sys/types.h>
|
header: """#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/uio.h>""",
|
#include <sys/uio.h>""",
|
||||||
pure, final.}
|
pure, final.} = object
|
||||||
|
|
||||||
proc osSendFile*(fd, s: cint, offset: int, size: ptr int,
|
proc osSendFile*(fd, s: cint, offset: int, size: ptr int,
|
||||||
hdtr: ptr SendfileHeader,
|
hdtr: ptr SendfileHeader,
|
||||||
|
|
|
@ -537,6 +537,182 @@ suite "Future[T] behavior test suite":
|
||||||
var fut = all(tseq)
|
var fut = all(tseq)
|
||||||
result = fut.finished
|
result = fut.finished
|
||||||
|
|
||||||
|
proc testOneIndexZero(): bool =
|
||||||
|
var tseq = newSeq[Future[int]]()
|
||||||
|
var fut = oneIndex(tseq)
|
||||||
|
result = fut.finished and fut.failed
|
||||||
|
|
||||||
|
proc testOneValueZero(): bool =
|
||||||
|
var tseq = newSeq[Future[int]]()
|
||||||
|
var fut = oneValue(tseq)
|
||||||
|
result = fut.finished and fut.failed
|
||||||
|
|
||||||
|
proc testOneIndexVarargs(): bool =
|
||||||
|
proc vlient1() {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
|
||||||
|
proc vlient2() {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
|
||||||
|
proc vlient3() {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
|
||||||
|
proc client1(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
result = 10
|
||||||
|
|
||||||
|
proc client2(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
result = 20
|
||||||
|
|
||||||
|
proc client3(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
result = 30
|
||||||
|
|
||||||
|
var res10 = waitFor(oneIndex(vlient1(), vlient2(), vlient3()))
|
||||||
|
var res11 = waitFor(oneIndex(vlient2(), vlient1(), vlient3()))
|
||||||
|
var res12 = waitFor(oneIndex(vlient3(), vlient2(), vlient1()))
|
||||||
|
if res10 != 0 or res11 != 1 or res12 != 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
var res20 = waitFor(oneIndex(client1(), client2(), client3()))
|
||||||
|
var res21 = waitFor(oneIndex(client2(), client1(), client3()))
|
||||||
|
var res22 = waitFor(oneIndex(client3(), client2(), client1()))
|
||||||
|
if res20 != 0 or res21 != 1 or res22 != 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
result = true
|
||||||
|
|
||||||
|
proc testOneValueVarargs(): bool =
|
||||||
|
proc vlient1() {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
|
||||||
|
proc vlient2() {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
|
||||||
|
proc vlient3() {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
|
||||||
|
proc client1(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
result = 10
|
||||||
|
|
||||||
|
proc client2(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
result = 20
|
||||||
|
|
||||||
|
proc client3(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
result = 30
|
||||||
|
|
||||||
|
var v10 = vlient1()
|
||||||
|
var v11 = vlient2()
|
||||||
|
var v12 = vlient3()
|
||||||
|
var v20 = vlient2()
|
||||||
|
var v21 = vlient1()
|
||||||
|
var v22 = vlient3()
|
||||||
|
var v30 = vlient3()
|
||||||
|
var v31 = vlient2()
|
||||||
|
var v32 = vlient1()
|
||||||
|
|
||||||
|
waitFor(oneValue(v10, v11, v12))
|
||||||
|
waitFor(oneValue(v20, v21, v22))
|
||||||
|
waitFor(oneValue(v30, v31, v32))
|
||||||
|
|
||||||
|
if (not v10.finished) or (not v21.finished) or (not v32.finished):
|
||||||
|
return
|
||||||
|
|
||||||
|
var res30 = waitFor(oneValue(client1(), client2(), client3()))
|
||||||
|
var res31 = waitFor(oneValue(client2(), client1(), client3()))
|
||||||
|
var res32 = waitFor(oneValue(client3(), client2(), client1()))
|
||||||
|
if res30 != 10 or res31 != 10 or res32 != 10:
|
||||||
|
return
|
||||||
|
|
||||||
|
result = true
|
||||||
|
|
||||||
|
proc testOneIndexSeq(): bool =
|
||||||
|
proc vlient1() {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
|
||||||
|
proc vlient2() {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
|
||||||
|
proc vlient3() {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
|
||||||
|
proc client1(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
result = 10
|
||||||
|
|
||||||
|
proc client2(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
result = 20
|
||||||
|
|
||||||
|
proc client3(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
result = 30
|
||||||
|
|
||||||
|
var res10 = waitFor(oneIndex(@[vlient1(), vlient2(), vlient3()]))
|
||||||
|
var res11 = waitFor(oneIndex(@[vlient2(), vlient1(), vlient3()]))
|
||||||
|
var res12 = waitFor(oneIndex(@[vlient3(), vlient2(), vlient1()]))
|
||||||
|
if res10 != 0 or res11 != 1 or res12 != 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
var res20 = waitFor(oneIndex(@[client1(), client2(), client3()]))
|
||||||
|
var res21 = waitFor(oneIndex(@[client2(), client1(), client3()]))
|
||||||
|
var res22 = waitFor(oneIndex(@[client3(), client2(), client1()]))
|
||||||
|
if res20 != 0 or res21 != 1 or res22 != 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
result = true
|
||||||
|
|
||||||
|
proc testOneValueSeq(): bool =
|
||||||
|
proc vlient1() {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
|
||||||
|
proc vlient2() {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
|
||||||
|
proc vlient3() {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
|
||||||
|
proc client1(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
result = 10
|
||||||
|
|
||||||
|
proc client2(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(200.milliseconds)
|
||||||
|
result = 20
|
||||||
|
|
||||||
|
proc client3(): Future[int] {.async.} =
|
||||||
|
await sleepAsync(300.milliseconds)
|
||||||
|
result = 30
|
||||||
|
|
||||||
|
var v10 = vlient1()
|
||||||
|
var v11 = vlient2()
|
||||||
|
var v12 = vlient3()
|
||||||
|
var v20 = vlient2()
|
||||||
|
var v21 = vlient1()
|
||||||
|
var v22 = vlient3()
|
||||||
|
var v30 = vlient3()
|
||||||
|
var v31 = vlient2()
|
||||||
|
var v32 = vlient1()
|
||||||
|
|
||||||
|
waitFor(oneValue(@[v10, v11, v12]))
|
||||||
|
waitFor(oneValue(@[v20, v21, v22]))
|
||||||
|
waitFor(oneValue(@[v30, v31, v32]))
|
||||||
|
|
||||||
|
if (not v10.finished) or (not v21.finished) or (not v32.finished):
|
||||||
|
return
|
||||||
|
|
||||||
|
var res30 = waitFor(oneValue(@[client1(), client2(), client3()]))
|
||||||
|
var res31 = waitFor(oneValue(@[client2(), client1(), client3()]))
|
||||||
|
var res32 = waitFor(oneValue(@[client3(), client2(), client1()]))
|
||||||
|
if res30 != 10 or res31 != 10 or res32 != 10:
|
||||||
|
return
|
||||||
|
|
||||||
|
result = true
|
||||||
|
|
||||||
test "Async undefined behavior (#7758) test":
|
test "Async undefined behavior (#7758) test":
|
||||||
check test1() == true
|
check test1() == true
|
||||||
test "Immediately completed asynchronous procedure test":
|
test "Immediately completed asynchronous procedure test":
|
||||||
|
@ -555,3 +731,15 @@ suite "Future[T] behavior test suite":
|
||||||
check testAllZero() == true
|
check testAllZero() == true
|
||||||
test "asyncDiscard() test":
|
test "asyncDiscard() test":
|
||||||
check testAsyncDiscard() == 10
|
check testAsyncDiscard() == 10
|
||||||
|
test "oneIndex[T](zero) test":
|
||||||
|
check testOneIndexZero() == true
|
||||||
|
test "oneValue[T](zero) test":
|
||||||
|
check testOneValueZero() == true
|
||||||
|
test "oneIndex[T](varargs) test":
|
||||||
|
check testOneIndexVarargs() == true
|
||||||
|
test "oneValue[T](varargs) test":
|
||||||
|
check testOneValueVarargs() == true
|
||||||
|
test "oneIndex[T](seq) test":
|
||||||
|
check testOneIndexSeq() == true
|
||||||
|
test "oneValue[T](seq) test":
|
||||||
|
check testOneValueSeq() == true
|
||||||
|
|
Loading…
Reference in New Issue