Make Future tracking and stack traces optional (#108)

* Make Future tracking optional via -d:chronosDutureTracking compilation flag.
* Stack traces is now optional, use -d:chronosStackTraces.
* Fix mistypes and add test for chronosStackTrace option.
This commit is contained in:
Eugene Kabanov 2020-07-08 19:48:01 +03:00 committed by GitHub
parent 5629b3c41f
commit ce6e7d17b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 155 deletions

View File

@ -13,8 +13,9 @@ requires "nim > 0.19.4",
task test, "Run all tests": task test, "Run all tests":
var commands = [ var commands = [
"nim c -r -d:useSysAssert -d:useGcAssert tests/", "nim c -r -d:useSysAssert -d:useGcAssert tests/",
"nim c -r tests/", "nim c -r -d:chronosStackTrace tests/",
"nim c -r -d:release tests/" "nim c -r -d:release tests/",
"nim c -r -d:release -d:chronosFutureTracking tests/"
] ]
for testname in ["testall"]: for testname in ["testall"]:
for cmd in commands: for cmd in commands:

View File

@ -16,12 +16,10 @@ const
LocCreateIndex* = 0 LocCreateIndex* = 0
LocCompleteIndex* = 1 LocCompleteIndex* = 1
type when defined(chronosStackTrace):
# ZAH: This can probably be stored with a cheaper representation type StackTrace = string
# until the moment it needs to be printed to the screen
# (e.g. seq[StackTraceEntry])
StackTrace = string
type
FutureState* {.pure.} = enum FutureState* {.pure.} = enum
Pending, Finished, Cancelled, Failed Pending, Finished, Cancelled, Failed
@ -32,12 +30,16 @@ type
child*: FutureBase child*: FutureBase
state*: FutureState state*: FutureState
error*: ref Exception ## Stored exception error*: ref Exception ## Stored exception
errorStackTrace*: StackTrace
stackTrace: StackTrace ## For debugging purposes only.
mustCancel*: bool mustCancel*: bool
id*: int id*: int
next*: FutureBase
prev*: FutureBase when defined(chronosStackTrace):
errorStackTrace*: StackTrace
stackTrace: StackTrace ## For debugging purposes only.
when defined(chronosFutureTracking):
next*: FutureBase
prev*: FutureBase
# ZAH: we have discussed some possible optimizations where # ZAH: we have discussed some possible optimizations where
# the future can be stored within the caller's stack frame. # the future can be stored within the caller's stack frame.
@ -69,26 +71,30 @@ type
count*: int count*: int
var currentID* {.threadvar.}: int var currentID* {.threadvar.}: int
var futureList* {.threadvar.}: FutureList
currentID = 0 currentID = 0
futureList = FutureList()
when defined(chronosFutureTracking):
var futureList* {.threadvar.}: FutureList
futureList = FutureList()
template setupFutureBase(loc: ptr SrcLoc) = template setupFutureBase(loc: ptr SrcLoc) =
new(result) new(result)
result.state = FutureState.Pending result.state = FutureState.Pending
result.stackTrace = getStackTrace() when defined(chronosStackTrace):
result.stackTrace = getStackTrace()
result.id = currentID result.id = currentID
result.location[LocCreateIndex] = loc result.location[LocCreateIndex] = loc
currentID.inc() currentID.inc()
result.next = nil when defined(chronosFutureTracking):
result.prev = futureList.tail result.next = nil
if not(isNil(futureList.tail)): result.prev = futureList.tail
futureList.tail.next = result if not(isNil(futureList.tail)):
futureList.tail = result futureList.tail.next = result
if isNil(futureList.head): futureList.tail = result
futureList.head = result if isNil(futureList.head):
futureList.count.inc() futureList.head = result
futureList.count.inc()
proc newFuture[T](loc: ptr SrcLoc): Future[T] = proc newFuture[T](loc: ptr SrcLoc): Future[T] =
setupFutureBase(loc) setupFutureBase(loc)
@ -155,15 +161,19 @@ proc failed*(future: FutureBase): bool {.inline.} =
## Determines whether ``future`` completed with an error. ## Determines whether ``future`` completed with an error.
result = (future.state == FutureState.Failed) result = (future.state == FutureState.Failed)
proc futureDestructor(udata: pointer) {.gcsafe.} = when defined(chronosFutureTracking):
## This procedure will be called when Future[T] got finished, cancelled or proc futureDestructor(udata: pointer) {.gcsafe.} =
## failed and all Future[T].callbacks are already scheduled and processed. ## This procedure will be called when Future[T] got finished, cancelled or
let future = cast[FutureBase](udata) ## failed and all Future[T].callbacks are already scheduled and processed.
if future == futureList.tail: futureList.tail = future.prev let future = cast[FutureBase](udata)
if future == futureList.head: futureList.head = future.next if future == futureList.tail: futureList.tail = future.prev
if not(isNil(future.next)): future.next.prev = future.prev if future == futureList.head: futureList.head = future.next
if not(isNil(future.prev)): future.prev.next = future.next if not(isNil(future.next)): future.next.prev = future.prev
futureList.count.dec() if not(isNil(future.prev)): future.prev.next = future.next
futureList.count.dec()
proc scheduleDestructor(future: FutureBase) {.inline.} =
callSoon(futureDestructor, cast[pointer](future))
proc checkFinished(future: FutureBase, loc: ptr SrcLoc) = proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
## Checks whether `future` is finished. If it is then raises a ## Checks whether `future` is finished. If it is then raises a
@ -179,10 +189,11 @@ proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
msg.add("\n " & $future.location[LocCompleteIndex]) msg.add("\n " & $future.location[LocCompleteIndex])
msg.add("\n Second completion location:") msg.add("\n Second completion location:")
msg.add("\n " & $loc) msg.add("\n " & $loc)
msg.add("\n Stack trace to moment of creation:") when defined(chronosStackTrace):
msg.add("\n" & indent(future.stackTrace.strip(), 4)) msg.add("\n Stack trace to moment of creation:")
msg.add("\n Stack trace to moment of secondary completion:") msg.add("\n" & indent(future.stackTrace.strip(), 4))
msg.add("\n" & indent(getStackTrace().strip(), 4)) msg.add("\n Stack trace to moment of secondary completion:")
msg.add("\n" & indent(getStackTrace().strip(), 4))
msg.add("\n\n") msg.add("\n\n")
var err = newException(FutureDefect, msg) var err = newException(FutureDefect, msg)
err.cause = future err.cause = future
@ -198,9 +209,6 @@ proc call(callbacks: var Deque[AsyncCallback]) =
callSoon(item.function, item.udata) callSoon(item.function, item.udata)
dec(count) dec(count)
proc scheduleDestructor(future: FutureBase) {.inline.} =
callSoon(futureDestructor, cast[pointer](future))
proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
if len(callbacks) == 0: if len(callbacks) == 0:
callbacks = initDeque[AsyncCallback]() callbacks = initDeque[AsyncCallback]()
@ -218,7 +226,8 @@ proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
future.value = val future.value = val
future.state = FutureState.Finished future.state = FutureState.Finished
future.callbacks.call() future.callbacks.call()
scheduleDestructor(FutureBase(future)) when defined(chronosFutureTracking):
scheduleDestructor(FutureBase(future))
template complete*[T](future: Future[T], val: T) = template complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``. ## Completes ``future`` with value ``val``.
@ -230,7 +239,8 @@ proc complete(future: Future[void], loc: ptr SrcLoc) =
doAssert(isNil(future.error)) doAssert(isNil(future.error))
future.state = FutureState.Finished future.state = FutureState.Finished
future.callbacks.call() future.callbacks.call()
scheduleDestructor(FutureBase(future)) when defined(chronosFutureTracking):
scheduleDestructor(FutureBase(future))
template complete*(future: Future[void]) = template complete*(future: Future[void]) =
## Completes a void ``future``. ## Completes a void ``future``.
@ -243,7 +253,8 @@ proc complete[T](future: FutureVar[T], loc: ptr SrcLoc) =
doAssert(isNil(fut.error)) doAssert(isNil(fut.error))
fut.state = FutureState.Finished fut.state = FutureState.Finished
fut.callbacks.call() fut.callbacks.call()
scheduleDestructor(FutureBase(future)) when defined(chronosFutureTracking):
scheduleDestructor(FutureBase(future))
template complete*[T](futvar: FutureVar[T]) = template complete*[T](futvar: FutureVar[T]) =
## Completes a ``FutureVar``. ## Completes a ``FutureVar``.
@ -257,7 +268,8 @@ proc complete[T](futvar: FutureVar[T], val: T, loc: ptr SrcLoc) =
fut.state = FutureState.Finished fut.state = FutureState.Finished
fut.value = val fut.value = val
fut.callbacks.call() fut.callbacks.call()
scheduleDestructor(FutureBase(fut)) when defined(chronosFutureTracking):
scheduleDestructor(FutureBase(fut))
template complete*[T](futvar: FutureVar[T], val: T) = template complete*[T](futvar: FutureVar[T], val: T) =
## Completes a ``FutureVar`` with value ``val``. ## Completes a ``FutureVar`` with value ``val``.
@ -270,23 +282,32 @@ proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) =
checkFinished(FutureBase(future), loc) checkFinished(FutureBase(future), loc)
future.state = FutureState.Failed future.state = FutureState.Failed
future.error = error future.error = error
future.errorStackTrace = when defined(chronosStackTrace):
if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) future.errorStackTrace = if getStackTrace(error) == "":
getStackTrace()
else:
getStackTrace(error)
future.callbacks.call() future.callbacks.call()
scheduleDestructor(FutureBase(future)) when defined(chronosFutureTracking):
scheduleDestructor(FutureBase(future))
template fail*[T](future: Future[T], error: ref Exception) = template fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``. ## Completes ``future`` with ``error``.
fail(future, error, getSrcLocation()) fail(future, error, getSrcLocation())
template newCancelledError(): ref CancelledError =
(ref CancelledError)(msg: "Future operation cancelled!")
proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
if not(future.finished()): if not(future.finished()):
checkFinished(future, loc) checkFinished(future, loc)
future.state = FutureState.Cancelled future.state = FutureState.Cancelled
future.error = newException(CancelledError, "") future.error = newCancelledError()
future.errorStackTrace = getStackTrace() when defined(chronosStackTrace):
future.errorStackTrace = getStackTrace()
future.callbacks.call() future.callbacks.call()
scheduleDestructor(future) when defined(chronosFutureTracking):
scheduleDestructor(future)
template cancelAndSchedule*[T](future: Future[T]) = template cancelAndSchedule*[T](future: Future[T]) =
cancelAndSchedule(FutureBase(future), getSrcLocation()) cancelAndSchedule(FutureBase(future), getSrcLocation())
@ -405,33 +426,35 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
if hint.len > 0: if hint.len > 0:
result.add(spaces(indent+2) & "## " & hint & "\n") result.add(spaces(indent+2) & "## " & hint & "\n")
proc injectStacktrace(future: FutureBase) = when defined(chronosStackTrace):
const header = "\nAsync traceback:\n" proc injectStacktrace(future: FutureBase) =
const header = "\nAsync traceback:\n"
var exceptionMsg = future.error.msg var exceptionMsg = future.error.msg
if header in exceptionMsg: if header in exceptionMsg:
# This is messy: extract the original exception message from the msg # This is messy: extract the original exception message from the msg
# containing the async traceback. # containing the async traceback.
let start = exceptionMsg.find(header) let start = exceptionMsg.find(header)
exceptionMsg = exceptionMsg[0..<start] exceptionMsg = exceptionMsg[0..<start]
var newMsg = exceptionMsg & header var newMsg = exceptionMsg & header
let entries = getStackTraceEntries(future.error) let entries = getStackTraceEntries(future.error)
newMsg.add($entries) newMsg.add($entries)
newMsg.add("Exception message: " & exceptionMsg & "\n") newMsg.add("Exception message: " & exceptionMsg & "\n")
newMsg.add("Exception type:") newMsg.add("Exception type:")
# # For debugging purposes # # For debugging purposes
# for entry in getStackTraceEntries(future.error): # for entry in getStackTraceEntries(future.error):
# newMsg.add "\n" & $entry # newMsg.add "\n" & $entry
future.error.msg = newMsg future.error.msg = newMsg
proc internalCheckComplete*(fut: FutureBase) = proc internalCheckComplete*(fut: FutureBase) =
# For internal use only. Used in asyncmacro # For internal use only. Used in asyncmacro
if not(isNil(fut.error)): if not(isNil(fut.error)):
injectStacktrace(fut) when defined(chronosStackTrace):
injectStacktrace(fut)
raise fut.error raise fut.error
proc internalRead*[T](fut: Future[T] | FutureVar[T]): T {.inline.} = proc internalRead*[T](fut: Future[T] | FutureVar[T]): T {.inline.} =
@ -480,7 +503,8 @@ proc asyncCheck*[T](future: Future[T]) =
doAssert(not isNil(future), "Future is nil") doAssert(not isNil(future), "Future is nil")
proc cb(data: pointer) = proc cb(data: pointer) =
if future.failed() or future.cancelled(): if future.failed() or future.cancelled():
injectStacktrace(future) when defined(chronosStackTrace):
injectStacktrace(future)
raise future.error raise future.error
future.callback = cb future.callback = cb

View File

@ -959,18 +959,19 @@ proc getTracker*(id: string): TrackerBase =
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
result = loop.trackers.getOrDefault(id, nil) result = loop.trackers.getOrDefault(id, nil)
iterator pendingFutures*(): FutureBase = when defined(chronosFutureTracking):
## Iterates over the list of pending Futures (Future[T] objects which not yet iterator pendingFutures*(): FutureBase =
## completed, cancelled or failed). ## Iterates over the list of pending Futures (Future[T] objects which not
var slider = futureList.head ## yet completed, cancelled or failed).
while not(isNil(slider)): var slider = futureList.head
yield slider while not(isNil(slider)):
slider = slider.next yield slider
slider = slider.next
proc pendingFuturesCount*(): int = proc pendingFuturesCount*(): int =
## Returns number of pending Futures (Future[T] objects which not yet ## Returns number of pending Futures (Future[T] objects which not yet
## completed, cancelled or failed). ## completed, cancelled or failed).
futureList.count futureList.count
# Perform global per-module initialization. # Perform global per-module initialization.
globalInit() globalInit()

View File

@ -275,7 +275,7 @@ template await*[T](f: Future[T]): auto =
yield chronosInternalTmpFuture yield chronosInternalTmpFuture
chronosInternalRetFuture.child = nil chronosInternalRetFuture.child = nil
if chronosInternalRetFuture.mustCancel: if chronosInternalRetFuture.mustCancel:
raise newException(CancelledError, "") raise newCancelledError()
chronosInternalTmpFuture.internalCheckComplete() chronosInternalTmpFuture.internalCheckComplete()
cast[type(f)](chronosInternalTmpFuture).internalRead() cast[type(f)](chronosInternalTmpFuture).internalRead()
else: else:
@ -290,7 +290,7 @@ template awaitne*[T](f: Future[T]): Future[T] =
yield chronosInternalTmpFuture yield chronosInternalTmpFuture
chronosInternalRetFuture.child = nil chronosInternalRetFuture.child = nil
if chronosInternalRetFuture.mustCancel: if chronosInternalRetFuture.mustCancel:
raise newException(CancelledError, "") raise newCancelledError()
cast[type(f)](chronosInternalTmpFuture) cast[type(f)](chronosInternalTmpFuture)
else: else:
unsupported "awaitne is only available within {.async.}" unsupported "awaitne is only available within {.async.}"

