mirror of
https://github.com/logos-storage/nim-chronos.git
synced 2026-01-06 07:23:08 +00:00
futures profiling instrumentation
This commit is contained in:
parent
0277b65be2
commit
f9909c4053
@ -132,6 +132,9 @@ proc finish(fut: FutureBase, state: FutureState) =
|
||||
# 1. `finish()` is a private procedure and `state` is under our control.
|
||||
# 2. `fut.state` is checked by `checkFinished()`.
|
||||
fut.internalState = state
|
||||
when chronosFuturesInstrumentation:
|
||||
if not(isNil(onFutureStop)):
|
||||
onFutureStop(fut)
|
||||
when chronosStrictFutureAccess:
|
||||
doAssert fut.internalCancelcb == nil or state != FutureState.Cancelled
|
||||
fut.internalCancelcb = nil # release cancellation callback memory
|
||||
@ -212,6 +215,9 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
|
||||
if future.finished():
|
||||
return false
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
if not(isNil(onFutureStop)): onFutureStop(future)
|
||||
|
||||
if not(isNil(future.internalChild)):
|
||||
# If you hit this assertion, you should have used the `CancelledError`
|
||||
# mechanism and/or use a regular `addCallback`
|
||||
|
||||
@ -297,6 +297,11 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
|
||||
template await*[T](f: Future[T]): untyped =
|
||||
when declared(chronosInternalRetFuture):
|
||||
chronosInternalRetFuture.internalChild = f
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
if not(isNil(onFuturePause)):
|
||||
onFuturePause(chronosInternalRetFuture, f)
|
||||
|
||||
# `futureContinue` calls the iterator generated by the `async`
|
||||
# transformation - `yield` gives control back to `futureContinue` which is
|
||||
# responsible for resuming execution once the yielded future is finished
|
||||
@ -316,6 +321,11 @@ template await*[T](f: Future[T]): untyped =
|
||||
template awaitne*[T](f: Future[T]): Future[T] =
|
||||
when declared(chronosInternalRetFuture):
|
||||
chronosInternalRetFuture.internalChild = f
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
if not(isNil(onFuturePause)):
|
||||
onFuturePause(chronosInternalRetFuture, f)
|
||||
|
||||
yield chronosInternalRetFuture.internalChild
|
||||
if chronosInternalRetFuture.internalMustCancel:
|
||||
raise newCancelledError()
|
||||
|
||||
@ -49,6 +49,11 @@ when (NimMajor, NimMinor) >= (1, 4):
|
||||
## using `AsyncProcessOption.EvalCommand` and API calls such as
|
||||
## ``execCommand(command)`` and ``execCommandEx(command)``.
|
||||
|
||||
chronosFuturesInstrumentation* {.booldefine.} = defined(chronosFuturesInstrumentation)
|
||||
## Enable instrumentation callbacks which are called at
|
||||
## the start, pause, or end of a Future's lifetime.
|
||||
## Useful for implementing metrics or other instrumentation.
|
||||
|
||||
else:
|
||||
# 1.2 doesn't support `booldefine` in `when` properly
|
||||
const
|
||||
@ -61,6 +66,7 @@ else:
|
||||
chronosFutureTracking*: bool =
|
||||
defined(chronosDebug) or defined(chronosFutureTracking)
|
||||
chronosDumpAsync*: bool = defined(nimDumpAsync)
|
||||
chronosFuturesInstrumentation*: bool = defined(chronosFuturesInstrumentation)
|
||||
chronosProcShell* {.strdefine.}: string =
|
||||
when defined(windows):
|
||||
"cmd.exe"
|
||||
|
||||
@ -93,6 +93,13 @@ when chronosFutureTracking:
|
||||
|
||||
var futureList* {.threadvar.}: FutureList
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
var
|
||||
onFutureCreate* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].}
|
||||
onFutureRunning* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].}
|
||||
onFuturePause* {.threadvar.}: proc (fut, child: FutureBase) {.gcsafe, raises: [].}
|
||||
onFutureStop* {.threadvar.}: proc (fut: FutureBase) {.gcsafe, raises: [].}
|
||||
|
||||
# Internal utilities - these are not part of the stable API
|
||||
proc internalInitFutureBase*(
|
||||
fut: FutureBase,
|
||||
@ -121,6 +128,11 @@ proc internalInitFutureBase*(
|
||||
futureList.head = fut
|
||||
futureList.count.inc()
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
if not(isNil(onFutureCreate)):
|
||||
onFutureCreate(fut)
|
||||
|
||||
|
||||
# Public API
|
||||
template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] =
|
||||
## Creates a new pending future.
|
||||
|
||||
@ -7,3 +7,7 @@ switch("threads", "on")
|
||||
# Should be removed when https://github.com/status-im/nim-chronos/issues/284
|
||||
# will be implemented.
|
||||
switch("define", "nimRawSetjmp")
|
||||
|
||||
## REMOVE BEFORE MERGE!
|
||||
--d:chronosFuturesInstrumentation
|
||||
# --d:chronosFutureTracking
|
||||
|
||||
@ -10,6 +10,10 @@ import ../chronos, ../chronos/config
|
||||
|
||||
{.used.}
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
import std/[tables, os, options, hashes]
|
||||
import ../chronos/timer
|
||||
|
||||
suite "Asynchronous utilities test suite":
|
||||
when chronosFutureTracking:
|
||||
proc getCount(): uint =
|
||||
@ -85,3 +89,171 @@ suite "Asynchronous utilities test suite":
|
||||
pendingFuturesCount() == 0'u
|
||||
else:
|
||||
skip()
|
||||
|
||||
test "check empty futures instrumentation runs":
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
|
||||
proc simpleAsyncChild() {.async.} =
|
||||
echo "child sleep..."
|
||||
os.sleep(25)
|
||||
|
||||
proc simpleAsync1() {.async.} =
|
||||
for i in 0..1:
|
||||
await sleepAsync(40.milliseconds)
|
||||
await simpleAsyncChild()
|
||||
echo "sleep..."
|
||||
os.sleep(50)
|
||||
|
||||
waitFor(simpleAsync1())
|
||||
check true
|
||||
|
||||
|
||||
test "Example of using Future hooks to gather metrics":
|
||||
|
||||
when chronosFuturesInstrumentation:
|
||||
|
||||
type
|
||||
FutureMetric = object
|
||||
## Holds average timing information for a given closure
|
||||
closureLoc*: ptr SrcLoc
|
||||
created*: Moment
|
||||
start*: Option[Moment]
|
||||
duration*: Duration
|
||||
blocks*: int
|
||||
initDuration*: Duration
|
||||
durationChildren*: Duration
|
||||
|
||||
CallbackMetric = object
|
||||
totalExecTime*: Duration
|
||||
totalWallTime*: Duration
|
||||
totalRunTime*: Duration
|
||||
minSingleTime*: Duration
|
||||
maxSingleTime*: Duration
|
||||
count*: int64
|
||||
|
||||
var
|
||||
futureDurations: Table[uint, FutureMetric]
|
||||
callbackDurations: Table[ptr SrcLoc, CallbackMetric]
|
||||
|
||||
proc setFutureCreate(fut: FutureBase) {.raises: [].} =
|
||||
## used for setting the duration
|
||||
let loc = fut.internalLocation[Create]
|
||||
futureDurations[fut.id] = FutureMetric()
|
||||
futureDurations.withValue(fut.id, metric):
|
||||
metric.created = Moment.now()
|
||||
echo loc, "; future create "
|
||||
|
||||
proc setFutureStart(fut: FutureBase) {.raises: [].} =
|
||||
## used for setting the duration
|
||||
let loc = fut.internalLocation[Create]
|
||||
assert futureDurations.hasKey(fut.id)
|
||||
futureDurations.withValue(fut.id, metric):
|
||||
let ts = Moment.now()
|
||||
metric.start = some ts
|
||||
metric.blocks.inc()
|
||||
echo loc, "; future start: ", metric.initDuration
|
||||
|
||||
proc setFuturePause(fut, child: FutureBase) {.raises: [].} =
|
||||
## used for setting the duration
|
||||
let loc = fut.internalLocation[Create]
|
||||
let childLoc = if child.isNil: nil else: child.internalLocation[Create]
|
||||
var durationChildren = ZeroDuration
|
||||
var initDurationChildren = ZeroDuration
|
||||
if childLoc != nil:
|
||||
futureDurations.withValue(child.id, metric):
|
||||
durationChildren = metric.duration
|
||||
initDurationChildren = metric.initDuration
|
||||
assert futureDurations.hasKey(fut.id)
|
||||
futureDurations.withValue(fut.id, metric):
|
||||
if metric.start.isSome:
|
||||
let ts = Moment.now()
|
||||
metric.duration += ts - metric.start.get()
|
||||
metric.duration -= initDurationChildren
|
||||
if metric.blocks == 1:
|
||||
metric.initDuration = ts - metric.created # tricky,
|
||||
# the first block of a child iterator also
|
||||
# runs on the parents clock, so we track our first block
|
||||
# time so any parents can get it
|
||||
echo loc, "; child firstBlock time: ", initDurationChildren
|
||||
|
||||
metric.durationChildren += durationChildren
|
||||
metric.start = none Moment
|
||||
echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc
|
||||
|
||||
proc setFutureDuration(fut: FutureBase) {.raises: [].} =
|
||||
## used for setting the duration
|
||||
let loc = fut.internalLocation[Create]
|
||||
# assert "set duration: " & $loc
|
||||
var fm: FutureMetric
|
||||
# assert futureDurations.pop(fut.id, fm)
|
||||
futureDurations.withValue(fut.id, metric):
|
||||
fm = metric[]
|
||||
|
||||
discard callbackDurations.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration))
|
||||
callbackDurations.withValue(loc, metric):
|
||||
echo loc, " set duration: ", callbackDurations.hasKey(loc)
|
||||
metric.totalExecTime += fm.duration
|
||||
metric.totalWallTime += Moment.now() - fm.created
|
||||
metric.totalRunTime += metric.totalExecTime + fm.durationChildren
|
||||
echo loc, " child duration: ", fm.durationChildren
|
||||
metric.count.inc
|
||||
metric.minSingleTime = min(metric.minSingleTime, fm.duration)
|
||||
metric.maxSingleTime = max(metric.maxSingleTime, fm.duration)
|
||||
# handle overflow
|
||||
if metric.count == metric.count.typeof.high:
|
||||
metric.totalExecTime = ZeroDuration
|
||||
metric.count = 0
|
||||
|
||||
onFutureCreate =
|
||||
proc (f: FutureBase) =
|
||||
f.setFutureCreate()
|
||||
onFutureRunning =
|
||||
proc (f: FutureBase) =
|
||||
f.setFutureStart()
|
||||
onFuturePause =
|
||||
proc (f, child: FutureBase) =
|
||||
f.setFuturePause(child)
|
||||
onFutureStop =
|
||||
proc (f: FutureBase) =
|
||||
f.setFuturePause(nil)
|
||||
f.setFutureDuration()
|
||||
|
||||
proc simpleAsyncChild() {.async.} =
|
||||
echo "child sleep..."
|
||||
os.sleep(25)
|
||||
|
||||
proc simpleAsync1() {.async.} =
|
||||
for i in 0..1:
|
||||
await sleepAsync(40.milliseconds)
|
||||
await simpleAsyncChild()
|
||||
echo "sleep..."
|
||||
os.sleep(50)
|
||||
|
||||
waitFor(simpleAsync1())
|
||||
|
||||
let metrics = callbackDurations
|
||||
echo "\n=== metrics ==="
|
||||
echo "execTime:\ttime to execute non-async portions of async proc"
|
||||
echo "runTime:\texecution time + execution time of children"
|
||||
echo "wallTime:\twall time elapsed for future's lifetime"
|
||||
for (k,v) in metrics.pairs():
|
||||
let count = v.count
|
||||
if count > 0:
|
||||
echo ""
|
||||
echo "metric: ", $k
|
||||
echo "count: ", count
|
||||
echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime
|
||||
echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime
|
||||
echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime
|
||||
if k.procedure == "simpleAsync1":
|
||||
check v.totalExecTime >= 150.milliseconds()
|
||||
check v.totalExecTime <= 180.milliseconds()
|
||||
|
||||
check v.totalRunTime >= 200.milliseconds()
|
||||
check v.totalRunTime <= 230.milliseconds()
|
||||
discard
|
||||
echo ""
|
||||
|
||||
else:
|
||||
skip()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user