#
#                     Chronos
#
#  (c) Copyright 2015 Dominik Picheta
#  (c) Copyright 2018-2023 Status Research & Development GmbH
#
#                Licensed under either of
#    Apache License, version 2.0, (LICENSE-APACHEv2)
#                MIT license (LICENSE-MIT)

import std/sequtils
import stew/base10

import "."/srcloc
export srcloc

when chronosStackTrace:
  when defined(nimHasStacktracesModule):
    import system/stacktraces
  else:
    const
      reraisedFromBegin = -10
      reraisedFromEnd = -100

  type StackTrace = string

const
  LocCreateIndex* = 0 ## Index into `location` of the creation site.
  LocFinishIndex* = 1 ## Index into `location` of the finish site.

template LocCompleteIndex*: untyped {.deprecated: "LocFinishIndex".} =
  LocFinishIndex

when chronosStrictException:
  when (NimMajor, NimMinor) < (1, 4):
    {.pragma: closureIter, raises: [Defect, CatchableError], gcsafe.}
  else:
    {.pragma: closureIter, raises: [CatchableError], gcsafe.}
else:
  {.pragma: closureIter, raises: [Exception], gcsafe.}

type
  FutureState* {.pure.} = enum ## Lifecycle state of a future.
    Pending, Completed, Cancelled, Failed

  FutureBase* = ref object of RootObj ## Untyped future.
    location*: array[2, ptr SrcLoc] ## Create / finish source locations
    callbacks: seq[AsyncCallback] ## Scheduled with `callSoon` by `finish`
    cancelcb*: CallbackFunc ## Invoked by `cancel` on a child-less future
    child*: FutureBase ## Future that `cancel` forwards the request to
    state*: FutureState
    error*: ref CatchableError ## Stored exception
    mustCancel*: bool ## Set by `cancel` once cancellation was requested
    closure*: iterator(f: FutureBase): FutureBase {.closureIter.}

    when chronosFutureId:
      id*: uint

    when chronosStackTrace:
      errorStackTrace*: StackTrace
      stackTrace: StackTrace ## For debugging purposes only.

    when chronosFutureTracking:
      next*: FutureBase
      prev*: FutureBase

  Future*[T] = ref object of FutureBase ## Typed future.
    when T isnot void:
      value*: T ## Stored value

  FutureStr*[T] = ref object of Future[T]
    ## Future to hold GC strings
    gcholder*: string

  FutureSeq*[A, B] = ref object of Future[A]
    ## Future to hold GC seqs
    gcholder*: seq[B]

  FutureDefect* = object of Defect
    cause*: FutureBase

  FutureError* = object of CatchableError

  CancelledError* = object of FutureError

  FutureList* = object
    head*: FutureBase
    tail*: FutureBase
    count*: uint

# Backwards compatibility for old FutureState name
template Finished* {.deprecated: "Use Completed instead".} = Completed
template Finished*(T: type FutureState): FutureState {.
    deprecated: "Use FutureState.Completed instead".} =
  FutureState.Completed

when chronosFutureId:
  var currentID* {.threadvar.}: uint
else:
  template id*(f: FutureBase): uint =
    ## Without explicit future ids the address of the future object is used.
    cast[uint](addr f[])

when chronosFutureTracking:
  var futureList* {.threadvar.}: FutureList

template setupFutureBase(loc: ptr SrcLoc) =
  ## Shared constructor body: allocates `result`, marks it ``Pending``,
  ## records the creation location `loc` and, when future tracking is enabled,
  ## appends the new future to the tail of `futureList`.
  new(result)
  result.state = FutureState.Pending
  when chronosStackTrace:
    result.stackTrace = getStackTrace()
  when chronosFutureId:
    currentID.inc()
    result.id = currentID
  result.location[LocCreateIndex] = loc

  when chronosFutureTracking:
    result.next = nil
    result.prev = futureList.tail
    if not(isNil(futureList.tail)):
      futureList.tail.next = result
    futureList.tail = result
    if isNil(futureList.head):
      futureList.head = result
    futureList.count.inc()

proc newFutureImpl[T](loc: ptr SrcLoc): Future[T] =
  setupFutureBase(loc)

proc newFutureSeqImpl[A, B](loc: ptr SrcLoc): FutureSeq[A, B] =
  setupFutureBase(loc)

proc newFutureStrImpl[T](loc: ptr SrcLoc): FutureStr[T] =
  setupFutureBase(loc)

template newFuture*[T](fromProc: static[string] = ""): Future[T] =
  ## Creates a new future.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureImpl[T](getSrcLocation(fromProc))