View File

@ -26,20 +26,21 @@ proc dumpPendingFutures*(filter = AllFutureStates): string =
## which callbacks are scheduled, but not yet fully processed. ## which callbacks are scheduled, but not yet fully processed.
var count = 0 var count = 0
var res = "" var res = ""
for item in pendingFutures(): when defined(chronosFutureTracking):
if item.state in filter: for item in pendingFutures():
inc(count) if item.state in filter:
let loc = item.location[LocCreateIndex][] inc(count)
let procedure = $loc.procedure let loc = item.location[LocCreateIndex][]
let filename = $loc.file let procedure = $loc.procedure
let procname = if len(procedure) == 0: let filename = $loc.file
"\"unspecified\"" let procname = if len(procedure) == 0:
else: "\"unspecified\""
"\"" & procedure & "\"" else:
let item = "Future[" & $item.id & "] with name " & $procname & "\"" & procedure & "\""
" created at " & "<" & filename & ":" & $loc.line & ">" & let item = "Future[" & $item.id & "] with name " & $procname &
" and state = " & $item.state & "\n" " created at " & "<" & filename & ":" & $loc.line & ">" &
res.add(item) " and state = " & $item.state & "\n"
res.add(item)
result = $count & " pending Future[T] objects found:\n" & $res result = $count & " pending Future[T] objects found:\n" & $res
proc pendingFuturesCount*(filter: set[FutureState]): int = proc pendingFuturesCount*(filter: set[FutureState]): int =
@ -48,11 +49,14 @@ proc pendingFuturesCount*(filter: set[FutureState]): int =
## ##
## If ``filter`` is equal to ``AllFutureStates`` Operation's complexity is ## If ``filter`` is equal to ``AllFutureStates`` Operation's complexity is
## O(1), otherwise operation's complexity is O(n). ## O(1), otherwise operation's complexity is O(n).
if filter == AllFutureStates: when defined(chronosFutureTracking):
pendingFuturesCount() if filter == AllFutureStates:
pendingFuturesCount()
else:
var res = 0
for item in pendingFutures():
if item.state in filter:
inc(res)
res
else: else:
var res = 0 0
for item in pendingFutures():
if item.state in filter:
inc(res)
res

