mirror of
https://github.com/logos-storage/nim-chronos.git
synced 2026-01-03 22:13:06 +00:00
add harness for simulating profiled time and basic profiler metric test
This commit is contained in:
parent
d4809d5a98
commit
8b5c6a4674
@ -326,9 +326,8 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
|
||||
|
||||
if fut.internalClosure.finished(): # Reached the end of the transformed proc
|
||||
break
|
||||
|
||||
# If we got thus far it means the future still has work to do, so we
|
||||
# issue a pause.
|
||||
|
||||
# If we got thus far, means the future is paused.
|
||||
when chronosFuturesInstrumentation:
|
||||
if not isNil(onFutureExecEvent):
|
||||
onFutureExecEvent(fut, Paused)
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import ".."/timer
|
||||
import ".."/futures
|
||||
import ".."/srcloc
|
||||
|
||||
@ -14,38 +15,18 @@ type
|
||||
futureId*: uint
|
||||
location*: SrcLoc
|
||||
newState*: ExtendedFutureState
|
||||
timestamp*: Moment
|
||||
|
||||
RunningFuture = object
|
||||
event: Event
|
||||
notNil: bool
|
||||
|
||||
var running* {.threadvar.}: RunningFuture
|
||||
var handleFutureEvent* {.threadvar.}: proc (event: Event) {.nimcall, gcsafe, raises: [].}
|
||||
|
||||
proc dispatch(future: FutureBase, state: ExtendedFutureState) =
|
||||
let event = Event(
|
||||
proc mkEvent(future: FutureBase, state: ExtendedFutureState): Event =
|
||||
Event(
|
||||
futureId: future.id,
|
||||
location: future.internalLocation[LocationKind.Create][],
|
||||
newState: state
|
||||
location: future.internalLocation[Create][],
|
||||
newState: state,
|
||||
timestamp: Moment.now(),
|
||||
)
|
||||
|
||||
if state != ExtendedFutureState.Running:
|
||||
handleFutureEvent(event)
|
||||
return
|
||||
|
||||
# If we have a running future, then it means this is a child. Emits synthetic
|
||||
# pause event to keep things consistent with thread occupancy semantics.
|
||||
if running.notNil:
|
||||
handleFutureEvent(Event(
|
||||
futureId: running.event.futureId,
|
||||
location: running.event.location,
|
||||
newState: Paused
|
||||
))
|
||||
|
||||
running = RunningFuture(event: event, notNil: true)
|
||||
|
||||
handleFutureEvent(event)
|
||||
|
||||
onFutureEvent = proc (future: FutureBase, state: FutureState): void {.nimcall.} =
|
||||
{.cast(gcsafe).}:
|
||||
let extendedState = case state:
|
||||
@ -54,7 +35,8 @@ onFutureEvent = proc (future: FutureBase, state: FutureState): void {.nimcall.}
|
||||
of FutureState.Cancelled: ExtendedFutureState.Cancelled
|
||||
of FutureState.Failed: ExtendedFutureState.Failed
|
||||
|
||||
dispatch(future, extendedState)
|
||||
if not isNil(handleFutureEvent):
|
||||
handleFutureEvent(mkEvent(future, extendedState))
|
||||
|
||||
onFutureExecEvent = proc (future: FutureBase, state: FutureExecutionState): void {.nimcall.} =
|
||||
{.cast(gcsafe).}:
|
||||
@ -62,7 +44,8 @@ onFutureExecEvent = proc (future: FutureBase, state: FutureExecutionState): void
|
||||
of FutureExecutionState.Running: ExtendedFutureState.Running
|
||||
of FutureExecutionState.Paused: ExtendedFutureState.Paused
|
||||
|
||||
dispatch(future, extendedState)
|
||||
if not isNil(handleFutureEvent):
|
||||
handleFutureEvent(mkEvent(future, extendedState))
|
||||
|
||||
|
||||
|
||||
|
||||
89
chronos/profiler/metrics.nim
Normal file
89
chronos/profiler/metrics.nim
Normal file
@ -0,0 +1,89 @@
|
||||
import std/tables
|
||||
|
||||
import ./events
|
||||
import ../[timer, srcloc]
|
||||
|
||||
export timer, tables, srcloc
|
||||
|
||||
type
|
||||
Clock* = proc (): Moment
|
||||
|
||||
AggregateFutureMetrics* = object
|
||||
execTime*: Duration
|
||||
childrenExecTime*: Duration
|
||||
wallClockTime*: Duration
|
||||
|
||||
RunningFuture* = object
|
||||
state*: ExtendedFutureState
|
||||
created*: Moment
|
||||
lastStarted*: Moment
|
||||
timeToFirstPause*: Duration
|
||||
partialExecTime*: Duration
|
||||
pauses*: uint
|
||||
|
||||
ProfilerMetrics* = object
|
||||
partials: Table[uint, RunningFuture]
|
||||
totals*: Table[SrcLoc, AggregateFutureMetrics]
|
||||
|
||||
proc init*(T: typedesc[ProfilerMetrics]): ProfilerMetrics =
|
||||
result.clock = timer.now
|
||||
result.partials = initTable[uint, RunningFuture]()
|
||||
result.totals = initTable[SrcLoc, AggregateFutureMetrics]()
|
||||
|
||||
proc futureCreated(self: var ProfilerMetrics, event: Event): void =
|
||||
assert not self.partials.hasKey(event.futureId)
|
||||
|
||||
self.partials[event.futureId] = RunningFuture(
|
||||
created: event.timestamp,
|
||||
state: Pending,
|
||||
)
|
||||
|
||||
proc futureRunning(self: var ProfilerMetrics, event: Event): void =
|
||||
assert self.partials.hasKey(event.futureId)
|
||||
|
||||
self.partials.withValue(event.futureId, metrics):
|
||||
assert metrics.state == Pending or metrics.state == Paused
|
||||
|
||||
metrics.lastStarted = event.timestamp
|
||||
metrics.state = Running
|
||||
|
||||
proc futurePaused(self: var ProfilerMetrics, event: Event): void =
|
||||
assert self.partials.hasKey(event.futureId)
|
||||
|
||||
self.partials.withValue(event.futureId, metrics):
|
||||
assert metrics.state == Running
|
||||
|
||||
let segmentExecTime = event.timestamp - metrics.lastStarted
|
||||
if metrics.pauses == 0:
|
||||
metrics.timeToFirstPause = segmentExecTime
|
||||
metrics.partialExecTime += segmentExecTime
|
||||
metrics.pauses += 1
|
||||
metrics.state = Paused
|
||||
|
||||
proc futureCompleted(self: var ProfilerMetrics, event: Event): void =
|
||||
self.partials.withValue(event.futureId, metrics):
|
||||
if metrics.state == Running:
|
||||
self.futurePaused(event)
|
||||
|
||||
let location = event.location
|
||||
if not self.totals.hasKey(location):
|
||||
self.totals[location] = AggregateFutureMetrics()
|
||||
|
||||
self.totals.withValue(location, aggMetrics):
|
||||
aggMetrics.execTime += metrics.partialExecTime
|
||||
aggMetrics.wallClockTime += event.timestamp - metrics.created
|
||||
|
||||
self.partials.del(event.futureId)
|
||||
|
||||
proc processEvent*(self: var ProfilerMetrics, event: Event): void =
|
||||
case event.newState:
|
||||
of Pending: self.futureCreated(event)
|
||||
of Running: self.futureRunning(event)
|
||||
of Paused: self.futurePaused(event)
|
||||
of Completed: self.futureCompleted(event)
|
||||
else:
|
||||
assert false, "Unimplemented"
|
||||
|
||||
proc processAllEvents*(self: var ProfilerMetrics, events: seq[Event]): void =
|
||||
for event in events:
|
||||
self.processEvent(event)
|
||||
18
tests/profiler/example.nim
Normal file
18
tests/profiler/example.nim
Normal file
@ -0,0 +1,18 @@
|
||||
import os
|
||||
import ../../chronos
|
||||
|
||||
proc child11() {.async.} =
|
||||
echo "I ran"
|
||||
await sleepAsync(10.milliseconds)
|
||||
|
||||
proc child2() {.async.} =
|
||||
os.sleep(10)
|
||||
|
||||
proc child1() {.async.} =
|
||||
await child2()
|
||||
await child11()
|
||||
|
||||
proc p() {.async.} =
|
||||
echo "r1"
|
||||
await child1()
|
||||
echo "r2"
|
||||
@ -3,7 +3,7 @@ import std/os
|
||||
import unittest2
|
||||
|
||||
import ".."/".."/chronos
|
||||
import ".."/".."/chronos/profiler/events
|
||||
import ".."/".."/chronos/profiler/[events, metrics]
|
||||
|
||||
import ./utils
|
||||
|
||||
@ -23,87 +23,141 @@ suite "profiler hooks test suite":
|
||||
|
||||
waitFor simple()
|
||||
|
||||
check getRecording().forProcs("simple") == @[
|
||||
check recording == @[
|
||||
SimpleEvent(state: Pending, procedure: "simple"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "simple"),
|
||||
SimpleEvent(state: Completed, procedure: "simple"),
|
||||
]
|
||||
|
||||
# test "should emit correct events for a future with children":
|
||||
# proc child1() {.async.} =
|
||||
# os.sleep(1)
|
||||
test "should emit correct events when a single child runs as part of the parent":
|
||||
|
||||
# proc withChildren() {.async.} =
|
||||
# await child1()
|
||||
proc withChildren() {.async.} =
|
||||
recordSegment("segment 1")
|
||||
await sleepAsync(10.milliseconds)
|
||||
recordSegment("segment 2")
|
||||
|
||||
waitFor withChildren()
|
||||
|
||||
# waitFor withChildren()
|
||||
|
||||
# check getRecording().forProcs("withChildren", "child1") == @[
|
||||
# Event(kind: EventKind.Create, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Run, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Create, procedure: "child1"),
|
||||
# Event(kind: EventKind.Pause, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Run, procedure: "child1"),
|
||||
# Event(kind: EventKind.Complete, procedure: "child1"),
|
||||
# Event(kind: EventKind.Run, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Complete, procedure: "withChildren"),
|
||||
# ]
|
||||
check recording == @[
|
||||
SimpleEvent(state: Pending, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "segment 1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Pending, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: Paused, procedure: "withChildren"),
|
||||
SimpleEvent(state: Completed, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "segment 2"),
|
||||
SimpleEvent(state: Completed, procedure: "withChildren"),
|
||||
]
|
||||
|
||||
# test "should emit correct events for a future with timers":
|
||||
# proc withChildren() {.async.} =
|
||||
# await sleepAsync(1.milliseconds)
|
||||
test "should emit correct events when a nested child pauses execution":
|
||||
proc child2() {.async.} =
|
||||
await sleepAsync(10.milliseconds)
|
||||
await sleepAsync(10.milliseconds)
|
||||
|
||||
# waitFor withChildren()
|
||||
proc child1() {.async.} =
|
||||
await child2()
|
||||
|
||||
# check getRecording().forProcs(
|
||||
# "withChildren", "chronos.sleepAsync(Duration)") == @[
|
||||
# Event(kind: EventKind.Create, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Run, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Pause, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Create, procedure: "chronos.sleepAsync(Duration)"),
|
||||
# # Timers don't "run"
|
||||
# Event(kind: EventKind.Complete, procedure: "chronos.sleepAsync(Duration)"),
|
||||
# Event(kind: EventKind.Run, procedure: "withChildren"),
|
||||
# Event(kind: EventKind.Complete, procedure: "withChildren"),
|
||||
# ]
|
||||
proc withChildren() {.async.} =
|
||||
recordSegment("segment 1")
|
||||
await child1()
|
||||
recordSegment("segment 2")
|
||||
|
||||
waitFor withChildren()
|
||||
|
||||
# test "should emit correct events when futures are canceled":
|
||||
# proc withCancellation() {.async.} =
|
||||
# let f = sleepyHead()
|
||||
# f.cancel()
|
||||
check recording == @[
|
||||
# First iteration of parent and each child
|
||||
SimpleEvent(state: Pending, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "segment 1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Pending, procedure: "child1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "child1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Pending, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Pending, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: ExtendedFutureState.Paused, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Paused, procedure: "child1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Paused, procedure: "withChildren"),
|
||||
|
||||
# proc sleepyHead() {.async.} =
|
||||
# await sleepAsync(10.minutes)
|
||||
# Second iteration of child2
|
||||
SimpleEvent(state: ExtendedFutureState.Completed, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Pending, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: ExtendedFutureState.Paused, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Completed, procedure: "chronos.sleepAsync(Duration)"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "child2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Completed, procedure: "child2"),
|
||||
|
||||
# waitFor withCancellation()
|
||||
# Second iteration child1
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "child1"),
|
||||
SimpleEvent(state: ExtendedFutureState.Completed, procedure: "child1"),
|
||||
|
||||
# check getRecording().forProcs("sleepyHead", "withCancellation") == @[
|
||||
# Event(kind: EventKind.Create, procedure: "withCancellation"),
|
||||
# Event(kind: EventKind.Create, procedure: "sleepyHead"),
|
||||
# Event(kind: EventKind.Run, procedure: "sleepyHead"),
|
||||
# ]
|
||||
# Second iteration of parent
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "withChildren"),
|
||||
SimpleEvent(state: ExtendedFutureState.Running, procedure: "segment 2"),
|
||||
SimpleEvent(state: ExtendedFutureState.Completed, procedure: "withChildren"),
|
||||
]
|
||||
|
||||
# type
|
||||
# FakeFuture = object
|
||||
# id: uint
|
||||
# internalLocation*: array[LocationKind, ptr SrcLoc]
|
||||
|
||||
# suite "asyncprofiler metrics":
|
||||
suite "profiler metrics test suite":
|
||||
|
||||
setup:
|
||||
installCallbacks()
|
||||
|
||||
# test "should not keep metrics for a pending future in memory after it completes":
|
||||
teardown:
|
||||
clearRecording()
|
||||
revertCallbacks()
|
||||
resetTime()
|
||||
|
||||
test "should compute correct times for a simple future":
|
||||
|
||||
# var fakeLoc = SrcLoc(procedure: "foo", file: "foo.nim", line: 1)
|
||||
# let future = FakeFuture(
|
||||
# id: 1,
|
||||
# internalLocation: [
|
||||
# LocationKind.Create: addr fakeLoc,
|
||||
# LocationKind.Finish: addr fakeLoc,
|
||||
# ])
|
||||
var metrics = ProfilerMetrics()
|
||||
|
||||
# var profiler = AsyncProfiler[FakeFuture]()
|
||||
proc simple() {.async.} =
|
||||
advanceTime(50.milliseconds)
|
||||
|
||||
waitFor simple()
|
||||
|
||||
# profiler.handleFutureCreate(future)
|
||||
# profiler.handleFutureComplete(future)
|
||||
metrics.processAllEvents(rawRecording)
|
||||
|
||||
# check len(profiler.getPerFutureMetrics()) == 0
|
||||
let simpleMetrics = metrics.forProc("simple")
|
||||
|
||||
check simpleMetrics.execTime == 50.milliseconds
|
||||
check simpleMetrics.wallClockTime == 50.milliseconds
|
||||
|
||||
|
||||
# test "should compute correct times when a single child runs as part of the parent":
|
||||
|
||||
# var metrics = ProfilerMetrics()
|
||||
|
||||
# proc child1() {.async.} =
|
||||
# advanceTime(10.milliseconds)
|
||||
|
||||
# proc withChildren() {.async.} =
|
||||
# advanceTime(10.milliseconds)
|
||||
# await child1()
|
||||
# advanceTime(10.milliseconds)
|
||||
|
||||
# waitFor withChildren()
|
||||
|
||||
# metrics.processAllEvents(rawRecording)
|
||||
|
||||
# let withChildrenMetrics = metrics.forProc("withChildren")
|
||||
# let child1Metrics = metrics.forProc("child1")
|
||||
|
||||
# check withChildrenMetrics.execTime == 20.milliseconds
|
||||
# check withChildrenMetrics.childrenExecTime == 10.milliseconds
|
||||
# check withChildrenMetrics.wallClockTime == 30.milliseconds
|
||||
|
||||
# check child1Metrics.execTime == 10.milliseconds
|
||||
# check child1Metrics.wallClockTime == 10.milliseconds
|
||||
|
||||
# # test "should emit correct metrics when a single child runs as part of the parent":
|
||||
|
||||
# # proc withChildren() {.async.} =
|
||||
# # recordSegment("segment 1")
|
||||
# # await sleepAsync(10.milliseconds)
|
||||
# # recordSegment("segment 2")
|
||||
|
||||
# # waitFor withChildren()
|
||||
@ -1,8 +1,8 @@
|
||||
import std/sequtils
|
||||
import std/sugar
|
||||
import std/with
|
||||
|
||||
import ".."/".."/chronos
|
||||
import ".."/".."/chronos/profiler/events
|
||||
import ".."/".."/chronos/profiler/metrics
|
||||
|
||||
type
|
||||
SimpleEvent* = object
|
||||
@ -12,14 +12,9 @@ type
|
||||
# XXX this is sort of bad cause we get global state all over, but the fact we
|
||||
# can't use closures on callbacks and that callbacks themselves are just
|
||||
# global vars means we can't really do much better for now.
|
||||
|
||||
var recording: seq[SimpleEvent]
|
||||
|
||||
proc forProcs*(self: seq[SimpleEvent], procs: varargs[string]): seq[SimpleEvent] =
|
||||
collect:
|
||||
for e in self:
|
||||
if e.procedure in procs:
|
||||
e
|
||||
var recording*: seq[SimpleEvent]
|
||||
var rawRecording*: seq[Event]
|
||||
var fakeTime*: Moment = Moment.now()
|
||||
|
||||
# FIXME bad, this needs to be refactored into a callback interface for the profiler.
|
||||
var oldHandleFutureEvent: proc(event: Event) {.nimcall, gcsafe, raises: [].} = nil
|
||||
@ -28,15 +23,22 @@ var installed: bool = false
|
||||
proc recordEvent(event: Event) {.nimcall, gcsafe, raises: [].} =
|
||||
{.cast(gcsafe).}:
|
||||
recording.add(
|
||||
SimpleEvent(
|
||||
procedure: $(event.location.procedure),
|
||||
state: event.newState
|
||||
)
|
||||
)
|
||||
SimpleEvent(procedure: $event.location.procedure, state: event.newState))
|
||||
|
||||
proc getRecording*(): seq[SimpleEvent] = {.cast(gcsafe).}: recording
|
||||
var timeShifted = event
|
||||
timeshifted.timestamp = fakeTime
|
||||
|
||||
proc clearRecording*(): void = recording = @[]
|
||||
rawRecording.add(timeShifted)
|
||||
|
||||
proc recordSegment*(segment: string) =
|
||||
{.cast(gcsafe).}:
|
||||
recording.add(SimpleEvent(
|
||||
procedure: segment,
|
||||
state: ExtendedFutureState.Running
|
||||
))
|
||||
|
||||
proc clearRecording*(): void =
|
||||
recording = @[]
|
||||
|
||||
proc installCallbacks*() =
|
||||
assert not installed, "Callbacks already installed"
|
||||
@ -51,3 +53,13 @@ proc revertCallbacks*() =
|
||||
handleFutureEvent = oldHandleFutureEvent
|
||||
installed = false
|
||||
|
||||
proc forProc*(self: var ProfilerMetrics, procedure: string): AggregateFutureMetrics =
|
||||
for (key, aggMetrics) in self.totals.mpairs:
|
||||
if key.procedure == procedure:
|
||||
return aggMetrics
|
||||
|
||||
proc resetTime*() =
|
||||
fakeTime = Moment.now()
|
||||
|
||||
proc advanceTime*(duration: Duration) =
|
||||
fakeTime += duration
|
||||
Loading…
x
Reference in New Issue
Block a user