template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] =
  ## Creates a new future which can hold/preserve a GC sequence until the
  ## future is completed.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureSeqImpl[A, B](getSrcLocation(fromProc))

template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] =
  ## Creates a new future which can hold/preserve a GC string until the
  ## future is completed.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureStrImpl[T](getSrcLocation(fromProc))

proc finished*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` has finished, i.e. ``future`` state changed
  ## from state ``Pending`` to one of the states (``Completed``,
  ## ``Cancelled``, ``Failed``).
  (future.state != FutureState.Pending)

proc cancelled*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` has been cancelled.
  (future.state == FutureState.Cancelled)

proc failed*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` finished with an error.
  (future.state == FutureState.Failed)

proc completed*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` finished with a value.
  (future.state == FutureState.Completed)

proc done*(future: FutureBase): bool {.
    deprecated: "Use `completed` instead".} =
  ## This is an alias for ``completed(future)`` procedure.
  completed(future)

when chronosFutureTracking:
  proc futureDestructor(udata: pointer) =
    ## This procedure will be called when Future[T] got completed, cancelled or
    ## failed and all Future[T].callbacks are already scheduled and processed.
    ##
    ## It unlinks the future from the doubly linked `futureList`.
    let future = cast[FutureBase](udata)
    if future == futureList.tail: futureList.tail = future.prev
    if future == futureList.head: futureList.head = future.next
    if not(isNil(future.next)): future.next.prev = future.prev
    if not(isNil(future.prev)): future.prev.next = future.next
    futureList.count.dec()

  proc scheduleDestructor(future: FutureBase) {.inline.} =
    callSoon(futureDestructor, cast[pointer](future))

proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
  ## Checks whether `future` is finished. If it is then raises a
  ## ``FutureDefect`` describing both completion attempts, otherwise records
  ## `loc` as the finish location of `future`.
  if future.finished():
    var msg = ""
    msg.add("An attempt was made to complete a Future more than once. ")
    msg.add("Details:")
    msg.add("\n Future ID: " & Base10.toString(future.id))
    msg.add("\n Creation location:")
    msg.add("\n " & $future.location[LocCreateIndex])
    msg.add("\n First completion location:")
    msg.add("\n " & $future.location[LocFinishIndex])
    msg.add("\n Second completion location:")
    msg.add("\n " & $loc)
    when chronosStackTrace:
      msg.add("\n Stack trace to moment of creation:")
      msg.add("\n" & indent(future.stackTrace.strip(), 4))
      msg.add("\n Stack trace to moment of secondary completion:")
      msg.add("\n" & indent(getStackTrace().strip(), 4))
    msg.add("\n\n")
    var err = newException(FutureDefect, msg)
    err.cause = future
    raise err
  else:
    future.location[LocFinishIndex] = loc

proc finish(fut: FutureBase, state: FutureState) =
  ## Moves `fut` into `state`, schedules every registered callback and drops
  ## the references held by the future.
  # We do not perform any checks here, because:
  # 1. `finish()` is a private procedure and `state` is under our control.
  # 2. `fut.state` is checked by `checkFinished()`.
  fut.state = state
  doAssert fut.cancelcb == nil or state != FutureState.Cancelled
  fut.cancelcb = nil # release cancellation callback memory
  for item in fut.callbacks.mitems():
    if not(isNil(item.function)):
      callSoon(item)
    item = default(AsyncCallback) # release memory as early as possible
  fut.callbacks = default(seq[AsyncCallback]) # release seq as well

  when chronosFutureTracking:
    scheduleDestructor(fut)

proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
  # Completing an already cancelled future is silently ignored.
  if not(future.cancelled()):
    checkFinished(future, loc)
    doAssert(isNil(future.error))
    future.value = val
    future.finish(FutureState.Completed)

template complete*[T](future: Future[T], val: T) =
  ## Completes ``future`` with value ``val``.
  complete(future, val, getSrcLocation())

proc complete(future: Future[void], loc: ptr SrcLoc) =
  # Completing an already cancelled future is silently ignored.
  if not(future.cancelled()):
    checkFinished(future, loc)
    doAssert(isNil(future.error))
    future.finish(FutureState.Completed)

template complete*(future: Future[void]) =
  ## Completes a void ``future``.
  complete(future, getSrcLocation())

proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
  # Failing an already cancelled future is silently ignored.
  if not(future.cancelled()):
    checkFinished(future, loc)
    future.error = error
    when chronosStackTrace:
      # Prefer the trace carried by the exception; fall back to the current one.
      future.errorStackTrace = if getStackTrace(error) == "":
                                 getStackTrace()
                               else:
                                 getStackTrace(error)
    future.finish(FutureState.Failed)

