mirror of
https://github.com/logos-storage/nim-chroprof.git
synced 2026-01-02 13:33:06 +00:00
add metrics collector
This commit is contained in:
parent
895b89166c
commit
aa4801356a
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
nimble.develop
|
||||
nimble.paths
|
||||
@ -7,5 +7,6 @@ description = "A profiling tool for the Chronos networking framework"
|
||||
license = "MIT or Apache License 2.0"
|
||||
skipDirs = @["tests"]
|
||||
|
||||
requires "nim >= 1.6.16",
|
||||
"https://github.com/codex-storage/nim-chronos#feature/profiler-v4"
|
||||
requires "nim >= 1.6.16",
|
||||
"https://github.com/codex-storage/nim-chronos#feature/profiler-v4",
|
||||
"metrics >= 0.1.0"
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
import chronos/futures
|
||||
import ./[profiler, events]
|
||||
|
||||
type EventCallback = proc (e: Event) {.nimcall, gcsafe, raises: [].}
|
||||
export Event, ExtendedFutureState, ProfilerState, MetricsTotals,
|
||||
AggregateMetrics, FutureType, execTimeWithChildren
|
||||
|
||||
type EventCallback* = proc (e: Event) {.nimcall, gcsafe, raises: [].}
|
||||
|
||||
var profilerInstance {.threadvar.}: ProfilerState
|
||||
|
||||
@ -15,7 +18,8 @@ proc enableEventCallbacks*(callback: EventCallback): void =
|
||||
onAsyncFutureEvent = handleAsyncFutureEvent
|
||||
handleFutureEvent = callback
|
||||
|
||||
proc enableProfiling*() =
|
||||
proc enableProfiling*(clientCallback: EventCallback = nil) =
|
||||
## Enables profiling for the the event loop running in the current thread.
|
||||
handleFutureEvent = proc (e: Event) {.nimcall.} =
|
||||
profilerInstance.processEvent(e)
|
||||
if not isNil(clientCallback): clientCallback(e)
|
||||
|
||||
195
chroprof/collector.nim
Normal file
195
chroprof/collector.nim
Normal file
@ -0,0 +1,195 @@
|
||||
## Metrics collector which allows exporting Chronos profiling metrics to
|
||||
## Prometheus.
|
||||
|
||||
import std/algorithm
|
||||
import std/enumerate
|
||||
import std/sequtils
|
||||
import std/tables
|
||||
import std/times
|
||||
|
||||
import chronos/timer
|
||||
import metrics
|
||||
|
||||
import ./api
|
||||
|
||||
when defined(metrics):
|
||||
type
|
||||
ChronosProfilerInfo* = ref object of RootObj
|
||||
perfSampler: PerfSampler
|
||||
sampleInterval: times.Duration
|
||||
clock: Clock
|
||||
k: int
|
||||
init: bool
|
||||
lastSample: Time
|
||||
collections*: uint
|
||||
|
||||
PerfSampler = proc (): MetricsTotals {.raises: [].}
|
||||
|
||||
Clock = proc (): Time {.raises: [].}
|
||||
|
||||
FutureMetrics = (FutureType, AggregateMetrics)
|
||||
|
||||
const locationLabels = ["proc", "file", "line"]
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_exec_time_total,
|
||||
"total time in which this proc actively occupied the event loop thread",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_exec_time_with_children_total,
|
||||
"chronos_exec_time_with_children_total of this proc plus of all" &
|
||||
"its children (procs that this proc called and awaited for)",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_wall_time_total,
|
||||
"the amount of time elapsed from when the async proc was started to when" &
|
||||
"it completed",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_call_count_total,
|
||||
"the total number of times this async proc was called and completed",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
# Per-proc Statistics
|
||||
declarePublicGauge(
|
||||
chronos_single_exec_time_max,
|
||||
"the maximum execution time for a single call of this proc",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
proc threadId(): int =
|
||||
when defined(getThreadId):
|
||||
getThreadId()
|
||||
else:
|
||||
0
|
||||
|
||||
# Keeps track of the thread initializing the module. This is the only thread
|
||||
# that will be allowed to interact with the metrics collector.
|
||||
let moduleInitThread = threadId()
|
||||
|
||||
proc newCollector*(
|
||||
ChronosProfilerInfo: typedesc,
|
||||
perfSampler: PerfSampler,
|
||||
clock: Clock,
|
||||
sampleInterval: times.Duration,
|
||||
k: int = 10,
|
||||
): ChronosProfilerInfo = ChronosProfilerInfo(
|
||||
perfSampler: perfSampler,
|
||||
clock: clock,
|
||||
k: k,
|
||||
sampleInterval: sampleInterval,
|
||||
init: true,
|
||||
lastSample: low(Time),
|
||||
)
|
||||
|
||||
proc collectSlowestProcs(
|
||||
self: ChronosProfilerInfo,
|
||||
profilerMetrics: seq[FutureMetrics],
|
||||
timestampMillis: int64,
|
||||
k: int,
|
||||
): void =
|
||||
|
||||
for (i, pair) in enumerate(profilerMetrics):
|
||||
if i == k:
|
||||
break
|
||||
|
||||
let (location, metrics) = pair
|
||||
|
||||
let locationLabels = @[
|
||||
$(location.procedure),
|
||||
$(location.file),
|
||||
$(location.line),
|
||||
]
|
||||
|
||||
chronos_exec_time_total.set(metrics.execTime.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_exec_time_with_children_total.set(
|
||||
metrics.execTimeWithChildren.nanoseconds,
|
||||
labelValues = locationLabels
|
||||
)
|
||||
|
||||
chronos_wall_time_total.set(metrics.wallClockTime.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_single_exec_time_max.set(metrics.execTimeMax.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_call_count_total.set(metrics.callCount.int64,
|
||||
labelValues = locationLabels)
|
||||
|
||||
proc collect*(self: ChronosProfilerInfo, force: bool = false): void =
|
||||
# Calling this method from the wrong thread has happened a lot in the past,
|
||||
# so this makes sure we're not doing anything funny.
|
||||
assert threadId() == moduleInitThread, "You cannot call collect() from" &
|
||||
" a thread other than the one that initialized the metricscolletor module"
|
||||
|
||||
let now = self.clock()
|
||||
if not force and (now - self.lastSample < self.sampleInterval):
|
||||
return
|
||||
|
||||
self.collections += 1
|
||||
var currentMetrics = self.
|
||||
perfSampler().
|
||||
pairs.
|
||||
toSeq.
|
||||
# We don't scoop metrics with 0 exec time as we have a limited number of
|
||||
# prometheus slots, and those are less likely to be useful in debugging
|
||||
# Chronos performance issues.
|
||||
filter(
|
||||
proc (pair: FutureMetrics): bool =
|
||||
pair[1].execTimeWithChildren.nanoseconds > 0
|
||||
).
|
||||
sorted(
|
||||
proc (a, b: FutureMetrics): int =
|
||||
cmp(a[1].execTimeWithChildren, b[1].execTimeWithChildren),
|
||||
order = SortOrder.Descending
|
||||
)
|
||||
|
||||
self.collectSlowestProcs(currentMetrics, now.toMilliseconds(), self.k)
|
||||
|
||||
self.lastSample = now
|
||||
|
||||
proc resetMetric(gauge: Gauge): void =
|
||||
# We try to be as conservative as possible and not write directly to
|
||||
# internal state. We do need to read from it, though.
|
||||
for metricSeq in gauge.metrics:
|
||||
for metric in metricSeq:
|
||||
gauge.set(0.int64, labelValues = metric.labelValues)
|
||||
|
||||
proc reset*(self: ChronosProfilerInfo): void =
|
||||
resetMetric(chronos_exec_time_total)
|
||||
resetMetric(chronos_exec_time_with_children_total)
|
||||
resetMetric(chronos_wall_time_total)
|
||||
resetMetric(chronos_call_count_total)
|
||||
resetMetric(chronos_single_exec_time_max)
|
||||
|
||||
var asyncProfilerInfo* {.global.}: ChronosProfilerInfo
|
||||
|
||||
proc enableProfilerMetrics*(k: int) =
|
||||
assert threadId() == moduleInitThread,
|
||||
"You cannot call enableProfilerMetrics() from a thread other than" &
|
||||
" the one that initialized the metricscolletor module."
|
||||
|
||||
asyncProfilerInfo = ChronosProfilerInfo.newCollector(
|
||||
perfSampler = proc (): MetricsTotals = getMetrics(),
|
||||
k = k,
|
||||
# We want to collect metrics every 5 seconds.
|
||||
sampleInterval = initDuration(seconds = 5),
|
||||
clock = proc (): Time = getTime(),
|
||||
)
|
||||
|
||||
enableProfiling(
|
||||
proc (e: Event) {.nimcall, gcsafe.} =
|
||||
{.cast(gcsafe).}:
|
||||
if e.newState == ExtendedFutureState.Completed:
|
||||
asyncProfilerInfo.collect()
|
||||
)
|
||||
|
||||
@ -1 +1,4 @@
|
||||
--d:chronosProfiling
|
||||
# begin Nimble config (version 2)
|
||||
when withDir(thisDir(), system.fileExists("nimble.paths")):
|
||||
include "nimble.paths"
|
||||
# end Nimble config
|
||||
|
||||
3
tests/config.nims
Normal file
3
tests/config.nims
Normal file
@ -0,0 +1,3 @@
|
||||
switch("threads", "on")
|
||||
switch("define", "chronosProfiling")
|
||||
switch("define", "metrics")
|
||||
3
tests/testall.nim
Normal file
3
tests/testall.nim
Normal file
@ -0,0 +1,3 @@
|
||||
import ./[testevents, testmetricscollector, testprofiler]
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
@ -3,10 +3,10 @@ import std/os
|
||||
import chronos
|
||||
import unittest2
|
||||
|
||||
import pkg/chroprof/events
|
||||
import ../chroprof/events
|
||||
import ./utils
|
||||
|
||||
suite "Profiler hooks test suite":
|
||||
suite "event ordering expectations":
|
||||
|
||||
setup:
|
||||
installCallbacks()
|
||||
|
||||
105
tests/testmetricscollector.nim
Normal file
105
tests/testmetricscollector.nim
Normal file
@ -0,0 +1,105 @@
|
||||
import std/times
|
||||
import std/tables
|
||||
|
||||
import unittest2
|
||||
import metrics
|
||||
import chronos/[srcloc, timer]
|
||||
|
||||
import ../chroprof/[api, collector]
|
||||
|
||||
suite "metrics collector":
|
||||
|
||||
var locations = @[
|
||||
SrcLoc(procedure: "start", file: "discovery.nim", line: 174),
|
||||
SrcLoc(procedure: "start", file: "discovery.nim", line: 192),
|
||||
SrcLoc(procedure: "query", file: "manager.nim", line: 323),
|
||||
SrcLoc(procedure: "update", file: "sqliteds.nim", line: 107),
|
||||
SrcLoc(procedure: "idle", file: "idle.nim", line: 100),
|
||||
]
|
||||
|
||||
let sample = {
|
||||
locations[0]: AggregateMetrics(
|
||||
execTime: timer.nanoseconds(90062),
|
||||
execTimeMax: timer.nanoseconds(80062),
|
||||
childrenExecTime: timer.nanoseconds(52044),
|
||||
wallClockTime: timer.nanoseconds(174567),
|
||||
callCount: 1
|
||||
),
|
||||
locations[1]: AggregateMetrics(
|
||||
execTime: timer.nanoseconds(91660),
|
||||
execTimeMax: timer.nanoseconds(81660),
|
||||
childrenExecTime: timer.nanoseconds(52495),
|
||||
wallClockTime: timer.nanoseconds(72941),
|
||||
callCount: 1
|
||||
),
|
||||
locations[2]: AggregateMetrics(
|
||||
execTime: timer.nanoseconds(60529),
|
||||
execTimeMax: timer.nanoseconds(60529),
|
||||
childrenExecTime: timer.nanoseconds(9689),
|
||||
wallClockTime: timer.nanoseconds(60784),
|
||||
callCount: 1
|
||||
),
|
||||
locations[3]: AggregateMetrics(
|
||||
execTime: timer.nanoseconds(60645),
|
||||
execTimeMax: timer.nanoseconds(41257),
|
||||
childrenExecTime: timer.nanoseconds(72934),
|
||||
wallClockTime: timer.nanoseconds(60813),
|
||||
callCount: 3
|
||||
),
|
||||
locations[4]: AggregateMetrics(
|
||||
execTime: timer.nanoseconds(0),
|
||||
execTimeMax: timer.nanoseconds(0),
|
||||
childrenExecTime: timer.nanoseconds(0),
|
||||
wallClockTime: timer.nanoseconds(60813),
|
||||
callCount: 3
|
||||
)
|
||||
}.toTable
|
||||
|
||||
var wallTime = getTime()
|
||||
|
||||
var collector: ChronosProfilerInfo
|
||||
|
||||
proc setupCollector(k: int = high(int)): void =
|
||||
collector = ChronosProfilerInfo.newCollector(
|
||||
perfSampler = proc (): MetricsTotals = sample,
|
||||
clock = proc (): Time = wallTime,
|
||||
sampleInterval = times.initDuration(minutes = 5),
|
||||
k = k,
|
||||
)
|
||||
|
||||
collector.reset()
|
||||
collector.collect()
|
||||
|
||||
test "should create labeled series for the k slowest procs in terms of execTime":
|
||||
setupCollector(k = 3)
|
||||
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValuesParam = @["start", "discovery.nim", "192"]) == 144155
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValuesParam = @["start", "discovery.nim", "174"]) == 142106
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValuesParam = @["update", "sqliteds.nim", "107"]) == 133579
|
||||
|
||||
# This is out of the top-k slowest, so should not have been recorded.
|
||||
expect system.KeyError:
|
||||
discard chronos_exec_time_with_children_total.value(
|
||||
labelValuesParam = @["query", "manager.nim", "323"])
|
||||
|
||||
test "should not collect metrics again unless enough time has elapsed from last collection":
|
||||
setupCollector()
|
||||
|
||||
check collector.collections == 1
|
||||
collector.collect()
|
||||
check collector.collections == 1
|
||||
|
||||
wallTime += 6.minutes
|
||||
|
||||
collector.collect()
|
||||
check collector.collections == 2
|
||||
|
||||
test "should not collect metrics for futures with zero total exec time":
|
||||
setupCollector()
|
||||
|
||||
expect system.KeyError:
|
||||
discard chronos_exec_time_total.value(
|
||||
labelValuesParam = @["idle", "idle.nim", "100"])
|
||||
@ -1,13 +1,14 @@
|
||||
import math
|
||||
import sequtils
|
||||
import unittest2
|
||||
import std/math
|
||||
import std/sequtils
|
||||
|
||||
import unittest2
|
||||
import chronos
|
||||
import chroprof/profiler
|
||||
|
||||
import ../chroprof/profiler
|
||||
|
||||
import ./utils
|
||||
|
||||
suite "Profiler metrics test suite":
|
||||
suite "profiler metrics":
|
||||
|
||||
setup:
|
||||
installCallbacks()
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import chronos
|
||||
import pkg/chroprof/[api, events, profiler]
|
||||
import ../chroprof/[api, events, profiler]
|
||||
|
||||
type
|
||||
SimpleEvent* = object
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user