Memory cleanups (#395)
* Avoid `FutureContinue` bloat by moving type punning into iterator (which is typed anyway) * clear closure iterator from future when iteration is done also on cancellation / exception * remove some redundant local variables in `await` * document `futureContinue` flow
This commit is contained in:
parent
6523f741a6
commit
e436f20b33
|
@ -30,6 +30,14 @@ const
|
||||||
template LocCompleteIndex*: untyped {.deprecated: "LocFinishIndex".} =
|
template LocCompleteIndex*: untyped {.deprecated: "LocFinishIndex".} =
|
||||||
LocFinishIndex
|
LocFinishIndex
|
||||||
|
|
||||||
|
when chronosStrictException:
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.pragma: closureIter, raises: [Defect, CatchableError], gcsafe.}
|
||||||
|
else:
|
||||||
|
{.pragma: closureIter, raises: [CatchableError], gcsafe.}
|
||||||
|
else:
|
||||||
|
{.pragma: closureIter, raises: [Exception], gcsafe.}
|
||||||
|
|
||||||
type
|
type
|
||||||
FutureState* {.pure.} = enum
|
FutureState* {.pure.} = enum
|
||||||
Pending, Completed, Cancelled, Failed
|
Pending, Completed, Cancelled, Failed
|
||||||
|
@ -42,6 +50,7 @@ type
|
||||||
state*: FutureState
|
state*: FutureState
|
||||||
error*: ref CatchableError ## Stored exception
|
error*: ref CatchableError ## Stored exception
|
||||||
mustCancel*: bool
|
mustCancel*: bool
|
||||||
|
closure*: iterator(f: FutureBase): FutureBase {.closureIter.}
|
||||||
|
|
||||||
when chronosFutureId:
|
when chronosFutureId:
|
||||||
id*: uint
|
id*: uint
|
||||||
|
@ -54,19 +63,7 @@ type
|
||||||
next*: FutureBase
|
next*: FutureBase
|
||||||
prev*: FutureBase
|
prev*: FutureBase
|
||||||
|
|
||||||
# ZAH: we have discussed some possible optimizations where
|
|
||||||
# the future can be stored within the caller's stack frame.
|
|
||||||
# How much refactoring is needed to make this a regular non-ref type?
|
|
||||||
# Obviously, it will still be allocated on the heap when necessary.
|
|
||||||
Future*[T] = ref object of FutureBase ## Typed future.
|
Future*[T] = ref object of FutureBase ## Typed future.
|
||||||
when chronosStrictException:
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
|
||||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError], gcsafe.}
|
|
||||||
else:
|
|
||||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [CatchableError], gcsafe.}
|
|
||||||
else:
|
|
||||||
closure*: iterator(f: Future[T]): FutureBase {.raises: [Exception], gcsafe.}
|
|
||||||
|
|
||||||
when T isnot void:
|
when T isnot void:
|
||||||
value*: T ## Stored value
|
value*: T ## Stored value
|
||||||
|
|
||||||
|
@ -235,7 +232,7 @@ proc finish(fut: FutureBase, state: FutureState) =
|
||||||
|
|
||||||
proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
|
proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
|
||||||
if not(future.cancelled()):
|
if not(future.cancelled()):
|
||||||
checkFinished(FutureBase(future), loc)
|
checkFinished(future, loc)
|
||||||
doAssert(isNil(future.error))
|
doAssert(isNil(future.error))
|
||||||
future.value = val
|
future.value = val
|
||||||
future.finish(FutureState.Completed)
|
future.finish(FutureState.Completed)
|
||||||
|
@ -246,7 +243,7 @@ template complete*[T](future: Future[T], val: T) =
|
||||||
|
|
||||||
proc complete(future: Future[void], loc: ptr SrcLoc) =
|
proc complete(future: Future[void], loc: ptr SrcLoc) =
|
||||||
if not(future.cancelled()):
|
if not(future.cancelled()):
|
||||||
checkFinished(FutureBase(future), loc)
|
checkFinished(future, loc)
|
||||||
doAssert(isNil(future.error))
|
doAssert(isNil(future.error))
|
||||||
future.finish(FutureState.Completed)
|
future.finish(FutureState.Completed)
|
||||||
|
|
||||||
|
@ -256,7 +253,7 @@ template complete*(future: Future[void]) =
|
||||||
|
|
||||||
proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
|
proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
|
||||||
if not(future.cancelled()):
|
if not(future.cancelled()):
|
||||||
checkFinished(FutureBase(future), loc)
|
checkFinished(future, loc)
|
||||||
future.error = error
|
future.error = error
|
||||||
when chronosStackTrace:
|
when chronosStackTrace:
|
||||||
future.errorStackTrace = if getStackTrace(error) == "":
|
future.errorStackTrace = if getStackTrace(error) == "":
|
||||||
|
@ -281,7 +278,7 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
|
||||||
future.finish(FutureState.Cancelled)
|
future.finish(FutureState.Cancelled)
|
||||||
|
|
||||||
template cancelAndSchedule*(future: FutureBase) =
|
template cancelAndSchedule*(future: FutureBase) =
|
||||||
cancelAndSchedule(FutureBase(future), getSrcLocation())
|
cancelAndSchedule(future, getSrcLocation())
|
||||||
|
|
||||||
proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
|
proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
|
||||||
## Request that Future ``future`` cancel itself.
|
## Request that Future ``future`` cancel itself.
|
||||||
|
@ -302,8 +299,14 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if not(isNil(future.child)):
|
if not(isNil(future.child)):
|
||||||
|
# If you hit this assertion, you should have used the `CancelledError`
|
||||||
|
# mechanism and/or use a regular `addCallback`
|
||||||
|
doAssert future.cancelcb.isNil,
|
||||||
|
"futures returned from `{.async.}` functions must not use `cancelCallback`"
|
||||||
|
|
||||||
if cancel(future.child, getSrcLocation()):
|
if cancel(future.child, getSrcLocation()):
|
||||||
return true
|
return true
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if not(isNil(future.cancelcb)):
|
if not(isNil(future.cancelcb)):
|
||||||
future.cancelcb(cast[pointer](future))
|
future.cancelcb(cast[pointer](future))
|
||||||
|
@ -328,8 +331,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
|
||||||
if future.finished():
|
if future.finished():
|
||||||
callSoon(cb, udata)
|
callSoon(cb, udata)
|
||||||
else:
|
else:
|
||||||
let acb = AsyncCallback(function: cb, udata: udata)
|
future.callbacks.add AsyncCallback(function: cb, udata: udata)
|
||||||
future.callbacks.add acb
|
|
||||||
|
|
||||||
proc addCallback*(future: FutureBase, cb: CallbackFunc) =
|
proc addCallback*(future: FutureBase, cb: CallbackFunc) =
|
||||||
## Adds the callbacks proc to be called when the future completes.
|
## Adds the callbacks proc to be called when the future completes.
|
||||||
|
@ -370,32 +372,51 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc) =
|
||||||
proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) =
|
proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) =
|
||||||
## Sets the callback procedure to be called when the future is cancelled.
|
## Sets the callback procedure to be called when the future is cancelled.
|
||||||
##
|
##
|
||||||
## This callback will be called immediately as ``future.cancel()`` invoked.
|
## This callback will be called immediately as ``future.cancel()`` invoked and
|
||||||
|
## must be set before future is finished.
|
||||||
|
|
||||||
|
doAssert not future.finished(),
|
||||||
|
"cancellation callback must be set before finishing the future"
|
||||||
future.cancelcb = cb
|
future.cancelcb = cb
|
||||||
|
|
||||||
{.push stackTrace: off.}
|
{.push stackTrace: off.}
|
||||||
proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].}
|
proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.}
|
||||||
|
|
||||||
proc futureContinue*[T](fut: Future[T]) {.gcsafe, raises: [Defect].} =
|
proc internalContinue(fut: pointer) {.raises: [Defect], gcsafe.} =
|
||||||
# Used internally by async transformation
|
let asFut = cast[FutureBase](fut)
|
||||||
|
GC_unref(asFut)
|
||||||
|
futureContinue(asFut)
|
||||||
|
|
||||||
|
proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.} =
|
||||||
|
# This function is responsible for calling the closure iterator generated by
|
||||||
|
# the `{.async.}` transformation either until it has completed its iteration
|
||||||
|
# or raised and error / been cancelled.
|
||||||
|
#
|
||||||
|
# Every call to an `{.async.}` proc is redirected to call this function
|
||||||
|
# instead with its original body captured in `fut.closure`.
|
||||||
|
var next: FutureBase
|
||||||
try:
|
try:
|
||||||
if not(fut.closure.finished()):
|
while true:
|
||||||
var next = fut.closure(fut)
|
# Call closure to make progress on `fut` until it reaches `yield` (inside
|
||||||
# Continue while the yielded future is already finished.
|
# `await` typically) or completes / fails / is cancelled
|
||||||
while (not next.isNil()) and next.finished():
|
next = fut.closure(fut)
|
||||||
next = fut.closure(fut)
|
if fut.closure.finished(): # Reached the end of the transformed proc
|
||||||
if fut.closure.finished():
|
break
|
||||||
break
|
|
||||||
|
|
||||||
if fut.closure.finished():
|
|
||||||
fut.closure = nil
|
|
||||||
if next == nil:
|
if next == nil:
|
||||||
if not(fut.finished()):
|
raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) &
|
||||||
raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) & ") yielded `nil`, " &
|
") yielded `nil`, are you await'ing a `nil` Future?"
|
||||||
"are you await'ing a `nil` Future?"
|
|
||||||
else:
|
if not next.finished():
|
||||||
|
# We cannot make progress on `fut` until `next` has finished - schedule
|
||||||
|
# `fut` to continue running when that happens
|
||||||
GC_ref(fut)
|
GC_ref(fut)
|
||||||
next.addCallback(internalContinue[T], cast[pointer](fut))
|
next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))
|
||||||
|
|
||||||
|
# return here so that we don't remove the closure below
|
||||||
|
return
|
||||||
|
|
||||||
|
# Continue while the yielded future is already finished.
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
fut.cancelAndSchedule()
|
fut.cancelAndSchedule()
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
|
@ -405,11 +426,13 @@ proc futureContinue*[T](fut: Future[T]) {.gcsafe, raises: [Defect].} =
|
||||||
raise (ref Defect)(exc)
|
raise (ref Defect)(exc)
|
||||||
|
|
||||||
fut.fail((ref ValueError)(msg: exc.msg, parent: exc))
|
fut.fail((ref ValueError)(msg: exc.msg, parent: exc))
|
||||||
|
finally:
|
||||||
|
next = nil # GC hygiene
|
||||||
|
|
||||||
proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} =
|
# `futureContinue` will not be called any more for this future so we can
|
||||||
let asFut = cast[Future[T]](fut)
|
# clean it up
|
||||||
GC_unref(asFut)
|
fut.closure = nil
|
||||||
futureContinue(asFut)
|
fut.child = nil
|
||||||
|
|
||||||
{.pop.}
|
{.pop.}
|
||||||
|
|
||||||
|
@ -845,9 +868,6 @@ proc cancelAndWait*(fut: FutureBase): Future[void] =
|
||||||
fut.cancel()
|
fut.cancel()
|
||||||
return retFuture
|
return retFuture
|
||||||
|
|
||||||
proc cancelAndWait*[T](fut: Future[T]): Future[void] =
|
|
||||||
cancelAndWait(FutureBase(fut))
|
|
||||||
|
|
||||||
proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
proc allFutures*(futs: varargs[FutureBase]): Future[void] =
|
||||||
## Returns a future which will complete only when all futures in ``futs``
|
## Returns a future which will complete only when all futures in ``futs``
|
||||||
## will be completed, failed or canceled.
|
## will be completed, failed or canceled.
|
||||||
|
@ -896,7 +916,7 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
|
||||||
# Because we can't capture varargs[T] in closures we need to create copy.
|
# Because we can't capture varargs[T] in closures we need to create copy.
|
||||||
var nfuts: seq[FutureBase]
|
var nfuts: seq[FutureBase]
|
||||||
for future in futs:
|
for future in futs:
|
||||||
nfuts.add(FutureBase(future))
|
nfuts.add(future)
|
||||||
allFutures(nfuts)
|
allFutures(nfuts)
|
||||||
|
|
||||||
proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
|
||||||
|
|
|
@ -123,7 +123,13 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
|
|
||||||
let
|
let
|
||||||
internalFutureSym = ident "chronosInternalRetFuture"
|
internalFutureSym = ident "chronosInternalRetFuture"
|
||||||
procBody = prc.body.processBody(internalFutureSym, baseTypeIsVoid)
|
internalFutureType =
|
||||||
|
if baseTypeIsVoid:
|
||||||
|
newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(newIdentNode("void"))
|
||||||
|
else: returnType
|
||||||
|
castFutureSym = quote do:
|
||||||
|
cast[`internalFutureType`](`internalFutureSym`)
|
||||||
|
procBody = prc.body.processBody(castFutureSym, baseTypeIsVoid)
|
||||||
|
|
||||||
# don't do anything with forward bodies (empty)
|
# don't do anything with forward bodies (empty)
|
||||||
if procBody.kind != nnkEmpty:
|
if procBody.kind != nnkEmpty:
|
||||||
|
@ -139,7 +145,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
" a void async proc".}
|
" a void async proc".}
|
||||||
# -> complete(chronosInternalRetFuture)
|
# -> complete(chronosInternalRetFuture)
|
||||||
let complete =
|
let complete =
|
||||||
newCall(newIdentNode("complete"), internalFutureSym)
|
newCall(newIdentNode("complete"), castFutureSym)
|
||||||
|
|
||||||
newStmtList(resultTemplate, procBodyBlck, complete)
|
newStmtList(resultTemplate, procBodyBlck, complete)
|
||||||
else:
|
else:
|
||||||
|
@ -168,28 +174,20 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
|
|
||||||
# -> complete(chronosInternalRetFuture, result)
|
# -> complete(chronosInternalRetFuture, result)
|
||||||
newCall(newIdentNode("complete"),
|
newCall(newIdentNode("complete"),
|
||||||
internalFutureSym, newIdentNode("result")))
|
castFutureSym, newIdentNode("result")))
|
||||||
|
|
||||||
let
|
let
|
||||||
internalFutureType =
|
internalFutureParameter = nnkIdentDefs.newTree(internalFutureSym, newIdentNode("FutureBase"), newEmptyNode())
|
||||||
if baseTypeIsVoid:
|
|
||||||
newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(newIdentNode("void"))
|
|
||||||
else: returnType
|
|
||||||
internalFutureParameter = nnkIdentDefs.newTree(internalFutureSym, internalFutureType, newEmptyNode())
|
|
||||||
iteratorNameSym = genSym(nskIterator, $prcName)
|
iteratorNameSym = genSym(nskIterator, $prcName)
|
||||||
closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase"), internalFutureParameter],
|
closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase"), internalFutureParameter],
|
||||||
closureBody, nnkIteratorDef)
|
closureBody, nnkIteratorDef)
|
||||||
|
|
||||||
|
iteratorNameSym.copyLineInfo(prc)
|
||||||
|
|
||||||
closureIterator.pragma = newNimNode(nnkPragma, lineInfoFrom=prc.body)
|
closureIterator.pragma = newNimNode(nnkPragma, lineInfoFrom=prc.body)
|
||||||
closureIterator.addPragma(newIdentNode("closure"))
|
closureIterator.addPragma(newIdentNode("closure"))
|
||||||
# **Remark 435**: We generate a proc with an inner iterator which call each other
|
|
||||||
# recursively. The current Nim compiler is not smart enough to infer
|
# `async` code must be gcsafe
|
||||||
# the `gcsafe`-ty aspect of this setup, so we always annotate it explicitly
|
|
||||||
# with `gcsafe`. This means that the client code is always enforced to be
|
|
||||||
# `gcsafe`. This is still **safe**, the compiler still checks for `gcsafe`-ty
|
|
||||||
# regardless, it is only helping the compiler's inference algorithm. See
|
|
||||||
# https://github.com/nim-lang/RFCs/issues/435
|
|
||||||
# for more details.
|
|
||||||
closureIterator.addPragma(newIdentNode("gcsafe"))
|
closureIterator.addPragma(newIdentNode("gcsafe"))
|
||||||
|
|
||||||
# TODO when push raises is active in a module, the iterator here inherits
|
# TODO when push raises is active in a module, the iterator here inherits
|
||||||
|
@ -211,9 +209,11 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
))
|
))
|
||||||
|
|
||||||
# If proc has an explicit gcsafe pragma, we add it to iterator as well.
|
# If proc has an explicit gcsafe pragma, we add it to iterator as well.
|
||||||
|
# TODO if these lines are not here, srcloc tests fail (!)
|
||||||
if prc.pragma.findChild(it.kind in {nnkSym, nnkIdent} and
|
if prc.pragma.findChild(it.kind in {nnkSym, nnkIdent} and
|
||||||
it.strVal == "gcsafe") != nil:
|
it.strVal == "gcsafe") != nil:
|
||||||
closureIterator.addPragma(newIdentNode("gcsafe"))
|
closureIterator.addPragma(newIdentNode("gcsafe"))
|
||||||
|
|
||||||
outerProcBody.add(closureIterator)
|
outerProcBody.add(closureIterator)
|
||||||
|
|
||||||
# -> let resultFuture = newFuture[T]()
|
# -> let resultFuture = newFuture[T]()
|
||||||
|
@ -264,7 +264,6 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
))
|
))
|
||||||
|
|
||||||
if baseTypeIsVoid:
|
if baseTypeIsVoid:
|
||||||
# Add discardable pragma.
|
|
||||||
if returnType.kind == nnkEmpty:
|
if returnType.kind == nnkEmpty:
|
||||||
# Add Future[void]
|
# Add Future[void]
|
||||||
prc.params2[0] =
|
prc.params2[0] =
|
||||||
|
@ -276,47 +275,30 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||||
|
|
||||||
template await*[T](f: Future[T]): untyped =
|
template await*[T](f: Future[T]): untyped =
|
||||||
when declared(chronosInternalRetFuture):
|
when declared(chronosInternalRetFuture):
|
||||||
#work around https://github.com/nim-lang/Nim/issues/19193
|
chronosInternalRetFuture.child = f
|
||||||
when not declaredInScope(chronosInternalTmpFuture):
|
# `futureContinue` calls the iterator generated by the `async`
|
||||||
var chronosInternalTmpFuture {.inject.}: FutureBase = f
|
# transformation - `yield` gives control back to `futureContinue` which is
|
||||||
else:
|
# responsible for resuming execution once the yielded future is finished
|
||||||
chronosInternalTmpFuture = f
|
yield chronosInternalRetFuture.child
|
||||||
chronosInternalRetFuture.child = chronosInternalTmpFuture
|
|
||||||
|
|
||||||
# This "yield" is meant for a closure iterator in the caller.
|
# `child` is guaranteed to have been `finished` after the yield
|
||||||
yield chronosInternalTmpFuture
|
|
||||||
|
|
||||||
# By the time we get control back here, we're guaranteed that the Future we
|
|
||||||
# just yielded has been completed (success, failure or cancellation),
|
|
||||||
# through a very complicated mechanism in which the caller proc (a regular
|
|
||||||
# closure) adds itself as a callback to chronosInternalTmpFuture.
|
|
||||||
#
|
|
||||||
# Callbacks are called only after completion and a copy of the closure
|
|
||||||
# iterator that calls this template is still in that callback's closure
|
|
||||||
# environment. That's where control actually gets back to us.
|
|
||||||
|
|
||||||
chronosInternalRetFuture.child = nil
|
|
||||||
if chronosInternalRetFuture.mustCancel:
|
if chronosInternalRetFuture.mustCancel:
|
||||||
raise newCancelledError()
|
raise newCancelledError()
|
||||||
chronosInternalTmpFuture.internalCheckComplete()
|
|
||||||
|
# `child` released by `futureContinue`
|
||||||
|
chronosInternalRetFuture.child.internalCheckComplete()
|
||||||
when T isnot void:
|
when T isnot void:
|
||||||
cast[type(f)](chronosInternalTmpFuture).internalRead()
|
cast[type(f)](chronosInternalRetFuture.child).internalRead()
|
||||||
else:
|
else:
|
||||||
unsupported "await is only available within {.async.}"
|
unsupported "await is only available within {.async.}"
|
||||||
|
|
||||||
template awaitne*[T](f: Future[T]): Future[T] =
|
template awaitne*[T](f: Future[T]): Future[T] =
|
||||||
when declared(chronosInternalRetFuture):
|
when declared(chronosInternalRetFuture):
|
||||||
#work around https://github.com/nim-lang/Nim/issues/19193
|
chronosInternalRetFuture.child = f
|
||||||
when not declaredInScope(chronosInternalTmpFuture):
|
yield chronosInternalRetFuture.child
|
||||||
var chronosInternalTmpFuture {.inject.}: FutureBase = f
|
|
||||||
else:
|
|
||||||
chronosInternalTmpFuture = f
|
|
||||||
chronosInternalRetFuture.child = chronosInternalTmpFuture
|
|
||||||
yield chronosInternalTmpFuture
|
|
||||||
chronosInternalRetFuture.child = nil
|
|
||||||
if chronosInternalRetFuture.mustCancel:
|
if chronosInternalRetFuture.mustCancel:
|
||||||
raise newCancelledError()
|
raise newCancelledError()
|
||||||
cast[type(f)](chronosInternalTmpFuture)
|
cast[type(f)](chronosInternalRetFuture.child)
|
||||||
else:
|
else:
|
||||||
unsupported "awaitne is only available within {.async.}"
|
unsupported "awaitne is only available within {.async.}"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue