Future cleanup (#393)
* FutureState.Finished -> FutureState.Completed (to avoid name clash with `proc finished` which means not-pending) * deprecate `done` - to avoid additional confusion over completed vs finished * remove ad leftovers in stack trace formatting * avoid some generic bloat * avoid unnecessary allocations in `race`/`one`
This commit is contained in:
parent
2fa6df0880
commit
b65b85533a
|
@ -2,7 +2,7 @@
|
|||
# Chronos
|
||||
#
|
||||
# (c) Copyright 2015 Dominik Picheta
|
||||
# (c) Copyright 2018-2021 Status Research & Development GmbH
|
||||
# (c) Copyright 2018-2023 Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
|
@ -13,23 +13,26 @@ import stew/base10
|
|||
import "."/srcloc
|
||||
export srcloc
|
||||
|
||||
when defined(nimHasStacktracesModule):
|
||||
import system/stacktraces
|
||||
else:
|
||||
const
|
||||
reraisedFromBegin = -10
|
||||
reraisedFromEnd = -100
|
||||
when chronosStackTrace:
|
||||
when defined(nimHasStacktracesModule):
|
||||
import system/stacktraces
|
||||
else:
|
||||
const
|
||||
reraisedFromBegin = -10
|
||||
reraisedFromEnd = -100
|
||||
|
||||
type StackTrace = string
|
||||
|
||||
const
|
||||
LocCreateIndex* = 0
|
||||
LocCompleteIndex* = 1
|
||||
LocFinishIndex* = 1
|
||||
|
||||
when chronosStackTrace:
|
||||
type StackTrace = string
|
||||
template LocCompleteIndex*: untyped {.deprecated: "LocFinishIndex".} =
|
||||
LocFinishIndex
|
||||
|
||||
type
|
||||
FutureState* {.pure.} = enum
|
||||
Pending, Finished, Cancelled, Failed
|
||||
Pending, Completed, Cancelled, Failed
|
||||
|
||||
FutureBase* = ref object of RootObj ## Untyped future.
|
||||
location*: array[2, ptr SrcLoc]
|
||||
|
@ -39,7 +42,9 @@ type
|
|||
state*: FutureState
|
||||
error*: ref CatchableError ## Stored exception
|
||||
mustCancel*: bool
|
||||
id*: uint
|
||||
|
||||
when chronosFutureId:
|
||||
id*: uint
|
||||
|
||||
when chronosStackTrace:
|
||||
errorStackTrace*: StackTrace
|
||||
|
@ -55,10 +60,15 @@ type
|
|||
# Obviously, it will still be allocated on the heap when necessary.
|
||||
Future*[T] = ref object of FutureBase ## Typed future.
|
||||
when chronosStrictException:
|
||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError], gcsafe.}
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError], gcsafe.}
|
||||
else:
|
||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [CatchableError], gcsafe.}
|
||||
else:
|
||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError, Exception], gcsafe.}
|
||||
value: T ## Stored value
|
||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Exception], gcsafe.}
|
||||
|
||||
when T isnot void:
|
||||
value*: T ## Stored value
|
||||
|
||||
FutureStr*[T] = ref object of Future[T]
|
||||
## Future to hold GC strings
|
||||
|
@ -80,6 +90,10 @@ type
|
|||
tail*: FutureBase
|
||||
count*: uint
|
||||
|
||||
# Backwards compatibility for old FutureState name
|
||||
template Finished* {.deprecated: "Use Completed instead".} = Completed
|
||||
template Finished*(T: type FutureState): FutureState {.deprecated: "Use FutureState.Completed instead".} = FutureState.Completed
|
||||
|
||||
when chronosFutureId:
|
||||
var currentID* {.threadvar.}: uint
|
||||
else:
|
||||
|
@ -88,7 +102,6 @@ else:
|
|||
|
||||
when chronosFutureTracking:
|
||||
var futureList* {.threadvar.}: FutureList
|
||||
futureList = FutureList()
|
||||
|
||||
template setupFutureBase(loc: ptr SrcLoc) =
|
||||
new(result)
|
||||
|
@ -143,30 +156,30 @@ template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] =
|
|||
newFutureStrImpl[T](getSrcLocation(fromProc))
|
||||
|
||||
proc finished*(future: FutureBase): bool {.inline.} =
|
||||
## Determines whether ``future`` has completed, i.e. ``future`` state changed
|
||||
## Determines whether ``future`` has finished, i.e. ``future`` state changed
|
||||
## from state ``Pending`` to one of the states (``Finished``, ``Cancelled``,
|
||||
## ``Failed``).
|
||||
result = (future.state != FutureState.Pending)
|
||||
(future.state != FutureState.Pending)
|
||||
|
||||
proc cancelled*(future: FutureBase): bool {.inline.} =
|
||||
## Determines whether ``future`` has cancelled.
|
||||
(future.state == FutureState.Cancelled)
|
||||
|
||||
proc failed*(future: FutureBase): bool {.inline.} =
|
||||
## Determines whether ``future`` completed with an error.
|
||||
## Determines whether ``future`` finished with an error.
|
||||
(future.state == FutureState.Failed)
|
||||
|
||||
proc completed*(future: FutureBase): bool {.inline.} =
|
||||
## Determines whether ``future`` completed without an error.
|
||||
(future.state == FutureState.Finished)
|
||||
## Determines whether ``future`` finished with a value.
|
||||
(future.state == FutureState.Completed)
|
||||
|
||||
proc done*(future: FutureBase): bool {.inline.} =
|
||||
proc done*(future: FutureBase): bool {.deprecated: "Use `completed` instead".} =
|
||||
## This is an alias for ``completed(future)`` procedure.
|
||||
completed(future)
|
||||
|
||||
when chronosFutureTracking:
|
||||
proc futureDestructor(udata: pointer) =
|
||||
## This procedure will be called when Future[T] got finished, cancelled or
|
||||
## This procedure will be called when Future[T] got completed, cancelled or
|
||||
## failed and all Future[T].callbacks are already scheduled and processed.
|
||||
let future = cast[FutureBase](udata)
|
||||
if future == futureList.tail: futureList.tail = future.prev
|
||||
|
@ -189,7 +202,7 @@ proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
|
|||
msg.add("\n Creation location:")
|
||||
msg.add("\n " & $future.location[LocCreateIndex])
|
||||
msg.add("\n First completion location:")
|
||||
msg.add("\n " & $future.location[LocCompleteIndex])
|
||||
msg.add("\n " & $future.location[LocFinishIndex])
|
||||
msg.add("\n Second completion location:")
|
||||
msg.add("\n " & $loc)
|
||||
when chronosStackTrace:
|
||||
|
@ -202,7 +215,7 @@ proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
|
|||
err.cause = future
|
||||
raise err
|
||||
else:
|
||||
future.location[LocCompleteIndex] = loc
|
||||
future.location[LocFinishIndex] = loc
|
||||
|
||||
proc finish(fut: FutureBase, state: FutureState) =
|
||||
# We do not perform any checks here, because:
|
||||
|
@ -224,7 +237,7 @@ proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
|
|||
checkFinished(FutureBase(future), loc)
|
||||
doAssert(isNil(future.error))
|
||||
future.value = val
|
||||
future.finish(FutureState.Finished)
|
||||
future.finish(FutureState.Completed)
|
||||
|
||||
template complete*[T](future: Future[T], val: T) =
|
||||
## Completes ``future`` with value ``val``.
|
||||
|
@ -234,13 +247,13 @@ proc complete(future: Future[void], loc: ptr SrcLoc) =
|
|||
if not(future.cancelled()):
|
||||
checkFinished(FutureBase(future), loc)
|
||||
doAssert(isNil(future.error))
|
||||
future.finish(FutureState.Finished)
|
||||
future.finish(FutureState.Completed)
|
||||
|
||||
template complete*(future: Future[void]) =
|
||||
## Completes a void ``future``.
|
||||
complete(future, getSrcLocation())
|
||||
|
||||
proc fail[T](future: Future[T], error: ref CatchableError, loc: ptr SrcLoc) =
|
||||
proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
|
||||
if not(future.cancelled()):
|
||||
checkFinished(FutureBase(future), loc)
|
||||
future.error = error
|
||||
|
@ -251,7 +264,7 @@ proc fail[T](future: Future[T], error: ref CatchableError, loc: ptr SrcLoc) =
|
|||
getStackTrace(error)
|
||||
future.finish(FutureState.Failed)
|
||||
|
||||
template fail*[T](future: Future[T], error: ref CatchableError) =
|
||||
template fail*(future: FutureBase, error: ref CatchableError) =
|
||||
## Completes ``future`` with ``error``.
|
||||
fail(future, error, getSrcLocation())
|
||||
|
||||
|
@ -266,7 +279,7 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
|
|||
future.errorStackTrace = getStackTrace()
|
||||
future.finish(FutureState.Cancelled)
|
||||
|
||||
template cancelAndSchedule*[T](future: Future[T]) =
|
||||
template cancelAndSchedule*(future: FutureBase) =
|
||||
cancelAndSchedule(FutureBase(future), getSrcLocation())
|
||||
|
||||
proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
|
||||
|
@ -303,14 +316,10 @@ template cancel*(future: FutureBase) =
|
|||
## Cancel ``future``.
|
||||
discard cancel(future, getSrcLocation())
|
||||
|
||||
template cancel*[T](future: Future[T]) =
|
||||
## Cancel ``future``.
|
||||
discard cancel(FutureBase(future), getSrcLocation())
|
||||
|
||||
proc clearCallbacks(future: FutureBase) =
|
||||
future.callbacks = default(seq[AsyncCallback])
|
||||
|
||||
proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
|
||||
proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
|
||||
## Adds the callbacks proc to be called when the future completes.
|
||||
##
|
||||
## If future has already completed then ``cb`` will be called immediately.
|
||||
|
@ -321,14 +330,14 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
|
|||
let acb = AsyncCallback(function: cb, udata: udata)
|
||||
future.callbacks.add acb
|
||||
|
||||
proc addCallback*[T](future: Future[T], cb: CallbackFunc) =
|
||||
proc addCallback*(future: FutureBase, cb: CallbackFunc) =
|
||||
## Adds the callbacks proc to be called when the future completes.
|
||||
##
|
||||
## If future has already completed then ``cb`` will be called immediately.
|
||||
future.addCallback(cb, cast[pointer](future))
|
||||
|
||||
proc removeCallback*(future: FutureBase, cb: CallbackFunc,
|
||||
udata: pointer = nil) =
|
||||
udata: pointer) =
|
||||
## Remove future from list of callbacks - this operation may be slow if there
|
||||
## are many registered callbacks!
|
||||
doAssert(not isNil(cb))
|
||||
|
@ -337,10 +346,10 @@ proc removeCallback*(future: FutureBase, cb: CallbackFunc,
|
|||
future.callbacks.keepItIf:
|
||||
it.function != cb or it.udata != udata
|
||||
|
||||
proc removeCallback*[T](future: Future[T], cb: CallbackFunc) =
|
||||
proc removeCallback*(future: FutureBase, cb: CallbackFunc) =
|
||||
future.removeCallback(cb, cast[pointer](future))
|
||||
|
||||
proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
|
||||
proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
|
||||
## Clears the list of callbacks and sets the callback proc to be called when
|
||||
## the future completes.
|
||||
##
|
||||
|
@ -351,13 +360,13 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
|
|||
future.clearCallbacks
|
||||
future.addCallback(cb, udata)
|
||||
|
||||
proc `callback=`*[T](future: Future[T], cb: CallbackFunc) =
|
||||
proc `callback=`*(future: FutureBase, cb: CallbackFunc) =
|
||||
## Sets the callback proc to be called when the future completes.
|
||||
##
|
||||
## If future has already completed then ``cb`` will be called immediately.
|
||||
`callback=`(future, cb, cast[pointer](future))
|
||||
|
||||
proc `cancelCallback=`*[T](future: Future[T], cb: CallbackFunc) =
|
||||
proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) =
|
||||
## Sets the callback procedure to be called when the future is cancelled.
|
||||
##
|
||||
## This callback will be called immediately as ``future.cancel()`` invoked.
|
||||
|
@ -403,84 +412,66 @@ proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} =
|
|||
|
||||
{.pop.}
|
||||
|
||||
template getFilenameProcname(entry: StackTraceEntry): (string, string) =
|
||||
when compiles(entry.filenameStr) and compiles(entry.procnameStr):
|
||||
# We can't rely on "entry.filename" and "entry.procname" still being valid
|
||||
# cstring pointers, because the "string.data" buffers they pointed to might
|
||||
# be already garbage collected (this entry being a non-shallow copy,
|
||||
# "entry.filename" no longer points to "entry.filenameStr.data", but to the
|
||||
# buffer of the original object).
|
||||
(entry.filenameStr, entry.procnameStr)
|
||||
else:
|
||||
($entry.filename, $entry.procname)
|
||||
|
||||
proc getHint(entry: StackTraceEntry): string =
|
||||
## We try to provide some hints about stack trace entries that the user
|
||||
## may not be familiar with, in particular calls inside the stdlib.
|
||||
|
||||
let (filename, procname) = getFilenameProcname(entry)
|
||||
|
||||
if procname == "processPendingCallbacks":
|
||||
if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
|
||||
return "Executes pending callbacks"
|
||||
elif procname == "poll":
|
||||
if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
|
||||
return "Processes asynchronous completion events"
|
||||
|
||||
if procname == "internalContinue":
|
||||
if cmpIgnoreStyle(filename, "asyncfutures.nim") == 0:
|
||||
return "Resumes an async procedure"
|
||||
|
||||
proc `$`(stackTraceEntries: seq[StackTraceEntry]): string =
|
||||
try:
|
||||
when defined(nimStackTraceOverride) and declared(addDebuggingInfo):
|
||||
let entries = addDebuggingInfo(stackTraceEntries)
|
||||
else:
|
||||
let entries = stackTraceEntries
|
||||
|
||||
# Find longest filename & line number combo for alignment purposes.
|
||||
var longestLeft = 0
|
||||
for entry in entries:
|
||||
let (filename, procname) = getFilenameProcname(entry)
|
||||
|
||||
if procname == "": continue
|
||||
|
||||
let leftLen = filename.len + len($entry.line)
|
||||
if leftLen > longestLeft:
|
||||
longestLeft = leftLen
|
||||
|
||||
var indent = 2
|
||||
# Format the entries.
|
||||
for entry in entries:
|
||||
let (filename, procname) = getFilenameProcname(entry)
|
||||
|
||||
if procname == "":
|
||||
if entry.line == reraisedFromBegin:
|
||||
result.add(spaces(indent) & "#[\n")
|
||||
indent.inc(2)
|
||||
elif entry.line == reraisedFromEnd:
|
||||
indent.dec(2)
|
||||
result.add(spaces(indent) & "]#\n")
|
||||
continue
|
||||
|
||||
let left = "$#($#)" % [filename, $entry.line]
|
||||
result.add((spaces(indent) & "$#$# $#\n") % [
|
||||
left,
|
||||
spaces(longestLeft - left.len + 2),
|
||||
procname
|
||||
])
|
||||
let hint = getHint(entry)
|
||||
if hint.len > 0:
|
||||
result.add(spaces(indent+2) & "## " & hint & "\n")
|
||||
except ValueError as exc:
|
||||
return exc.msg # Shouldn't actually happen since we set the formatting
|
||||
# string
|
||||
|
||||
when chronosStackTrace:
|
||||
proc injectStacktrace(future: FutureBase) =
|
||||
import std/strutils
|
||||
|
||||
template getFilenameProcname(entry: StackTraceEntry): (string, string) =
|
||||
when compiles(entry.filenameStr) and compiles(entry.procnameStr):
|
||||
# We can't rely on "entry.filename" and "entry.procname" still being valid
|
||||
# cstring pointers, because the "string.data" buffers they pointed to might
|
||||
# be already garbage collected (this entry being a non-shallow copy,
|
||||
# "entry.filename" no longer points to "entry.filenameStr.data", but to the
|
||||
# buffer of the original object).
|
||||
(entry.filenameStr, entry.procnameStr)
|
||||
else:
|
||||
($entry.filename, $entry.procname)
|
||||
|
||||
proc `$`(stackTraceEntries: seq[StackTraceEntry]): string =
|
||||
try:
|
||||
when defined(nimStackTraceOverride) and declared(addDebuggingInfo):
|
||||
let entries = addDebuggingInfo(stackTraceEntries)
|
||||
else:
|
||||
let entries = stackTraceEntries
|
||||
|
||||
# Find longest filename & line number combo for alignment purposes.
|
||||
var longestLeft = 0
|
||||
for entry in entries:
|
||||
let (filename, procname) = getFilenameProcname(entry)
|
||||
|
||||
if procname == "": continue
|
||||
|
||||
let leftLen = filename.len + len($entry.line)
|
||||
if leftLen > longestLeft:
|
||||
longestLeft = leftLen
|
||||
|
||||
var indent = 2
|
||||
# Format the entries.
|
||||
for entry in entries:
|
||||
let (filename, procname) = getFilenameProcname(entry)
|
||||
|
||||
if procname == "":
|
||||
if entry.line == reraisedFromBegin:
|
||||
result.add(spaces(indent) & "#[\n")
|
||||
indent.inc(2)
|
||||
elif entry.line == reraisedFromEnd:
|
||||
indent.dec(2)
|
||||
result.add(spaces(indent) & "]#\n")
|
||||
continue
|
||||
|
||||
let left = "$#($#)" % [filename, $entry.line]
|
||||
result.add((spaces(indent) & "$#$# $#\n") % [
|
||||
left,
|
||||
spaces(longestLeft - left.len + 2),
|
||||
procname
|
||||
])
|
||||
except ValueError as exc:
|
||||
return exc.msg # Shouldn't actually happen since we set the formatting
|
||||
# string
|
||||
|
||||
proc injectStacktrace(error: ref Exception) =
|
||||
const header = "\nAsync traceback:\n"
|
||||
|
||||
var exceptionMsg = future.error.msg
|
||||
var exceptionMsg = error.msg
|
||||
if header in exceptionMsg:
|
||||
# This is messy: extract the original exception message from the msg
|
||||
# containing the async traceback.
|
||||
|
@ -489,7 +480,7 @@ when chronosStackTrace:
|
|||
|
||||
var newMsg = exceptionMsg & header
|
||||
|
||||
let entries = getStackTraceEntries(future.error)
|
||||
let entries = getStackTraceEntries(error)
|
||||
newMsg.add($entries)
|
||||
|
||||
newMsg.add("Exception message: " & exceptionMsg & "\n")
|
||||
|
@ -498,14 +489,14 @@ when chronosStackTrace:
|
|||
# newMsg.add("Exception type:")
|
||||
# for entry in getStackTraceEntries(future.error):
|
||||
# newMsg.add "\n" & $entry
|
||||
future.error.msg = newMsg
|
||||
error.msg = newMsg
|
||||
|
||||
proc internalCheckComplete*(fut: FutureBase) {.
|
||||
raises: [Defect, CatchableError].} =
|
||||
# For internal use only. Used in asyncmacro
|
||||
if not(isNil(fut.error)):
|
||||
when chronosStackTrace:
|
||||
injectStacktrace(fut)
|
||||
injectStacktrace(fut.error)
|
||||
raise fut.error
|
||||
|
||||
proc internalRead*[T](fut: Future[T]): T {.inline.} =
|
||||
|
@ -526,7 +517,7 @@ proc read*[T](future: Future[T] ): T {.
|
|||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc readError*[T](future: Future[T]): ref CatchableError {.
|
||||
proc readError*(future: FutureBase): ref CatchableError {.
|
||||
raises: [Defect, ValueError].} =
|
||||
## Retrieves the exception stored in ``future``.
|
||||
##
|
||||
|
@ -610,7 +601,7 @@ proc asyncDiscard*[T](future: Future[T]) {.
|
|||
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
|
||||
deprecated: "Use allFutures[T](varargs[Future[T]])".} =
|
||||
## Returns a future which will complete once both ``fut1`` and ``fut2``
|
||||
## complete.
|
||||
## finish.
|
||||
##
|
||||
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[void]("chronos.`and`")
|
||||
|
@ -642,7 +633,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
|
|||
|
||||
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
## Returns a future which will complete once either ``fut1`` or ``fut2``
|
||||
## complete.
|
||||
## finish.
|
||||
##
|
||||
## If ``fut1`` or ``fut2`` future is failed, the result future will also be
|
||||
## failed with an error stored in ``fut1`` or ``fut2`` respectively.
|
||||
|
@ -696,7 +687,7 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
|||
|
||||
proc all*[T](futs: varargs[Future[T]]): auto {.
|
||||
deprecated: "Use allFutures(varargs[Future[T]])".} =
|
||||
## Returns a future which will complete once all futures in ``futs`` complete.
|
||||
## Returns a future which will complete once all futures in ``futs`` finish.
|
||||
## If the argument is empty, the returned future completes immediately.
|
||||
##
|
||||
## If the awaited futures are not ``Future[void]``, the returned future
|
||||
|
@ -796,8 +787,8 @@ proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] {.
|
|||
|
||||
proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {.
|
||||
deprecated: "Use one[T](varargs[Future[T]])".} =
|
||||
## Returns a future which will complete once one of the futures in ``futs``
|
||||
## complete.
|
||||
## Returns a future which will finish once one of the futures in ``futs``
|
||||
## finish.
|
||||
##
|
||||
## If the argument is empty, returned future FAILS immediately.
|
||||
##
|
||||
|
@ -865,15 +856,15 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
|||
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[void]("chronos.allFutures()")
|
||||
let totalFutures = len(futs)
|
||||
var completedFutures = 0
|
||||
var finishedFutures = 0
|
||||
|
||||
# Because we can't capture varargs[T] in closures we need to create copy.
|
||||
var nfuts = @futs
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
if not(retFuture.finished()):
|
||||
inc(completedFutures)
|
||||
if completedFutures == totalFutures:
|
||||
inc(finishedFutures)
|
||||
if finishedFutures == totalFutures:
|
||||
retFuture.complete()
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
|
@ -886,10 +877,10 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
|||
if not(fut.finished()):
|
||||
fut.addCallback(cb)
|
||||
else:
|
||||
inc(completedFutures)
|
||||
inc(finishedFutures)
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
if len(nfuts) == 0 or len(nfuts) == completedFutures:
|
||||
if len(nfuts) == 0 or len(nfuts) == finishedFutures:
|
||||
retFuture.complete()
|
||||
|
||||
return retFuture
|
||||
|
@ -912,21 +903,21 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
|||
## will be completed, failed or canceled.
|
||||
##
|
||||
## Returned sequence will hold all the Future[T] objects passed to
|
||||
## ``allCompleted`` with the order preserved.
|
||||
## ``allFinished`` with the order preserved.
|
||||
##
|
||||
## If the argument is empty, the returned future COMPLETES immediately.
|
||||
##
|
||||
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[seq[Future[T]]]("chronos.allFinished()")
|
||||
let totalFutures = len(futs)
|
||||
var completedFutures = 0
|
||||
var finishedFutures = 0
|
||||
|
||||
var nfuts = @futs
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
if not(retFuture.finished()):
|
||||
inc(completedFutures)
|
||||
if completedFutures == totalFutures:
|
||||
inc(finishedFutures)
|
||||
if finishedFutures == totalFutures:
|
||||
retFuture.complete(nfuts)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
|
@ -939,10 +930,10 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
|||
if not(fut.finished()):
|
||||
fut.addCallback(cb)
|
||||
else:
|
||||
inc(completedFutures)
|
||||
inc(finishedFutures)
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
if len(nfuts) == 0 or len(nfuts) == completedFutures:
|
||||
if len(nfuts) == 0 or len(nfuts) == finishedFutures:
|
||||
retFuture.complete(nfuts)
|
||||
|
||||
return retFuture
|
||||
|
@ -958,6 +949,16 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
|||
## On cancel futures in ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[Future[T]]("chronos.one()")
|
||||
|
||||
if len(futs) == 0:
|
||||
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||
return retFuture
|
||||
|
||||
# If one of the Future[T] already finished we return it as result
|
||||
for fut in futs:
|
||||
if fut.finished():
|
||||
retFuture.complete(fut)
|
||||
return retFuture
|
||||
|
||||
# Because we can't capture varargs[T] in closures we need to create copy.
|
||||
var nfuts = @futs
|
||||
|
||||
|
@ -979,18 +980,9 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
|
|||
if not(nfuts[i].finished()):
|
||||
nfuts[i].removeCallback(cb)
|
||||
|
||||
# If one of the Future[T] already finished we return it as result
|
||||
for fut in nfuts:
|
||||
if fut.finished():
|
||||
retFuture.complete(fut)
|
||||
return retFuture
|
||||
|
||||
for fut in nfuts:
|
||||
fut.addCallback(cb)
|
||||
|
||||
if len(nfuts) == 0:
|
||||
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
|
@ -1003,7 +995,17 @@ proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
|
|||
## On success returned Future will hold finished FutureBase.
|
||||
##
|
||||
## On cancel futures in ``futs`` WILL NOT BE cancelled.
|
||||
var retFuture = newFuture[FutureBase]("chronos.race()")
|
||||
let retFuture = newFuture[FutureBase]("chronos.race()")
|
||||
|
||||
if len(futs) == 0:
|
||||
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||
return retFuture
|
||||
|
||||
# If one of the Future[T] already finished we return it as result
|
||||
for fut in futs:
|
||||
if fut.finished():
|
||||
retFuture.complete(fut)
|
||||
return retFuture
|
||||
|
||||
# Because we can't capture varargs[T] in closures we need to create copy.
|
||||
var nfuts = @futs
|
||||
|
@ -1026,17 +1028,9 @@ proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
|
|||
if not(nfuts[i].finished()):
|
||||
nfuts[i].removeCallback(cb)
|
||||
|
||||
# If one of the Future[T] already finished we return it as result
|
||||
for fut in nfuts:
|
||||
if fut.finished():
|
||||
retFuture.complete(fut)
|
||||
return retFuture
|
||||
|
||||
for fut in nfuts:
|
||||
fut.addCallback(cb, cast[pointer](fut))
|
||||
|
||||
if len(nfuts) == 0:
|
||||
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
|
||||
|
||||
retFuture.cancelCallback = cancellation
|
||||
|
||||
return retFuture
|
||||
|
|
|
@ -20,11 +20,11 @@ when chronosFutureTracking:
|
|||
|
||||
const
|
||||
AllFutureStates* = {FutureState.Pending, FutureState.Cancelled,
|
||||
FutureState.Finished, FutureState.Failed}
|
||||
WithoutFinished* = {FutureState.Pending, FutureState.Cancelled,
|
||||
FutureState.Completed, FutureState.Failed}
|
||||
WithoutCompleted* = {FutureState.Pending, FutureState.Cancelled,
|
||||
FutureState.Failed}
|
||||
OnlyPending* = {FutureState.Pending}
|
||||
OnlyFinished* = {FutureState.Finished}
|
||||
OnlyCompleted* = {FutureState.Completed}
|
||||
|
||||
proc dumpPendingFutures*(filter = AllFutureStates): string =
|
||||
## Dump all `pending` Future[T] objects.
|
||||
|
|
|
@ -1098,7 +1098,7 @@ suite "Future[T] behavior test suite":
|
|||
var fut = waitProc()
|
||||
await cancelAndWait(fut)
|
||||
check:
|
||||
fut.state == FutureState.Finished
|
||||
fut.state == FutureState.Completed
|
||||
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2
|
||||
|
||||
asyncTest "Cancellation withTimeout() test":
|
||||
|
@ -1129,7 +1129,7 @@ suite "Future[T] behavior test suite":
|
|||
var fut = withTimeoutProc()
|
||||
await cancelAndWait(fut)
|
||||
check:
|
||||
fut.state == FutureState.Finished
|
||||
fut.state == FutureState.Completed
|
||||
neverFlag1 and neverFlag2 and neverFlag3 and waitProc1 and waitProc2
|
||||
|
||||
asyncTest "Cancellation race test":
|
||||
|
@ -1462,8 +1462,8 @@ suite "Future[T] behavior test suite":
|
|||
var fut2 = race(f31, f21, f11)
|
||||
|
||||
check:
|
||||
fut1.done() and fut1.read() == FutureBase(f10)
|
||||
fut2.done() and fut2.read() == FutureBase(f21)
|
||||
fut1.completed() and fut1.read() == FutureBase(f10)
|
||||
fut2.completed() and fut2.read() == FutureBase(f21)
|
||||
|
||||
await allFutures(f20, f30, f11, f31)
|
||||
|
||||
|
|
|
@ -22,8 +22,8 @@ suite "Asynchronous utilities test suite":
|
|||
|
||||
test "Future clean and leaks test":
|
||||
when chronosFutureTracking:
|
||||
if pendingFuturesCount(WithoutFinished) == 0'u:
|
||||
if pendingFuturesCount(OnlyFinished) > 0'u:
|
||||
if pendingFuturesCount(WithoutCompleted) == 0'u:
|
||||
if pendingFuturesCount(OnlyCompleted) > 0'u:
|
||||
poll()
|
||||
check pendingFuturesCount() == 0'u
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue