nim-chroprof/chroprof/collector.nim
2024-03-01 13:44:06 -03:00

197 lines
5.6 KiB
Nim

## Metrics collector which allows exporting Chronos profiling metrics to
## Prometheus.
import std/algorithm
import std/enumerate
import std/sequtils
import std/tables
import std/times
import chronos/timer
import metrics
import ./api
when defined(metrics):
type
  # Callback that produces a snapshot of the profiler's aggregate metrics.
  MetricsSampler = proc(): MetricsTotals {.raises: [].}

  # Callback that returns the current time; injected so the sampling
  # interval check in `collect` does not depend on the system clock directly.
  Clock = proc(): Time {.raises: [].}

  # One sampled entry: the future's type/location paired with its
  # aggregated metrics.
  FutureMetrics = (FutureType, AggregateMetrics)

  # State of the collector that copies profiler metrics into the gauges
  # declared in this module.
  ChronosProfilerInfo* = ref object of RootObj
    # Source of the metrics snapshot; invoked on every non-skipped `collect`.
    sampler: MetricsSampler
    # Minimum time between two non-forced collections.
    sampleInterval: times.Duration
    # Time source used to evaluate `sampleInterval`.
    clock: Clock
    # Maximum number of entries published per collection.
    k: int
    # Set to `true` by `newCollector`; not read anywhere in the visible code.
    init: bool
    # Time at which the last collection completed.
    lastSample: Time
    # Number of collections that were actually carried out (not skipped).
    collections*: uint
# Label names shared by every gauge below; `collectSlowestProcs` supplies
# the values in this same order (procedure, file, line).
const locationLabels = ["proc", "file", "line"]

declarePublicGauge(
  chronos_exec_time_total,
  "total time in which this proc actively occupied the event loop thread",
  labels = locationLabels,
)

# NOTE: the help strings below are built with `&`, which adds no separator,
# so each fragment must carry its own trailing space.
declarePublicGauge(
  chronos_exec_time_with_children_total,
  "chronos_exec_time_total of this proc plus that of all " &
    "its children (procs that this proc called and awaited for)",
  labels = locationLabels,
)

declarePublicGauge(
  chronos_wall_time_total,
  "the amount of time elapsed from when the async proc was started to when " &
    "it completed",
  labels = locationLabels,
)

declarePublicGauge(
  chronos_call_count_total,
  "the total number of times this async proc was called and completed",
  labels = locationLabels,
)

# Per-proc Statistics
declarePublicGauge(
  chronos_single_exec_time_max,
  "the maximum execution time for a single call of this proc",
  labels = locationLabels,
)
proc threadId(): int =
  ## Returns the identifier of the calling thread, or `0` when the build
  ## does not provide `getThreadId` (e.g. threads are disabled).
  # `declared` checks whether the identifier exists in scope. The previous
  # `defined(getThreadId)` only checked for a `-d:getThreadId` compile-time
  # define, which is never set, so this proc always returned 0 and the
  # thread assertions relying on it could never fire.
  when declared(getThreadId):
    getThreadId()
  else:
    0

# Keeps track of the thread initializing the module. This is the only thread
# that will be allowed to interact with the metrics collector.
let moduleInitThread = threadId()
proc newCollector*(
    ChronosProfilerInfo: typedesc,
    sampler: MetricsSampler,
    clock: Clock,
    sampleInterval: times.Duration,
    k: int = 10,
): ChronosProfilerInfo =
  ## Builds a collector.
  ##
  ## * `sampler` - callback returning the metrics snapshot to publish.
  ## * `clock` - callback returning the current time.
  ## * `sampleInterval` - minimum time between two non-forced collections.
  ## * `k` - maximum number of entries published per collection.
  ##
  ## `lastSample` starts at `low(Time)` and `init` is set to `true`.
  result = ChronosProfilerInfo(
    init: true,
    lastSample: low(Time),
    sampleInterval: sampleInterval,
    k: k,
    clock: clock,
    sampler: sampler,
  )
proc collectSlowestProcs(
    self: ChronosProfilerInfo,
    profilerMetrics: seq[FutureMetrics],
    timestampMillis: int64,
    k: int,
): void =
  ## Writes the leading entries of `profilerMetrics` into the module's
  ## gauges, in the order given, stopping as soon as `k` entries have been
  ## published. Each entry is labelled with its procedure, file and line.
  ##
  ## `timestampMillis` is accepted but not used by this proc.
  for idx, entry in profilerMetrics:
    # Stop once the k-th entry (zero-based index k) is reached.
    if idx == k:
      break

    let
      loc = entry[0]
      stats = entry[1]
      labels = @[$(loc.procedure), $(loc.file), $(loc.line)]

    chronos_exec_time_total.set(stats.execTime.nanoseconds, labelValues = labels)
    chronos_exec_time_with_children_total.set(
      stats.execTimeWithChildren.nanoseconds, labelValues = labels
    )
    chronos_wall_time_total.set(
      stats.wallClockTime.nanoseconds, labelValues = labels
    )
    chronos_single_exec_time_max.set(
      stats.execTimeMax.nanoseconds, labelValues = labels
    )
    chronos_call_count_total.set(stats.callCount.int64, labelValues = labels)
proc collect*(self: ChronosProfilerInfo, force: bool = false): void =
  ## Samples the profiler and publishes the entries with the largest
  ## `execTimeWithChildren` to the gauges, at most `self.k` of them.
  ##
  ## Unless `force` is set, the call returns without doing anything when
  ## less than `self.sampleInterval` has elapsed since the last collection.
  ## Every collection that is carried out increments `self.collections`.

  # Collecting from the wrong thread has been a recurring mistake, hence
  # the explicit guard.
  assert threadId() == moduleInitThread,
    "You cannot call collect() from" &
      " a thread other than the one that initialized the metricscolletor module"

  let now = self.clock()
  let due = force or (now - self.lastSample >= self.sampleInterval)
  if not due:
    return

  self.collections += 1

  let sampled = self.sampler().pairs.toSeq

  # Entries with zero exec time are dropped: the number of prometheus slots
  # is limited, and such entries are less likely to help when debugging
  # Chronos performance issues.
  let active = sampled.filter(
    proc(entry: FutureMetrics): bool =
      entry[1].execTimeWithChildren.nanoseconds > 0
  )

  # Slowest first, so that the first `k` entries are the ones published.
  let ranked = active.sorted(
    proc(x, y: FutureMetrics): int =
      cmp(x[1].execTimeWithChildren, y[1].execTimeWithChildren)
    ,
    order = SortOrder.Descending,
  )

  self.collectSlowestProcs(ranked, now.toMilliseconds(), self.k)
  self.lastSample = now
proc resetMetric(gauge: Gauge): void =
  ## Sets every label combination currently stored in `gauge` back to zero.
  # The gauge's internal `metrics` state is only read here; the actual
  # update goes through the public `set` call so that internals are never
  # written to directly.
  for series in gauge.metrics:
    for sample in series:
      gauge.set(int64(0), labelValues = sample.labelValues)
proc reset*(self: ChronosProfilerInfo): void =
  ## Zeroes all values recorded so far in each of the profiler gauges
  ## declared by this module. `self` itself is not modified.
  for gauge in [
    chronos_exec_time_total,
    chronos_exec_time_with_children_total,
    chronos_wall_time_total,
    chronos_call_count_total,
    chronos_single_exec_time_max,
  ]:
    resetMetric(gauge)
# Collector instance driven by the profiler callback installed in
# `enableProfilerMetrics`; it is assigned there.
var asyncProfilerInfo* {.global.}: ChronosProfilerInfo

proc enableProfilerMetrics*(k: int) =
  ## Creates the global collector and registers a profiler callback that
  ## triggers a collection whenever a future reaches the `Completed` state.
  ##
  ## `k` is the maximum number of entries published per collection.
  assert threadId() == moduleInitThread,
    "You cannot call enableProfilerMetrics() from a thread other than" &
      " the one that initialized the metricscolletor module."

  asyncProfilerInfo = newCollector(
    ChronosProfilerInfo,
    sampler = getMetrics,
    clock = proc(): Time =
      getTime()
    ,
    # We want to collect metrics every 5 seconds.
    sampleInterval = initDuration(seconds = 5),
    k = k,
  )

  enableProfiling(
    proc(e: Event) {.nimcall, gcsafe, raises: [].} =
      # Only completed futures trigger a collection attempt.
      if e.newState != ExtendedFutureState.Completed:
        return
      # The collector is a GC'd global, so the access has to be explicitly
      # marked as gcsafe for this callback to compile.
      {.cast(gcsafe).}:
        asyncProfilerInfo.collect()
  )