Fixed all() implementation.

This commit is contained in:
cheatfate 2019-03-15 02:43:51 +02:00
parent 685665ad21
commit ea3fb9629a
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
3 changed files with 57 additions and 59 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos" packageName = "chronos"
version = "2.2.0" version = "2.2.1"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Chronos" description = "Chronos"
license = "Apache License 2.0 or MIT" license = "Apache License 2.0 or MIT"
@ -25,8 +25,8 @@ task test, "Run all tests":
for cmd in @[ for cmd in @[
"nim c -r -d:useSysAssert -d:useGcAssert tests/" & tfile, "nim c -r -d:useSysAssert -d:useGcAssert tests/" & tfile,
"nim c -r tests/" & tfile, "nim c -r tests/" & tfile,
"nim c -r --gc:markAndSweep tests/" & tfile, #"nim c -r --gc:markAndSweep tests/" & tfile,
"nim c -r -d:release tests/" & tfile, "nim c -r -d:release tests/" & tfile
]: ]:
echo "\n" & cmd echo "\n" & cmd
exec cmd exec cmd

View File

@ -46,10 +46,9 @@ type
FutureError* = object of Exception FutureError* = object of Exception
cause*: FutureBase cause*: FutureBase
{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
when not defined(release): when not defined(release):
var currentID = 0 var currentID* {.threadvar.}: int
currentID = 0
# ZAH: This seems unnecessary. Isn't it easy to introduce a seperate # ZAH: This seems unnecessary. Isn't it easy to introduce a seperate
# module for the dispatcher type, so it can be directly referenced here? # module for the dispatcher type, so it can be directly referenced here?
@ -145,7 +144,7 @@ proc complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``. ## Completes ``future`` with value ``val``.
#doAssert(not future.finished, "Future already finished, cannot finish twice.") #doAssert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future) checkFinished(future)
doAssert(future.error == nil) doAssert(isNil(future.error))
future.value = val future.value = val
future.finished = true future.finished = true
future.callbacks.call() future.callbacks.call()
@ -154,7 +153,7 @@ proc complete*(future: Future[void]) =
## Completes a void ``future``. ## Completes a void ``future``.
#doAssert(not future.finished, "Future already finished, cannot finish twice.") #doAssert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future) checkFinished(future)
doAssert(future.error == nil) doAssert(isNil(future.error))
future.finished = true future.finished = true
future.callbacks.call() future.callbacks.call()
@ -162,7 +161,7 @@ proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``. ## Completes a ``FutureVar``.
template fut: untyped = Future[T](future) template fut: untyped = Future[T](future)
checkFinished(fut) checkFinished(fut)
doAssert(fut.error == nil) doAssert(isNil(fut.error))
fut.finished = true fut.finished = true
fut.callbacks.call() fut.callbacks.call()
@ -172,7 +171,7 @@ proc complete*[T](future: FutureVar[T], val: T) =
## Any previously stored value will be overwritten. ## Any previously stored value will be overwritten.
template fut: untyped = Future[T](future) template fut: untyped = Future[T](future)
checkFinished(fut) checkFinished(fut)
doAssert(fut.error.isNil()) doAssert(isNil(fut.error))
fut.finished = true fut.finished = true
fut.value = val fut.value = val
fut.callbacks.call() fut.callbacks.call()
@ -198,7 +197,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## Adds the callbacks proc to be called when the future completes. ## Adds the callbacks proc to be called when the future completes.
## ##
## If future has already completed then ``cb`` will be called immediately. ## If future has already completed then ``cb`` will be called immediately.
doAssert cb != nil doAssert(not isNil(cb))
if future.finished: if future.finished:
# ZAH: it seems that the Future needs to know its associated Dispatcher # ZAH: it seems that the Future needs to know its associated Dispatcher
callSoon(cb, udata) callSoon(cb, udata)
@ -214,7 +213,7 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) =
proc removeCallback*(future: FutureBase, cb: CallbackFunc, proc removeCallback*(future: FutureBase, cb: CallbackFunc,
udata: pointer = nil) = udata: pointer = nil) =
doAssert cb != nil doAssert(not isNil(cb))
let acb = AsyncCallback(function: cb, udata: udata) let acb = AsyncCallback(function: cb, udata: udata)
future.callbacks.remove acb future.callbacks.remove acb
@ -258,7 +257,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
# Find longest filename & line number combo for alignment purposes. # Find longest filename & line number combo for alignment purposes.
var longestLeft = 0 var longestLeft = 0
for entry in entries: for entry in entries:
if entry.procName.isNil: continue if isNil(entry.procName): continue
let left = $entry.filename & $entry.line let left = $entry.filename & $entry.line
if left.len > longestLeft: if left.len > longestLeft:
@ -267,7 +266,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
var indent = 2 var indent = 2
# Format the entries. # Format the entries.
for entry in entries: for entry in entries:
if entry.procName.isNil: if isNil(entry.procName):
if entry.line == -10: if entry.line == -10:
result.add(spaces(indent) & "#[\n") result.add(spaces(indent) & "#[\n")
indent.inc(2) indent.inc(2)
@ -320,7 +319,7 @@ proc read*[T](future: Future[T] | FutureVar[T]): T =
let fut = Future[T](future) let fut = Future[T](future)
{.pop.} {.pop.}
if fut.finished: if fut.finished:
if fut.error != nil: if not isNil(fut.error):
injectStacktrace(fut) injectStacktrace(fut)
raise fut.error raise fut.error
when T isnot void: when T isnot void:
@ -334,7 +333,7 @@ proc readError*[T](future: Future[T]): ref Exception =
## ##
## An ``ValueError`` exception will be thrown if no exception exists ## An ``ValueError`` exception will be thrown if no exception exists
## in the specified Future. ## in the specified Future.
if future.error != nil: return future.error if not isNil(future.error): return future.error
else: else:
raise newException(ValueError, "No error in future.") raise newException(ValueError, "No error in future.")
@ -356,20 +355,23 @@ proc finished*(future: FutureBase | FutureVar): bool =
proc failed*(future: FutureBase): bool = proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error. ## Determines whether ``future`` completed with an error.
return future.error != nil return (not isNil(future.error))
proc asyncCheck*[T](future: Future[T]) = proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future ## Sets a callback on ``future`` which raises an exception if the future
## finished with an error. ## finished with an error.
## ##
## This should be used instead of ``discard`` to discard void futures. ## This should be used instead of ``discard`` to discard void futures.
doAssert(not future.isNil, "Future is nil") doAssert(not isNil(future), "Future is nil")
proc cb(data: pointer) = proc cb(data: pointer) =
if future.failed: if future.failed:
injectStacktrace(future) injectStacktrace(future)
raise future.error raise future.error
future.callback = cb future.callback = cb
proc asyncDiscard*[T](future: Future[T]) = discard
## This is async workaround for discard ``Future[T]``.
# ZAH: The return type here could be a Future[(T, Y)] # ZAH: The return type here could be a Future[(T, Y)]
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2`` ## Returns a future which will complete once both ``fut1`` and ``fut2``
@ -413,64 +415,60 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb fut2.callback = cb
return retFuture return retFuture
# ZAH: The return type here could be a tuple
# This will enable waiting a heterogenous collection of futures.
proc all*[T](futs: varargs[Future[T]]): auto = proc all*[T](futs: varargs[Future[T]]): auto =
## Returns a future which will complete once ## Returns a future which will complete once all futures in ``futs`` complete.
## all futures in ``futs`` complete.
## If the argument is empty, the returned future completes immediately. ## If the argument is empty, the returned future completes immediately.
## ##
## If the awaited futures are not ``Future[void]``, the returned future ## If the awaited futures are not ``Future[void]``, the returned future
## will hold the values of all awaited futures in a sequence. ## will hold the values of all awaited futures in a sequence.
## ##
## If the awaited futures *are* ``Future[void]``, ## If the awaited futures *are* ``Future[void]``, this proc returns
## this proc returns ``Future[void]``. ## ``Future[void]``.
##
## Note, that if one of the futures in ``futs`` will fail, result of ``all()``
## will also be failed with error from failed future.
let totalFutures = len(futs)
var completedFutures = 0
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
when T is void: when T is void:
var var retFuture = newFuture[void]("asyncdispatch.all(void)")
retFuture = newFuture[void]("asyncdispatch.all") for fut in nfuts:
completedFutures = 0
let totalFutures = len(futs)
for fut in futs:
fut.addCallback proc (data: pointer) = fut.addCallback proc (data: pointer) =
var fut = cast[FutureBase](data)
inc(completedFutures) inc(completedFutures)
if not retFuture.finished: if not retFuture.finished:
if fut.failed: if completedFutures == totalFutures:
retFuture.fail(fut.error) for nfut in nfuts:
else: if nfut.failed:
if completedFutures == totalFutures: retFuture.fail(nfut.error)
break
if not retFuture.failed:
retFuture.complete() retFuture.complete()
if totalFutures == 0: if len(nfuts) == 0:
retFuture.complete() retFuture.complete()
return retFuture return retFuture
else: else:
var var retFuture = newFuture[seq[T]]("asyncdispatch.all(T)")
retFuture = newFuture[seq[T]]("asyncdispatch.all") var retValues = newSeq[T](totalFutures)
retValues = newSeq[T](len(futs)) for fut in nfuts:
completedFutures = 0 fut.addCallback proc (data: pointer) =
inc(completedFutures)
if not retFuture.finished:
if completedFutures == totalFutures:
for k, nfut in nfuts:
if nfut.failed:
retFuture.fail(nfut.error)
break
else:
retValues[k] = nfut.read()
if not retFuture.failed:
retFuture.complete(retValues)
for i, fut in futs: if len(nfuts) == 0:
proc setCallback(i: int) =
fut.addCallback proc (data: pointer) =
var fut = cast[Future[T]](data)
inc(completedFutures)
if not retFuture.finished:
if fut.failed:
retFuture.fail(fut.error)
else:
retValues[i] = fut.read()
if completedFutures == len(retValues):
retFuture.complete(retValues)
setCallback(i)
if retValues.len == 0:
retFuture.complete(retValues) retFuture.complete(retValues)
return retFuture return retFuture

View File

@ -49,4 +49,4 @@ when isMainModule:
check (res >= 100) and (res <= 1000) check (res >= 100) and (res <= 1000)
test $TimersCount & " timers with 1000ms timeout": test $TimersCount & " timers with 1000ms timeout":
var res = waitFor(test(1000)) var res = waitFor(test(1000))
check (res >= 1000) and (res <= 2000) check (res >= 1000) and (res <= 5000)