From e436f20b33d2dfd8e341b316f30bd8d11360f8f7 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 31 May 2023 07:24:25 +0200 Subject: [PATCH] Memory cleanups (#395) * Avoid `FutureContinue` bloat by moving type punning into iterator (which is typed anyway) * clear closure iterator from future when iteration is done also on cancellation / exception * remove some redundant local variables in `await` * document `futureContinue` flow --- chronos/asyncfutures2.nim | 108 ++++++++++++++++++++++---------------- chronos/asyncmacro2.nim | 76 ++++++++++----------------- 2 files changed, 93 insertions(+), 91 deletions(-) diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 9a9879c..37d2051 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -30,6 +30,14 @@ const template LocCompleteIndex*: untyped {.deprecated: "LocFinishIndex".} = LocFinishIndex +when chronosStrictException: + when (NimMajor, NimMinor) < (1, 4): + {.pragma: closureIter, raises: [Defect, CatchableError], gcsafe.} + else: + {.pragma: closureIter, raises: [CatchableError], gcsafe.} +else: + {.pragma: closureIter, raises: [Exception], gcsafe.} + type FutureState* {.pure.} = enum Pending, Completed, Cancelled, Failed @@ -42,6 +50,7 @@ type state*: FutureState error*: ref CatchableError ## Stored exception mustCancel*: bool + closure*: iterator(f: FutureBase): FutureBase {.closureIter.} when chronosFutureId: id*: uint @@ -54,19 +63,7 @@ type next*: FutureBase prev*: FutureBase - # ZAH: we have discussed some possible optimizations where - # the future can be stored within the caller's stack frame. - # How much refactoring is needed to make this a regular non-ref type? - # Obviously, it will still be allocated on the heap when necessary. Future*[T] = ref object of FutureBase ## Typed future. 
- when chronosStrictException: - when (NimMajor, NimMinor) < (1, 4): - closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError], gcsafe.} - else: - closure*: iterator(f: Future[T]): FutureBase {.raises: [CatchableError], gcsafe.} - else: - closure*: iterator(f: Future[T]): FutureBase {.raises: [Exception], gcsafe.} - when T isnot void: value*: T ## Stored value @@ -235,7 +232,7 @@ proc finish(fut: FutureBase, state: FutureState) = proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) = if not(future.cancelled()): - checkFinished(FutureBase(future), loc) + checkFinished(future, loc) doAssert(isNil(future.error)) future.value = val future.finish(FutureState.Completed) @@ -246,7 +243,7 @@ template complete*[T](future: Future[T], val: T) = proc complete(future: Future[void], loc: ptr SrcLoc) = if not(future.cancelled()): - checkFinished(FutureBase(future), loc) + checkFinished(future, loc) doAssert(isNil(future.error)) future.finish(FutureState.Completed) @@ -256,7 +253,7 @@ template complete*(future: Future[void]) = proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) = if not(future.cancelled()): - checkFinished(FutureBase(future), loc) + checkFinished(future, loc) future.error = error when chronosStackTrace: future.errorStackTrace = if getStackTrace(error) == "": @@ -281,7 +278,7 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = future.finish(FutureState.Cancelled) template cancelAndSchedule*(future: FutureBase) = - cancelAndSchedule(FutureBase(future), getSrcLocation()) + cancelAndSchedule(future, getSrcLocation()) proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = ## Request that Future ``future`` cancel itself. 
@@ -302,8 +299,14 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = return false if not(isNil(future.child)): + # If you hit this assertion, you should have used the `CancelledError` + # mechanism and/or use a regular `addCallback` + doAssert future.cancelcb.isNil, + "futures returned from `{.async.}` functions must not use `cancelCallback`" + if cancel(future.child, getSrcLocation()): return true + else: if not(isNil(future.cancelcb)): future.cancelcb(cast[pointer](future)) @@ -328,8 +331,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer) = if future.finished(): callSoon(cb, udata) else: - let acb = AsyncCallback(function: cb, udata: udata) - future.callbacks.add acb + future.callbacks.add AsyncCallback(function: cb, udata: udata) proc addCallback*(future: FutureBase, cb: CallbackFunc) = ## Adds the callbacks proc to be called when the future completes. @@ -370,32 +372,51 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc) = proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) = ## Sets the callback procedure to be called when the future is cancelled. ## - ## This callback will be called immediately as ``future.cancel()`` invoked. + ## This callback will be called immediately as ``future.cancel()`` invoked and + ## must be set before future is finished. 
+ + doAssert not future.finished(), + "cancellation callback must be set before finishing the future" future.cancelcb = cb {.push stackTrace: off.} -proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} +proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.} -proc futureContinue*[T](fut: Future[T]) {.gcsafe, raises: [Defect].} = - # Used internally by async transformation +proc internalContinue(fut: pointer) {.raises: [Defect], gcsafe.} = + let asFut = cast[FutureBase](fut) + GC_unref(asFut) + futureContinue(asFut) + +proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.} = + # This function is responsible for calling the closure iterator generated by + # the `{.async.}` transformation either until it has completed its iteration + # or raised an error / been cancelled. + # + # Every call to an `{.async.}` proc is redirected to call this function + # instead with its original body captured in `fut.closure`. + var next: FutureBase try: - if not(fut.closure.finished()): - var next = fut.closure(fut) - # Continue while the yielded future is already finished. - while (not next.isNil()) and next.finished(): - next = fut.closure(fut) - if fut.closure.finished(): - break + while true: + # Call closure to make progress on `fut` until it reaches `yield` (inside + # `await` typically) or completes / fails / is cancelled + next = fut.closure(fut) + if fut.closure.finished(): # Reached the end of the transformed proc + break - if fut.closure.finished(): - fut.closure = nil if next == nil: - if not(fut.finished()): - raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) & ") yielded `nil`, " & - "are you await'ing a `nil` Future?" - else: + raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) & + ") yielded `nil`, are you await'ing a `nil` Future?" 
+ + if not next.finished(): + # We cannot make progress on `fut` until `next` has finished - schedule + # `fut` to continue running when that happens GC_ref(fut) - next.addCallback(internalContinue[T], cast[pointer](fut)) + next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut)) + + # return here so that we don't remove the closure below + return + + # Continue while the yielded future is already finished. except CancelledError: fut.cancelAndSchedule() except CatchableError as exc: @@ -405,11 +426,13 @@ proc futureContinue*[T](fut: Future[T]) {.gcsafe, raises: [Defect].} = raise (ref Defect)(exc) fut.fail((ref ValueError)(msg: exc.msg, parent: exc)) + finally: + next = nil # GC hygiene -proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} = - let asFut = cast[Future[T]](fut) - GC_unref(asFut) - futureContinue(asFut) + # `futureContinue` will not be called any more for this future so we can + # clean it up + fut.closure = nil + fut.child = nil {.pop.} @@ -845,9 +868,6 @@ proc cancelAndWait*(fut: FutureBase): Future[void] = fut.cancel() return retFuture -proc cancelAndWait*[T](fut: Future[T]): Future[void] = - cancelAndWait(FutureBase(fut)) - proc allFutures*(futs: varargs[FutureBase]): Future[void] = ## Returns a future which will complete only when all futures in ``futs`` ## will be completed, failed or canceled. @@ -896,7 +916,7 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = # Because we can't capture varargs[T] in closures we need to create copy. 
var nfuts: seq[FutureBase] for future in futs: - nfuts.add(FutureBase(future)) + nfuts.add(future) allFutures(nfuts) proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] = diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index 9da84b3..f5b3570 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -123,7 +123,13 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = let internalFutureSym = ident "chronosInternalRetFuture" - procBody = prc.body.processBody(internalFutureSym, baseTypeIsVoid) + internalFutureType = + if baseTypeIsVoid: + newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(newIdentNode("void")) + else: returnType + castFutureSym = quote do: + cast[`internalFutureType`](`internalFutureSym`) + procBody = prc.body.processBody(castFutureSym, baseTypeIsVoid) # don't do anything with forward bodies (empty) if procBody.kind != nnkEmpty: @@ -139,7 +145,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = " a void async proc".} # -> complete(chronosInternalRetFuture) let complete = - newCall(newIdentNode("complete"), internalFutureSym) + newCall(newIdentNode("complete"), castFutureSym) newStmtList(resultTemplate, procBodyBlck, complete) else: @@ -168,28 +174,20 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = # -> complete(chronosInternalRetFuture, result) newCall(newIdentNode("complete"), - internalFutureSym, newIdentNode("result"))) + castFutureSym, newIdentNode("result"))) let - internalFutureType = - if baseTypeIsVoid: - newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(newIdentNode("void")) - else: returnType - internalFutureParameter = nnkIdentDefs.newTree(internalFutureSym, internalFutureType, newEmptyNode()) + internalFutureParameter = nnkIdentDefs.newTree(internalFutureSym, newIdentNode("FutureBase"), newEmptyNode()) iteratorNameSym = genSym(nskIterator, $prcName) closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase"), 
internalFutureParameter], closureBody, nnkIteratorDef) + iteratorNameSym.copyLineInfo(prc) + closureIterator.pragma = newNimNode(nnkPragma, lineInfoFrom=prc.body) closureIterator.addPragma(newIdentNode("closure")) - # **Remark 435**: We generate a proc with an inner iterator which call each other - # recursively. The current Nim compiler is not smart enough to infer - # the `gcsafe`-ty aspect of this setup, so we always annotate it explicitly - # with `gcsafe`. This means that the client code is always enforced to be - # `gcsafe`. This is still **safe**, the compiler still checks for `gcsafe`-ty - # regardless, it is only helping the compiler's inference algorithm. See - # https://github.com/nim-lang/RFCs/issues/435 - # for more details. + + # `async` code must be gcsafe closureIterator.addPragma(newIdentNode("gcsafe")) # TODO when push raises is active in a module, the iterator here inherits @@ -211,9 +209,11 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = )) # If proc has an explicit gcsafe pragma, we add it to iterator as well. + # TODO if these lines are not here, srcloc tests fail (!) if prc.pragma.findChild(it.kind in {nnkSym, nnkIdent} and it.strVal == "gcsafe") != nil: closureIterator.addPragma(newIdentNode("gcsafe")) + outerProcBody.add(closureIterator) # -> let resultFuture = newFuture[T]() @@ -264,7 +264,6 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = )) if baseTypeIsVoid: - # Add discardable pragma. 
if returnType.kind == nnkEmpty: # Add Future[void] prc.params2[0] = @@ -276,47 +275,30 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = template await*[T](f: Future[T]): untyped = when declared(chronosInternalRetFuture): - #work around https://github.com/nim-lang/Nim/issues/19193 - when not declaredInScope(chronosInternalTmpFuture): - var chronosInternalTmpFuture {.inject.}: FutureBase = f - else: - chronosInternalTmpFuture = f - chronosInternalRetFuture.child = chronosInternalTmpFuture + chronosInternalRetFuture.child = f + # `futureContinue` calls the iterator generated by the `async` + # transformation - `yield` gives control back to `futureContinue` which is + # responsible for resuming execution once the yielded future is finished + yield chronosInternalRetFuture.child - # This "yield" is meant for a closure iterator in the caller. - yield chronosInternalTmpFuture - - # By the time we get control back here, we're guaranteed that the Future we - # just yielded has been completed (success, failure or cancellation), - # through a very complicated mechanism in which the caller proc (a regular - # closure) adds itself as a callback to chronosInternalTmpFuture. - # - # Callbacks are called only after completion and a copy of the closure - # iterator that calls this template is still in that callback's closure - # environment. That's where control actually gets back to us. 
- - chronosInternalRetFuture.child = nil + # `child` is guaranteed to have been `finished` after the yield if chronosInternalRetFuture.mustCancel: raise newCancelledError() - chronosInternalTmpFuture.internalCheckComplete() + + # `child` released by `futureContinue` + chronosInternalRetFuture.child.internalCheckComplete() when T isnot void: - cast[type(f)](chronosInternalTmpFuture).internalRead() + cast[type(f)](chronosInternalRetFuture.child).internalRead() else: unsupported "await is only available within {.async.}" template awaitne*[T](f: Future[T]): Future[T] = when declared(chronosInternalRetFuture): - #work around https://github.com/nim-lang/Nim/issues/19193 - when not declaredInScope(chronosInternalTmpFuture): - var chronosInternalTmpFuture {.inject.}: FutureBase = f - else: - chronosInternalTmpFuture = f - chronosInternalRetFuture.child = chronosInternalTmpFuture - yield chronosInternalTmpFuture - chronosInternalRetFuture.child = nil + chronosInternalRetFuture.child = f + yield chronosInternalRetFuture.child if chronosInternalRetFuture.mustCancel: raise newCancelledError() - cast[type(f)](chronosInternalTmpFuture) + cast[type(f)](chronosInternalRetFuture.child) else: unsupported "awaitne is only available within {.async.}"