diff --git a/chronos.nimble b/chronos.nimble index a3947d3b..a4638314 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.2.0" +version = "2.2.1" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" @@ -25,8 +25,8 @@ task test, "Run all tests": for cmd in @[ "nim c -r -d:useSysAssert -d:useGcAssert tests/" & tfile, "nim c -r tests/" & tfile, - "nim c -r --gc:markAndSweep tests/" & tfile, - "nim c -r -d:release tests/" & tfile, + #"nim c -r --gc:markAndSweep tests/" & tfile, + "nim c -r -d:release tests/" & tfile ]: echo "\n" & cmd exec cmd diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 73e0bd13..48887fff 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -46,10 +46,9 @@ type FutureError* = object of Exception cause*: FutureBase -{.deprecated: [PFutureBase: FutureBase, PFuture: Future].} - when not defined(release): - var currentID = 0 + var currentID* {.threadvar.}: int + currentID = 0 # ZAH: This seems unnecessary. Isn't it easy to introduce a seperate # module for the dispatcher type, so it can be directly referenced here? @@ -145,7 +144,7 @@ proc complete*[T](future: Future[T], val: T) = ## Completes ``future`` with value ``val``. #doAssert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) - doAssert(future.error == nil) + doAssert(isNil(future.error)) future.value = val future.finished = true future.callbacks.call() @@ -154,7 +153,7 @@ proc complete*(future: Future[void]) = ## Completes a void ``future``. #doAssert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) - doAssert(future.error == nil) + doAssert(isNil(future.error)) future.finished = true future.callbacks.call() @@ -162,7 +161,7 @@ proc complete*[T](future: FutureVar[T]) = ## Completes a ``FutureVar``. template fut: untyped = Future[T](future) checkFinished(fut) - doAssert(fut.error == nil) + doAssert(isNil(fut.error)) fut.finished = true fut.callbacks.call() @@ -172,7 +171,7 @@ proc complete*[T](future: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) - doAssert(fut.error.isNil()) + doAssert(isNil(fut.error)) fut.finished = true fut.value = val fut.callbacks.call() @@ -198,7 +197,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. - doAssert cb != nil + doAssert(not isNil(cb)) if future.finished: # ZAH: it seems that the Future needs to know its associated Dispatcher callSoon(cb, udata) @@ -214,7 +213,7 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) = proc removeCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = - doAssert cb != nil + doAssert(not isNil(cb)) let acb = AsyncCallback(function: cb, udata: udata) future.callbacks.remove acb @@ -258,7 +257,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string = # Find longest filename & line number combo for alignment purposes. var longestLeft = 0 for entry in entries: - if entry.procName.isNil: continue + if isNil(entry.procName): continue let left = $entry.filename & $entry.line if left.len > longestLeft: @@ -267,7 +266,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string = var indent = 2 # Format the entries. for entry in entries: - if entry.procName.isNil: + if isNil(entry.procName): if entry.line == -10: result.add(spaces(indent) & "#[\n") indent.inc(2) @@ -320,7 +319,7 @@ proc read*[T](future: Future[T] | FutureVar[T]): T = let fut = Future[T](future) {.pop.} if fut.finished: - if fut.error != nil: + if not isNil(fut.error): injectStacktrace(fut) raise fut.error when T isnot void: @@ -334,7 +333,7 @@ proc readError*[T](future: Future[T]): ref Exception = ## ## An ``ValueError`` exception will be thrown if no exception exists ## in the specified Future. - if future.error != nil: return future.error + if not isNil(future.error): return future.error else: raise newException(ValueError, "No error in future.") @@ -356,20 +355,23 @@ proc finished*(future: FutureBase | FutureVar): bool = proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. - return future.error != nil + return (not isNil(future.error)) proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. ## ## This should be used instead of ``discard`` to discard void futures. - doAssert(not future.isNil, "Future is nil") + doAssert(not isNil(future), "Future is nil") proc cb(data: pointer) = if future.failed: injectStacktrace(future) raise future.error future.callback = cb +proc asyncDiscard*[T](future: Future[T]) = discard + ## This is async workaround for discard ``Future[T]``. + # ZAH: The return type here could be a Future[(T, Y)] proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once both ``fut1`` and ``fut2`` @@ -413,64 +415,60 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = fut2.callback = cb return retFuture -# ZAH: The return type here could be a tuple -# This will enable waiting a heterogenous collection of futures. proc all*[T](futs: varargs[Future[T]]): auto = - ## Returns a future which will complete once - ## all futures in ``futs`` complete. + ## Returns a future which will complete once all futures in ``futs`` complete. ## If the argument is empty, the returned future completes immediately. ## ## If the awaited futures are not ``Future[void]``, the returned future ## will hold the values of all awaited futures in a sequence. ## - ## If the awaited futures *are* ``Future[void]``, - ## this proc returns ``Future[void]``. + ## If the awaited futures *are* ``Future[void]``, this proc returns + ## ``Future[void]``. + ## + ## Note, that if one of the futures in ``futs`` will fail, result of ``all()`` + ## will also be failed with error from failed future. + let totalFutures = len(futs) + var completedFutures = 0 + + # Because we can't capture varargs[T] in closures we need to create copy. + var nfuts = @futs when T is void: - var - retFuture = newFuture[void]("asyncdispatch.all") - completedFutures = 0 - - let totalFutures = len(futs) - - for fut in futs: + var retFuture = newFuture[void]("asyncdispatch.all(void)") + for fut in nfuts: fut.addCallback proc (data: pointer) = - var fut = cast[FutureBase](data) inc(completedFutures) if not retFuture.finished: - if fut.failed: - retFuture.fail(fut.error) - else: - if completedFutures == totalFutures: + if completedFutures == totalFutures: + for nfut in nfuts: + if nfut.failed: + retFuture.fail(nfut.error) + break + if not retFuture.failed: retFuture.complete() - if totalFutures == 0: + if len(nfuts) == 0: retFuture.complete() return retFuture - else: - var - retFuture = newFuture[seq[T]]("asyncdispatch.all") - retValues = newSeq[T](len(futs)) - completedFutures = 0 + var retFuture = newFuture[seq[T]]("asyncdispatch.all(T)") + var retValues = newSeq[T](totalFutures) + for fut in nfuts: + fut.addCallback proc (data: pointer) = + inc(completedFutures) + if not retFuture.finished: + if completedFutures == totalFutures: + for k, nfut in nfuts: + if nfut.failed: + retFuture.fail(nfut.error) + break + else: + retValues[k] = nfut.read() + if not retFuture.failed: + retFuture.complete(retValues) - for i, fut in futs: - proc setCallback(i: int) = - fut.addCallback proc (data: pointer) = - var fut = cast[Future[T]](data) - inc(completedFutures) - if not retFuture.finished: - if fut.failed: - retFuture.fail(fut.error) - else: - retValues[i] = fut.read() - if completedFutures == len(retValues): - retFuture.complete(retValues) - - setCallback(i) - - if retValues.len == 0: + if len(nfuts) == 0: retFuture.complete(retValues) return retFuture diff --git a/tests/testfut.nim b/tests/testfut.nim index 7497a961..bafdbfe2 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -134,6 +134,404 @@ proc test4(): string = proc test5(): int = result = waitFor(testFuture4()) +proc testAllVarargs(): int = + var completedFutures = 0 + + proc vlient1() {.async.} = + await sleepAsync(100) + inc(completedFutures) + + proc vlient2() {.async.} = + await sleepAsync(200) + inc(completedFutures) + + proc vlient3() {.async.} = + await sleepAsync(300) + inc(completedFutures) + + proc vlient4() {.async.} = + await sleepAsync(400) + inc(completedFutures) + + proc vlient5() {.async.} = + await sleepAsync(500) + inc(completedFutures) + + proc vlient1f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient2f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient3f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient4f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient5f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client1(): Future[int] {.async.} = + await sleepAsync(100) + inc(completedFutures) + result = 1 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200) + inc(completedFutures) + result = 1 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300) + inc(completedFutures) + result = 1 + + proc client4(): Future[int] {.async.} = + await sleepAsync(400) + inc(completedFutures) + result = 1 + + proc client5(): Future[int] {.async.} = + await sleepAsync(500) + inc(completedFutures) + result = 1 + + proc client1f(): Future[int] {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client2f(): Future[int] {.async.} = + await sleepAsync(200) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client3f(): Future[int] {.async.} = + await sleepAsync(300) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client4f(): Future[int] {.async.} = + await sleepAsync(400) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client5f(): Future[int] {.async.} = + await sleepAsync(500) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + waitFor(all(vlient1(), vlient2(), vlient3(), vlient4(), vlient5())) + # 5 completed futures = 5 + result += completedFutures + completedFutures = 0 + try: + waitFor(all(vlient1(), vlient1f(), + vlient2(), vlient2f(), + vlient3(), vlient3f(), + vlient4(), vlient4f(), + vlient5(), vlient5f())) + result -= 10000 + except: + discard + # 10 completed futures = 10 + result += completedFutures + + completedFutures = 0 + var res = waitFor(all(client1(), client2(), client3(), client4(), client5())) + for item in res: + result += item + # 5 completed futures + 5 values = 10 + result += completedFutures + + completedFutures = 0 + try: + var res = waitFor(all(client1(), client1f(), + client2(), client2f(), + client3(), client3f(), + client4(), client4f(), + client5(), client5f())) + result -= 10000 + except: + discard + # 10 completed futures = 10 + result += completedFutures + +proc testAllSeq(): int = + var completedFutures = 0 + var vfutures = newSeq[Future[void]]() + var nfutures = newSeq[Future[int]]() + + proc vlient1() {.async.} = + await sleepAsync(100) + inc(completedFutures) + + proc vlient2() {.async.} = + await sleepAsync(200) + inc(completedFutures) + + proc vlient3() {.async.} = + await sleepAsync(300) + inc(completedFutures) + + proc vlient4() {.async.} = + await sleepAsync(400) + inc(completedFutures) + + proc vlient5() {.async.} = + await sleepAsync(500) + inc(completedFutures) + + proc vlient1f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient2f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient3f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient4f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc vlient5f() {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client1(): Future[int] {.async.} = + await sleepAsync(100) + inc(completedFutures) + result = 1 + + proc client2(): Future[int] {.async.} = + await sleepAsync(200) + inc(completedFutures) + result = 1 + + proc client3(): Future[int] {.async.} = + await sleepAsync(300) + inc(completedFutures) + result = 1 + + proc client4(): Future[int] {.async.} = + await sleepAsync(400) + inc(completedFutures) + result = 1 + + proc client5(): Future[int] {.async.} = + await sleepAsync(500) + inc(completedFutures) + result = 1 + + proc client1f(): Future[int] {.async.} = + await sleepAsync(100) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client2f(): Future[int] {.async.} = + await sleepAsync(200) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client3f(): Future[int] {.async.} = + await sleepAsync(300) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client4f(): Future[int] {.async.} = + await sleepAsync(400) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + proc client5f(): Future[int] {.async.} = + await sleepAsync(500) + inc(completedFutures) + if true: + raise newException(ValueError, "") + + vfutures.setLen(0) + for i in 0..<10: + vfutures.add(vlient1()) + vfutures.add(vlient2()) + vfutures.add(vlient3()) + vfutures.add(vlient4()) + vfutures.add(vlient5()) + + waitFor(all(vfutures)) + # 5 * 10 completed futures = 50 + result += completedFutures + + completedFutures = 0 + vfutures.setLen(0) + for i in 0..<10: + vfutures.add(vlient1()) + vfutures.add(vlient1f()) + vfutures.add(vlient2()) + vfutures.add(vlient2f()) + vfutures.add(vlient3()) + vfutures.add(vlient3f()) + vfutures.add(vlient4()) + vfutures.add(vlient4f()) + vfutures.add(vlient5()) + vfutures.add(vlient5f()) + + try: + waitFor(all(vfutures)) + result -= 10000 + except: + discard + # 10 * 10 completed futures = 100 + result += completedFutures + + completedFutures = 0 + nfutures.setLen(0) + for i in 0..<10: + nfutures.add(client1()) + nfutures.add(client2()) + nfutures.add(client3()) + nfutures.add(client4()) + nfutures.add(client5()) + + var res = waitFor(all(nfutures)) + for i in 0..= 100) and (res <= 1000) test $TimersCount & " timers with 1000ms timeout": var res = waitFor(test(1000)) - check (res >= 1000) and (res <= 2000) + check (res >= 1000) and (res <= 5000)