#
#                     Chronos
#
#           (c) Copyright 2015 Dominik Picheta
#  (c) Copyright 2018-Present Status Research & Development GmbH
#
#                Licensed under either of
#    Apache License, version 2.0, (LICENSE-APACHEv2)
#                MIT license (LICENSE-MIT)

import os, tables, strutils, heapqueue, options, deques, cstrutils, sets, hashes
when defined(metrics):
  import metrics, locks
import ./srcloc
export srcloc

const
  LocCreateIndex* = 0
    ## Index into ``FutureBase.location`` of the creation site.
  LocCompleteIndex* = 1
    ## Index into ``FutureBase.location`` of the completion site.

when defined(chronosStackTrace):
  type StackTrace = string

type
  FutureState* {.pure.} = enum
    Pending, Finished, Cancelled, Failed

  FutureBase* = ref object of RootObj ## Untyped future.
    location*: array[2, ptr SrcLoc]
    callbacks: Deque[AsyncCallback]
    cancelcb*: CallbackFunc
    child*: FutureBase
    state*: FutureState
    error*: ref Exception ## Stored exception
    mustCancel*: bool
    id*: int

    when defined(chronosStackTrace):
      errorStackTrace*: StackTrace
      stackTrace: StackTrace ## For debugging purposes only.

    when defined(chronosFutureTracking):
      next*: FutureBase
      prev*: FutureBase

  # ZAH: we have discussed some possible optimizations where
  # the future can be stored within the caller's stack frame.
  # How much refactoring is needed to make this a regular non-ref type?
  # Obviously, it will still be allocated on the heap when necessary.
  Future*[T] = ref object of FutureBase ## Typed future.
    value: T ## Stored value

  FutureStr*[T] = ref object of Future[T]
    ## Future to hold GC strings
    gcholder*: string

  FutureSeq*[A, B] = ref object of Future[A]
    ## Future to hold GC seqs
    gcholder*: seq[B]

  FutureVar*[T] = distinct Future[T]

  FutureDefect* = object of Defect
    cause*: FutureBase

  FutureError* = object of CatchableError

  FutureList* = object
    head*: FutureBase
    tail*: FutureBase
    count*: int

{.push warning[InheritFromException]: off.}
# used internally; should not be caught by the API user
type
  CancelledError* = object of Exception
{.pop.}

var currentID* {.threadvar.}: int
currentID = 0

when defined(metrics):
  declareCounter chronos_new_future, "new Future being created"

when defined(chronosFutureTracking):
  var futureList* {.threadvar.}: FutureList
  futureList = FutureList()

when defined(chronosFutureTracking):
  proc registerPendingFuture(future: var FutureBase) =
    ## Appends ``future`` to the tail of the thread-local ``futureList`` and
    ## increments its counter. With ``-d:metrics`` it also bumps the
    ## ``chronos_new_future`` counter and the per-creation-location entry of
    ## ``pendingFuturesTable``.
    future.next = nil
    future.prev = futureList.tail
    if not(isNil(futureList.tail)):
      futureList.tail.next = future
    futureList.tail = future
    if isNil(futureList.head):
      futureList.head = future
    futureList.count.inc()
    when defined(metrics):
      chronos_new_future.inc()
      {.gcsafe.}:
        withLock(pendingFuturesTableLock):
          pendingFuturesTable[$future.location[LocCreateIndex]] =
            pendingFuturesTable.getOrDefault(
              $future.location[LocCreateIndex]) + 1

  proc unregisterPendingFuture(future: var FutureBase) =
    ## Unlinks ``future`` from the thread-local ``futureList`` and decrements
    ## its counter. With ``-d:metrics`` the per-creation-location entry of
    ## ``pendingFuturesTable`` is decremented as well.
    if future == futureList.tail:
      futureList.tail = future.prev
    if future == futureList.head:
      futureList.head = future.next
    if not(isNil(future.next)):
      future.next.prev = future.prev
    if not(isNil(future.prev)):
      future.prev.next = future.next
    futureList.count.dec()
    when defined(metrics):
      {.gcsafe.}:
        withLock(pendingFuturesTableLock):
          pendingFuturesTable[$future.location[LocCreateIndex]] =
            pendingFuturesTable.getOrDefault(
              $future.location[LocCreateIndex]) - 1

template setupFutureBase(loc: ptr SrcLoc) =
  ## Shared constructor body: allocates ``result``, marks it ``Pending``,
  ## assigns the next ``currentID`` and records ``loc`` as creation location.
  new(result)
  result.state = FutureState.Pending
  when defined(chronosStackTrace):
    result.stackTrace = getStackTrace()
  result.id = currentID
  result.location[LocCreateIndex] = loc
  currentID.inc()
  when defined(chronosFutureTracking):
    registerPendingFuture(result.FutureBase)

proc newFuture[T](loc: ptr SrcLoc): Future[T] =
  setupFutureBase(loc)

proc newFutureSeq[A, B](loc: ptr SrcLoc): FutureSeq[A, B] =
  setupFutureBase(loc)

proc newFutureStr[T](loc: ptr SrcLoc): FutureStr[T] =
  setupFutureBase(loc)

proc newFutureVar[T](loc: ptr SrcLoc): FutureVar[T] =
  FutureVar[T](newFuture[T](loc))

template newFuture*[T](fromProc: static[string] = ""): auto =
  ## Creates a new future.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFuture[T](getSrcLocation(fromProc))

template newFutureSeq*[A, B](fromProc: static[string] = ""): auto =
  ## Creates a new future that can hold on to a GC'd sequence (``gcholder``)
  ## until the future is completed.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureSeq[A, B](getSrcLocation(fromProc))

template newFutureStr*[T](fromProc: static[string] = ""): auto =
  ## Creates a new future that can hold on to a GC'd string (``gcholder``)
  ## until the future is completed.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureStr[T](getSrcLocation(fromProc))

template newFutureVar*[T](fromProc: static[string] = ""): auto =
  ## Create a new ``FutureVar``. This Future type is ideally suited for
  ## situations where you want to avoid unnecessary allocations of Futures.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  newFutureVar[T](getSrcLocation(fromProc))

proc clean*[T](future: FutureVar[T]) =
  ## Resets the ``finished`` status of ``future``: the state goes back to
  ## ``Pending`` and the stored error is cleared.
  Future[T](future).state = FutureState.Pending
  Future[T](future).error = nil

proc finished*(future: FutureBase | FutureVar): bool {.inline.} =
  ## Determines whether ``future`` has completed.
  ##
  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
  when future is FutureVar:
    result = (FutureBase(future).state != FutureState.Pending)
  else:
    result = (future.state != FutureState.Pending)

proc cancelled*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` has been cancelled.
  result = (future.state == FutureState.Cancelled)

proc failed*(future: FutureBase): bool {.inline.} =
  ## Determines whether ``future`` completed with an error.
  result = (future.state == FutureState.Failed)

when defined(chronosFutureTracking):
  proc futureDestructor(udata: pointer) {.gcsafe.} =
    ## This procedure will be called when Future[T] got finished, cancelled or
    ## failed and all Future[T].callbacks are already scheduled and processed.
    var future = cast[FutureBase](udata)
    unregisterPendingFuture(future)

  proc scheduleDestructor(future: FutureBase) {.inline.} =
    ## Schedules ``futureDestructor`` for ``future`` via ``callSoon``.
    callSoon(futureDestructor, cast[pointer](future))

proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
  ## Checks whether `future` is finished. If it is then raises a
  ## ``FutureDefect``. Otherwise ``loc`` is recorded as the completion
  ## location of ``future``.
  if future.finished():
    var msg = ""
    msg.add("An attempt was made to complete a Future more than once. ")
    msg.add("Details:")
    msg.add("\n Future ID: " & $future.id)
    msg.add("\n Creation location:")
    msg.add("\n " & $future.location[LocCreateIndex])
    msg.add("\n First completion location:")
    msg.add("\n " & $future.location[LocCompleteIndex])
    msg.add("\n Second completion location:")
    msg.add("\n " & $loc)
    when defined(chronosStackTrace):
      msg.add("\n Stack trace to moment of creation:")
      msg.add("\n" & indent(future.stackTrace.strip(), 4))
      msg.add("\n Stack trace to moment of secondary completion:")
      msg.add("\n" & indent(getStackTrace().strip(), 4))
    msg.add("\n\n")
    var err = newException(FutureDefect, msg)
    err.cause = future
    raise err
  else:
    future.location[LocCompleteIndex] = loc

proc call(callbacks: var Deque[AsyncCallback]) =
  ## Pops every callback that is queued at the time of the call and schedules
  ## the ones not marked ``deleted`` via ``callSoon``.
  var count = len(callbacks)
  while count > 0:
    var item = callbacks.popFirst()
    if not(item.deleted):
      callSoon(item.function, item.udata)
    dec(count)

proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
  ## Appends ``item``, (re)initialising the deque first when it is empty.
  if len(callbacks) == 0:
    callbacks = initDeque[AsyncCallback]()
  callbacks.addLast(item)

proc remove(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
  ## Marks every queued callback with the same ``function`` and ``udata`` as
  ## ``item`` as ``deleted``; entries stay in the deque and are skipped by
  ## ``call``.
  for p in callbacks.mitems():
    if p.function == item.function and p.udata == item.udata:
      p.deleted = true

proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
  ## Stores ``val``, marks ``future`` as ``Finished`` and schedules its
  ## callbacks. Does nothing when ``future`` is already cancelled; raises
  ## ``FutureDefect`` (through ``checkFinished``) when it already finished.
  if not(future.cancelled()):
    checkFinished(FutureBase(future), loc)
    doAssert(isNil(future.error))
    future.value = val
    future.state = FutureState.Finished
    future.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(FutureBase(future))

template complete*[T](future: Future[T], val: T) =
  ## Completes ``future`` with value ``val``.
  complete(future, val, getSrcLocation())

proc complete(future: Future[void], loc: ptr SrcLoc) =
  ## Marks a void ``future`` as ``Finished`` and schedules its callbacks.
  ## Does nothing when ``future`` is already cancelled.
  if not(future.cancelled()):
    checkFinished(FutureBase(future), loc)
    doAssert(isNil(future.error))
    future.state = FutureState.Finished
    future.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(FutureBase(future))

template complete*(future: Future[void]) =
  ## Completes a void ``future``.
  complete(future, getSrcLocation())

proc complete[T](future: FutureVar[T], loc: ptr SrcLoc) =
  ## Marks the ``FutureVar`` as ``Finished`` (keeping whatever value it
  ## currently holds) and schedules its callbacks. Does nothing when it is
  ## already cancelled.
  template fut: untyped = Future[T](future)
  # ``FutureVar`` is a distinct type and does not convert implicitly to
  # ``FutureBase``, so every accessor goes through ``fut``.
  if not(fut.cancelled()):
    checkFinished(FutureBase(fut), loc)
    doAssert(isNil(fut.error))
    fut.state = FutureState.Finished
    fut.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(FutureBase(fut))

template complete*[T](futvar: FutureVar[T]) =
  ## Completes a ``FutureVar``.
  complete(futvar, getSrcLocation())

proc complete[T](futvar: FutureVar[T], val: T, loc: ptr SrcLoc) =
  ## Stores ``val`` in the ``FutureVar``, marks it as ``Finished`` and
  ## schedules its callbacks. Does nothing when it is already cancelled.
  template fut: untyped = Future[T](futvar)
  # ``FutureVar`` is a distinct type and does not convert implicitly to
  # ``FutureBase``, so every accessor goes through ``fut``.
  if not(fut.cancelled()):
    checkFinished(FutureBase(fut), loc)
    doAssert(isNil(fut.error))
    fut.state = FutureState.Finished
    fut.value = val
    fut.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(FutureBase(fut))

template complete*[T](futvar: FutureVar[T], val: T) =
  ## Completes a ``FutureVar`` with value ``val``.
  ##
  ## Any previously stored value will be overwritten.
  complete(futvar, val, getSrcLocation())

proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) =
  ## Marks ``future`` as ``Failed`` with ``error`` and schedules its
  ## callbacks. Does nothing when ``future`` is already cancelled.
  if not(future.cancelled()):
    checkFinished(FutureBase(future), loc)
    future.state = FutureState.Failed
    future.error = error
    when defined(chronosStackTrace):
      # Fall back to the current stack trace when ``error`` carries none.
      future.errorStackTrace =
        if getStackTrace(error) == "":
          getStackTrace()
        else:
          getStackTrace(error)
    future.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(FutureBase(future))

template fail*[T](future: Future[T], error: ref Exception) =
  ## Completes ``future`` with ``error``.
  fail(future, error, getSrcLocation())

template newCancelledError(): ref CancelledError =
  (ref CancelledError)(msg: "Future operation cancelled!")

proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
  ## Marks a still pending ``future`` as ``Cancelled``, stores a fresh
  ## ``CancelledError`` in it and schedules its callbacks. Does nothing when
  ## ``future`` has already finished.
  if not(future.finished()):
    checkFinished(future, loc)
    future.state = FutureState.Cancelled
    future.error = newCancelledError()
    when defined(chronosStackTrace):
      future.errorStackTrace = getStackTrace()
    future.callbacks.call()
    when defined(chronosFutureTracking):
      scheduleDestructor(future)

template cancelAndSchedule*[T](future: Future[T]) =
  cancelAndSchedule(FutureBase(future), getSrcLocation())

proc cancel(future: FutureBase, loc: ptr SrcLoc) =
  ## Requests cancellation of a still pending ``future``; a finished future
  ## is left untouched.
  if not(future.finished()):
    # Cancel the bottom-most child. When that happens, its parent's `await` call
    # will raise CancelledError. Some macro will catch that and call
    # `cancelAndSchedule()` on that parent, thus propagating the cancellation
    # up the chain.
    if not(isNil(future.child)):
      cancel(future.child, loc)
      future.mustCancel = true
    else:
      if not(isNil(future.cancelcb)):
        future.cancelcb(cast[pointer](future))
      cancelAndSchedule(future, loc)

template cancel*[T](future: Future[T]) =
  ## Cancel ``future``.
  cancel(FutureBase(future), getSrcLocation())

proc clearCallbacks(future: FutureBase) =
  ## Drops all callbacks currently registered on ``future``.
  future.callbacks.clear()

proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
  ## Adds the callbacks proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` is scheduled right away
  ## (via ``callSoon``) instead of being queued on the future.
  when defined(metrics):
    {.gcsafe.}:
      if future.location[0] != nil:
        withLock(callbacksByFutureLock):
          callbacksByFuture.inc($future.location[LocCreateIndex])
  doAssert(not isNil(cb))
  if future.finished():
    callSoon(cb, udata)
  else:
    let acb = AsyncCallback(function: cb, udata: udata)
    future.callbacks.add acb

proc addCallback*[T](future: Future[T], cb: CallbackFunc) =
  ## Adds the callbacks proc to be called when the future completes; the
  ## future itself is passed to ``cb`` as ``udata``.
  ##
  ## If future has already completed then ``cb`` is scheduled right away
  ## (via ``callSoon``) instead of being queued on the future.
  future.addCallback(cb, cast[pointer](future))

proc removeCallback*(future: FutureBase, cb: CallbackFunc,
                     udata: pointer = nil) =
  ## Marks every registered callback matching both ``cb`` and ``udata`` as
  ## deleted, so it is skipped when the callbacks are scheduled.
  doAssert(not isNil(cb))
  let acb = AsyncCallback(function: cb, udata: udata)
  future.callbacks.remove acb

proc removeCallback*[T](future: Future[T], cb: CallbackFunc) =
  ## Removes a callback that was registered with the future itself as
  ## ``udata``.
  future.removeCallback(cb, cast[pointer](future))

proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
  ## Clears the list of callbacks and sets the callback proc to be called when
  ## the future completes.
  ##
  ## If future has already completed then ``cb`` is scheduled right away
  ## (via ``callSoon``).
  ##
  ## It's recommended to use ``addCallback`` or ``then`` instead.
  # ZAH: how about `setLen(1); callbacks[0] = cb`
  future.clearCallbacks
  future.addCallback(cb, udata)

proc `callback=`*[T](future: Future[T], cb: CallbackFunc) =
  ## Sets the callback proc to be called when the future completes; the
  ## future itself is passed to ``cb`` as ``udata``.
  ##
  ## If future has already completed then ``cb`` is scheduled right away
  ## (via ``callSoon``).
  `callback=`(future, cb, cast[pointer](future))

proc `cancelCallback=`*[T](future: Future[T], cb: CallbackFunc) =
  ## Sets the callback procedure to be called when the future is cancelled.
  ##
  ## The callback is invoked directly by ``cancel()`` when the future is
  ## still pending and has no child future.
  future.cancelcb = cb

proc getHint(entry: StackTraceEntry): string =
  ## We try to provide some hints about stack trace entries that the user
  ## may not be familiar with, in particular calls inside the stdlib.
  ## Returns an empty string when there is no hint for ``entry``.
  result = ""
  if entry.procname == "processPendingCallbacks":
    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
      return "Executes pending callbacks"
  elif entry.procname == "poll":
    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
      return "Processes asynchronous completion events"

  if entry.procname.endsWith("_continue"):
    if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
      return "Resumes an async procedure"

proc `$`*(entries: seq[StackTraceEntry]): string =
  ## Renders ``entries`` one per line as ``file(line)  procName`` with the
  ## proc names aligned in a column, followed by a ``##`` hint line where
  ## ``getHint`` provides one.
  result = ""
  # Find longest filename & line number combo for alignment purposes.
  var longestLeft = 0
  for entry in entries:
    if isNil(entry.procName): continue

    let left = $entry.filename & $entry.line
    if left.len > longestLeft:
      longestLeft = left.len

  var indent = 2
  # Format the entries.
  for entry in entries:
    if isNil(entry.procName):
      # Entries without a proc name act as markers that open/close a nested
      # ``#[ ... ]#`` section.
      # NOTE(review): -10 is presumably the stdlib's "reraised from" begin
      # marker -- confirm against the system module.
      if entry.line == -10:
        result.add(spaces(indent) & "#[\n")
        indent.inc(2)
      else:
        indent.dec(2)
        result.add(spaces(indent) & "]#\n")
      continue

    let left = "$#($#)" % [$entry.filename, $entry.line]
    result.add((spaces(indent) & "$#$# $#\n") % [
      left,
      spaces(longestLeft - left.len + 2),
      $entry.procName
    ])
    let hint = getHint(entry)
    if hint.len > 0:
      result.add(spaces(indent+2) & "## " & hint & "\n")

when defined(chronosStackTrace):
  proc injectStacktrace(future: FutureBase) =
    const header = "\nAsync traceback:\n"

    var exceptionMsg = future.error.msg
    if header in exceptionMsg:
      # This is messy: extract the original exception message from the msg
      # containing the async traceback.
      let start = exceptionMsg.find(header)
      exceptionMsg = exceptionMsg[0..