template fail*(future: FutureBase, error: ref CatchableError) =
  ## Completes ``future`` with ``error``.
  fail(future, error, getSrcLocation())

template newCancelledError(): ref CancelledError =
  (ref CancelledError)(msg: "Future operation cancelled!")

proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
  # Does nothing when `future` has already finished.
  if not(future.finished()):
    checkFinished(future, loc)
    future.error = newCancelledError()
    when chronosStackTrace:
      future.errorStackTrace = getStackTrace()
    future.finish(FutureState.Cancelled)

template cancelAndSchedule*(future: FutureBase) =
  ## Marks a pending ``future`` as cancelled and schedules its callbacks.
  cancelAndSchedule(future, getSrcLocation())

proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
  ## Request that Future ``future`` cancel itself.
  ##
  ## This arranges for a `CancelledError` to be thrown into procedure which
  ## waits for ``future`` on the next cycle through the event loop.
  ## The procedure then has a chance to clean up or even deny the request
  ## using `try/except/finally`.
  ##
  ## This call does not guarantee that the ``future`` will be cancelled: the
  ## exception might be caught and acted upon, delaying cancellation of the
  ## ``future`` or preventing cancellation completely. The ``future`` may also
  ## return value or raise different exception.
  ##
  ## When ``future`` has a ``child`` the request is forwarded to the child and
  ## the state of ``future`` itself is not changed here; a future without a
  ## child is cancelled right away through ``cancelAndSchedule``.
  ##
  ## Returns ``false`` when ``future`` has already finished and ``true``
  ## otherwise. ``loc`` is recorded as the finish location of the future that
  ## ends up being cancelled.
  if future.finished():
    return false

  if not(isNil(future.child)):
    # If you hit this assertion, you should have used the `CancelledError`
    # mechanism and/or use a regular `addCallback`
    doAssert future.cancelcb.isNil,
      "futures returned from `{.async.}` functions must not use `cancelCallback`"

    # Propagate the caller's location instead of a location inside this module.
    if cancel(future.child, loc):
      return true

  else:
    if not(isNil(future.cancelcb)):
      future.cancelcb(cast[pointer](future))
      future.cancelcb = nil
    cancelAndSchedule(future, loc)

  future.mustCancel = true
  return true

template cancel*(future: FutureBase) =
  ## Cancel ``future``.
  discard cancel(future, getSrcLocation())

proc clearCallbacks(future: FutureBase) =
  future.callbacks = default(seq[AsyncCallback])

proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
  ## Adds the callback proc to be called when the future finishes.
  ##
  ## If the future has already finished then ``cb`` is scheduled right away
  ## via ``callSoon``.
  doAssert(not isNil(cb))
  if future.finished():
    callSoon(cb, udata)
  else:
    future.callbacks.add AsyncCallback(function: cb, udata: udata)

proc addCallback*(future: FutureBase, cb: CallbackFunc) =
  ## Adds the callback proc to be called when the future finishes, passing
  ## the future itself as user data.
  ##
  ## If the future has already finished then ``cb`` is scheduled right away
  ## via ``callSoon``.
  future.addCallback(cb, cast[pointer](future))

proc removeCallback*(future: FutureBase, cb: CallbackFunc,
                     udata: pointer) =
  ## Removes every registered ``(cb, udata)`` pair from the list of callbacks -
  ## this operation may be slow if there are many registered callbacks!
  doAssert(not isNil(cb))
  # Make sure to release memory associated with callback, or reference chains
  # may be created!
  future.callbacks.keepItIf:
    it.function != cb or it.udata != udata

proc removeCallback*(future: FutureBase, cb: CallbackFunc) =
  ## Removes ``cb`` registered with the future itself as user data.
  future.removeCallback(cb, cast[pointer](future))

proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
  ## Clears the list of callbacks and sets the callback proc to be called when
  ## the future finishes.
  ##
  ## If the future has already finished then ``cb`` is scheduled right away
  ## via ``callSoon``.
  ##
  ## It's recommended to use ``addCallback`` or ``then`` instead.
  # ZAH: how about `setLen(1); callbacks[0] = cb`
  future.clearCallbacks
  future.addCallback(cb, udata)

proc `callback=`*(future: FutureBase, cb: CallbackFunc) =
  ## Sets the callback proc to be called when the future finishes, passing
  ## the future itself as user data.
  ##
  ## If the future has already finished then ``cb`` is scheduled right away
  ## via ``callSoon``.
  `callback=`(future, cb, cast[pointer](future))

proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) =
  ## Sets the callback procedure to be called when the future is cancelled.
  ##
  ## This callback will be called immediately as ``future.cancel()`` invoked and
  ## must be set before future is finished.
  doAssert not future.finished(),
    "cancellation callback must be set before finishing the future"
  future.cancelcb = cb

{.push stackTrace: off.}
proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.}

proc internalContinue(fut: pointer) {.raises: [Defect], gcsafe.} =
  # Balances the `GC_ref` taken in `futureContinue` before resuming.
  let asFut = cast[FutureBase](fut)
  GC_unref(asFut)
  futureContinue(asFut)

proc futureContinue*(fut: FutureBase) {.raises: [Defect], gcsafe.} =
  # This function is responsible for calling the closure iterator generated by
  # the `{.async.}` transformation either until it has completed its iteration
  # or raised an error / been cancelled.
  #
  # Every call to an `{.async.}` proc is redirected to call this function
  # instead with its original body captured in `fut.closure`.
  var next: FutureBase
  try:
    while true:
      # Call closure to make progress on `fut` until it reaches `yield` (inside
      # `await` typically) or completes / fails / is cancelled
      next = fut.closure(fut)
      if fut.closure.finished(): # Reached the end of the transformed proc
        break

      if next == nil:
        raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) &
                    ") yielded `nil`, are you await'ing a `nil` Future?"

      if not next.finished():
        # We cannot make progress on `fut` until `next` has finished - schedule
        # `fut` to continue running when that happens
        GC_ref(fut)
        next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))

        # return here so that we don't remove the closure below
        return

      # Continue while the yielded future is already finished.
  except CancelledError:
    fut.cancelAndSchedule()
  except CatchableError as exc:
    fut.fail(exc)
  except Exception as exc:
    if exc of Defect:
      raise (ref Defect)(exc)

    fut.fail((ref ValueError)(msg: exc.msg, parent: exc))
  finally:
    next = nil # GC hygiene

  # `futureContinue` will not be called any more for this future so we can
  # clean it up
  fut.closure = nil
  fut.child = nil

{.pop.}

when chronosStackTrace:
  import std/strutils

  template getFilenameProcname(entry: StackTraceEntry): (string, string) =
    when compiles(entry.filenameStr) and compiles(entry.procnameStr):
      # We can't rely on "entry.filename" and "entry.procname" still being valid
      # cstring pointers, because the "string.data" buffers they pointed to might
      # be already garbage collected (this entry being a non-shallow copy,
      # "entry.filename" no longer points to "entry.filenameStr.data", but to the
      # buffer of the original object).
      (entry.filenameStr, entry.procnameStr)
    else:
      ($entry.filename, $entry.procname)

  proc `$`(stackTraceEntries: seq[StackTraceEntry]): string =
    ## Renders the entries one per line as ``file(line) procname``, with the
    ## proc names aligned and re-raise markers shown as nested ``#[ ... ]#``.
    try:
      when defined(nimStackTraceOverride) and declared(addDebuggingInfo):
        let entries = addDebuggingInfo(stackTraceEntries)
      else:
        let entries = stackTraceEntries

      # Find longest filename & line number combo for alignment purposes.
      var longestLeft = 0
      for entry in entries:
        let (filename, procname) = getFilenameProcname(entry)

        if procname == "": continue

        let leftLen = filename.len + len($entry.line)
        if leftLen > longestLeft:
          longestLeft = leftLen

      var indent = 2
      # Format the entries.
      for entry in entries:
        let (filename, procname) = getFilenameProcname(entry)

        if procname == "":
          # Entries without a proc name only carry re-raise markers.
          if entry.line == reraisedFromBegin:
            result.add(spaces(indent) & "#[\n")
            indent.inc(2)
          elif entry.line == reraisedFromEnd:
            indent.dec(2)
            result.add(spaces(indent) & "]#\n")
          continue

        let left = "$#($#)" % [filename, $entry.line]
        result.add((spaces(indent) & "$#$# $#\n") % [
          left,
          spaces(longestLeft - left.len + 2),
          procname
        ])
    except ValueError as exc:
      return exc.msg # Shouldn't actually happen since we set the formatting
                     # string

  proc injectStacktrace(error: ref Exception) =
    const header = "\nAsync traceback:\n"

    var exceptionMsg = error.msg
    if header in exceptionMsg:
      # This is messy: extract the original exception message from the msg
      # containing the async traceback.
      let start = exceptionMsg.find(header)
      exceptionMsg = exceptionMsg[0..