View File

@ -5,72 +5,82 @@
# Licensed under either of # Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import unittest, strutils import unittest
import ../chronos import ../chronos
when defined(nimHasUsed): {.used.} when defined(nimHasUsed): {.used.}
suite "Asynchronous utilities test suite": suite "Asynchronous utilities test suite":
proc getCount(): int = when defined(chronosFutureTracking):
# This procedure counts number of Future[T] in double-linked list via list proc getCount(): int =
# iteration. # This procedure counts number of Future[T] in double-linked list via list
result = 0 # iteration.
for item in pendingFutures(): result = 0
inc(result) for item in pendingFutures():
inc(result)
test "Future clean and leaks test": test "Future clean and leaks test":
if pendingFuturesCount(WithoutFinished) == 0: when defined(chronosFutureTracking):
if pendingFuturesCount(OnlyFinished) > 0: if pendingFuturesCount(WithoutFinished) == 0:
poll() if pendingFuturesCount(OnlyFinished) > 0:
check pendingFuturesCount() == 0 poll()
check pendingFuturesCount() == 0
else:
echo dumpPendingFutures()
check false
else: else:
echo dumpPendingFutures() skip()
check false
test "FutureList basics test": test "FutureList basics test":
var fut1 = newFuture[void]() when defined(chronosFutureTracking):
check: var fut1 = newFuture[void]()
getCount() == 1 check:
pendingFuturesCount() == 1 getCount() == 1
var fut2 = newFuture[void]() pendingFuturesCount() == 1
check: var fut2 = newFuture[void]()
getCount() == 2 check:
pendingFuturesCount() == 2 getCount() == 2
var fut3 = newFuture[void]() pendingFuturesCount() == 2
check: var fut3 = newFuture[void]()
getCount() == 3 check:
pendingFuturesCount() == 3 getCount() == 3
fut1.complete() pendingFuturesCount() == 3
poll() fut1.complete()
check: poll()
getCount() == 2 check:
pendingFuturesCount() == 2 getCount() == 2
fut2.fail(newException(ValueError, "")) pendingFuturesCount() == 2
poll() fut2.fail(newException(ValueError, ""))
check: poll()
getCount() == 1 check:
pendingFuturesCount() == 1 getCount() == 1
fut3.cancel() pendingFuturesCount() == 1
poll() fut3.cancel()
check: poll()
getCount() == 0 check:
pendingFuturesCount() == 0 getCount() == 0
pendingFuturesCount() == 0
else:
skip()
test "FutureList async procedure test": test "FutureList async procedure test":
proc simpleProc() {.async.} = when defined(chronosFutureTracking):
await sleepAsync(10.milliseconds) proc simpleProc() {.async.} =
await sleepAsync(10.milliseconds)
var fut = simpleProc() var fut = simpleProc()
check: check:
getCount() == 2 getCount() == 2
pendingFuturesCount() == 2 pendingFuturesCount() == 2
waitFor fut waitFor fut
check: check:
getCount() == 1 getCount() == 1
pendingFuturesCount() == 1 pendingFuturesCount() == 1
poll() poll()
check: check:
getCount() == 0 getCount() == 0
pendingFuturesCount() == 0 pendingFuturesCount() == 0
else:
skip()