diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index d170f08..34920c0 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -132,6 +132,9 @@ proc finish(fut: FutureBase, state: FutureState) = # 1. `finish()` is a private procedure and `state` is under our control. # 2. `fut.state` is checked by `checkFinished()`. fut.internalState = state + when chronosFuturesInstrumentation: + if not(isNil(onFutureStop)): + onFutureStop(fut) when chronosStrictFutureAccess: doAssert fut.internalCancelcb == nil or state != FutureState.Cancelled fut.internalCancelcb = nil # release cancellation callback memory @@ -212,6 +215,9 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = if future.finished(): return false + when chronosFuturesInstrumentation: + if not(isNil(onFutureStop)): onFutureStop(future) + if not(isNil(future.internalChild)): # If you hit this assertion, you should have used the `CancelledError` # mechanism and/or use a regular `addCallback` diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index 45146a3..2e0b56d 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -297,6 +297,11 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = template await*[T](f: Future[T]): untyped = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f + + when chronosFuturesInstrumentation: + if not(isNil(onFuturePause)): + onFuturePause(chronosInternalRetFuture, f) + # `futureContinue` calls the iterator generated by the `async` # transformation - `yield` gives control back to `futureContinue` which is # responsible for resuming execution once the yielded future is finished @@ -316,6 +321,11 @@ template await*[T](f: Future[T]): untyped = template awaitne*[T](f: Future[T]): Future[T] = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f + + when chronosFuturesInstrumentation: + if not(isNil(onFuturePause)): + onFuturePause(chronosInternalRetFuture, f) + yield chronosInternalRetFuture.internalChild if chronosInternalRetFuture.internalMustCancel: raise newCancelledError() diff --git a/chronos/config.nim b/chronos/config.nim index 0a439a1..3d3dd4d 100644 --- a/chronos/config.nim +++ b/chronos/config.nim @@ -49,6 +49,11 @@ when (NimMajor, NimMinor) >= (1, 4): ## using `AsyncProcessOption.EvalCommand` and API calls such as ## ``execCommand(command)`` and ``execCommandEx(command)``. + chronosFuturesInstrumentation* {.booldefine.} = defined(chronosFuturesInstrumentation) + ## Enable instrumentation callbacks which are called at + ## the start, pause, or end of a Future's lifetime. + ## Useful for implementing metrics or other instrumentation. + else: # 1.2 doesn't support `booldefine` in `when` properly const @@ -61,6 +66,7 @@ else: chronosFutureTracking*: bool = defined(chronosDebug) or defined(chronosFutureTracking) chronosDumpAsync*: bool = defined(nimDumpAsync) + chronosFuturesInstrumentation*: bool = defined(chronosFuturesInstrumentation) chronosProcShell* {.strdefine.}: string = when defined(windows): "cmd.exe" diff --git a/chronos/futures.nim b/chronos/futures.nim index edfae32..92a4929 100644 --- a/chronos/futures.nim +++ b/chronos/futures.nim @@ -93,6 +93,13 @@ when chronosFutureTracking: var futureList* {.threadvar.}: FutureList +when chronosFuturesInstrumentation: + var + onFutureCreate* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].} + onFutureRunning* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].} + onFuturePause* {.threadvar.}: proc (fut, child: FutureBase) {.gcsafe, raises: [].} + onFutureStop* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].} + # Internal utilities - these are not part of the stable API proc internalInitFutureBase*( fut: FutureBase, @@ -121,6 +128,11 @@ proc internalInitFutureBase*( futureList.head = fut futureList.count.inc() + when chronosFuturesInstrumentation: + if not(isNil(onFutureCreate)): + onFutureCreate(fut) + + # Public API template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] = ## Creates a new pending future. diff --git a/tests/config.nims b/tests/config.nims index e1d0189..9a3d66f 100644 --- a/tests/config.nims +++ b/tests/config.nims @@ -7,3 +7,7 @@ switch("threads", "on") # Should be removed when https://github.com/status-im/nim-chronos/issues/284 # will be implemented. switch("define", "nimRawSetjmp") + +## REMOVE BEFORE MERGE! +--d:chronosFuturesInstrumentation +# --d:chronosFutureTracking diff --git a/tests/testutils.nim b/tests/testutils.nim index e589037..e3ee1b8 100644 --- a/tests/testutils.nim +++ b/tests/testutils.nim @@ -10,6 +10,10 @@ import ../chronos, ../chronos/config {.used.} +when chronosFuturesInstrumentation: + import std/[tables, os, options, hashes] + import ../chronos/timer + suite "Asynchronous utilities test suite": when chronosFutureTracking: proc getCount(): uint = @@ -85,3 +89,171 @@ suite "Asynchronous utilities test suite": pendingFuturesCount() == 0'u else: skip() + + test "check empty futures instrumentation runs": + + when chronosFuturesInstrumentation: + + proc simpleAsyncChild() {.async.} = + echo "child sleep..." + os.sleep(25) + + proc simpleAsync1() {.async.} = + for i in 0..1: + await sleepAsync(40.milliseconds) + await simpleAsyncChild() + echo "sleep..." + os.sleep(50) + + waitFor(simpleAsync1()) + check true + + + test "Example of using Future hooks to gather metrics": + + when chronosFuturesInstrumentation: + + type + FutureMetric = object + ## Holds average timing information for a given closure + closureLoc*: ptr SrcLoc + created*: Moment + start*: Option[Moment] + duration*: Duration + blocks*: int + initDuration*: Duration + durationChildren*: Duration + + CallbackMetric = object + totalExecTime*: Duration + totalWallTime*: Duration + totalRunTime*: Duration + minSingleTime*: Duration + maxSingleTime*: Duration + count*: int64 + + var + futureDurations: Table[uint, FutureMetric] + callbackDurations: Table[ptr SrcLoc, CallbackMetric] + + proc setFutureCreate(fut: FutureBase) {.raises: [].} = + ## used for setting the duration + let loc = fut.internalLocation[Create] + futureDurations[fut.id] = FutureMetric() + futureDurations.withValue(fut.id, metric): + metric.created = Moment.now() + echo loc, "; future create " + + proc setFutureStart(fut: FutureBase) {.raises: [].} = + ## used for setting the duration + let loc = fut.internalLocation[Create] + assert futureDurations.hasKey(fut.id) + futureDurations.withValue(fut.id, metric): + let ts = Moment.now() + metric.start = some ts + metric.blocks.inc() + echo loc, "; future start: ", metric.initDuration + + proc setFuturePause(fut, child: FutureBase) {.raises: [].} = + ## used for setting the duration + let loc = fut.internalLocation[Create] + let childLoc = if child.isNil: nil else: child.internalLocation[Create] + var durationChildren = ZeroDuration + var initDurationChildren = ZeroDuration + if childLoc != nil: + futureDurations.withValue(child.id, metric): + durationChildren = metric.duration + initDurationChildren = metric.initDuration + assert futureDurations.hasKey(fut.id) + futureDurations.withValue(fut.id, metric): + if metric.start.isSome: + let ts = Moment.now() + metric.duration += ts - metric.start.get() + metric.duration -= initDurationChildren + if metric.blocks == 1: + metric.initDuration = ts - metric.created # tricky, + # the first block of a child iterator also + # runs on the parents clock, so we track our first block + # time so any parents can get it + echo loc, "; child firstBlock time: ", initDurationChildren + + metric.durationChildren += durationChildren + metric.start = none Moment + echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc + + proc setFutureDuration(fut: FutureBase) {.raises: [].} = + ## used for setting the duration + let loc = fut.internalLocation[Create] + # assert "set duration: " & $loc + var fm: FutureMetric + # assert futureDurations.pop(fut.id, fm) + futureDurations.withValue(fut.id, metric): + fm = metric[] + + discard callbackDurations.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration)) + callbackDurations.withValue(loc, metric): + echo loc, " set duration: ", callbackDurations.hasKey(loc) + metric.totalExecTime += fm.duration + metric.totalWallTime += Moment.now() - fm.created + metric.totalRunTime += metric.totalExecTime + fm.durationChildren + echo loc, " child duration: ", fm.durationChildren + metric.count.inc + metric.minSingleTime = min(metric.minSingleTime, fm.duration) + metric.maxSingleTime = max(metric.maxSingleTime, fm.duration) + # handle overflow + if metric.count == metric.count.typeof.high: + metric.totalExecTime = ZeroDuration + metric.count = 0 + + onFutureCreate = + proc (f: FutureBase) = + f.setFutureCreate() + onFutureRunning = + proc (f: FutureBase) = + f.setFutureStart() + onFuturePause = + proc (f, child: FutureBase) = + f.setFuturePause(child) + onFutureStop = + proc (f: FutureBase) = + f.setFuturePause(nil) + f.setFutureDuration() + + proc simpleAsyncChild() {.async.} = + echo "child sleep..." + os.sleep(25) + + proc simpleAsync1() {.async.} = + for i in 0..1: + await sleepAsync(40.milliseconds) + await simpleAsyncChild() + echo "sleep..." + os.sleep(50) + + waitFor(simpleAsync1()) + + let metrics = callbackDurations + echo "\n=== metrics ===" + echo "execTime:\ttime to execute non-async portions of async proc" + echo "runTime:\texecution time + execution time of children" + echo "wallTime:\twall time elapsed for future's lifetime" + for (k,v) in metrics.pairs(): + let count = v.count + if count > 0: + echo "" + echo "metric: ", $k + echo "count: ", count + echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime + echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime + echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime + if k.procedure == "simpleAsync1": + check v.totalExecTime >= 150.milliseconds() + check v.totalExecTime <= 180.milliseconds() + + check v.totalRunTime >= 200.milliseconds() + check v.totalRunTime <= 230.milliseconds() + discard + echo "" + + else: + skip()