add some hacks to allow enabling profiling on specific threads, and guarding against enabling it on multiple

This commit is contained in:
gmega 2023-11-09 17:07:11 -03:00
parent 23f63251f5
commit 256d0e7270
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
4 changed files with 61 additions and 24 deletions

View File

@ -59,6 +59,7 @@ when isMainModule:
# or we get a circular import. # or we get a circular import.
when chronosFuturesInstrumentation: when chronosFuturesInstrumentation:
AsyncProfilerInfo.initDefault(k = config.profilerMaxMetrics) AsyncProfilerInfo.initDefault(k = config.profilerMaxMetrics)
enableChronosProfiling()
case config.cmd: case config.cmd:
of StartUpCommand.noCommand: of StartUpCommand.noCommand:

View File

@ -33,9 +33,12 @@ type
ChangeListener* = proc (): void {.raises: [].} ChangeListener* = proc (): void {.raises: [].}
var var
perFutureMetrics: Table[uint, FutureMetrics] perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics]
futureSummaryMetrics: MetricsSummary futureSummaryMetrics {.threadvar.}: MetricsSummary
onChange: ChangeListener onChange {.threadvar.}: ChangeListener
# FIXME this is a HACK because it's currently not simple to disable
# the profiler on a thread-by-thread basis.
enabledForCurrentThread {.threadvar.}: bool
proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} = proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} =
## get a copy of the table of summary metrics for all futures. ## get a copy of the table of summary metrics for all futures.
@ -114,20 +117,29 @@ proc setFutureDuration(fut: FutureBase) {.raises: [].} =
futureSummaryMetrics.withValue(loc, metric): futureSummaryMetrics.withValue(loc, metric):
metric[].addRun(runMetrics) metric[].addRun(runMetrics)
proc enableChronosProfiling* (): void =
## This gates for which chronos event loops we want to collect metrics. This
## method should be called for each event loop, before chronos is started.
enabledForCurrentThread = true
onFutureCreate = onFutureCreate =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFutureCreate() f.setFutureCreate()
onFutureRunning = onFutureRunning =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFutureStart() f.setFutureStart()
onFuturePause = onFuturePause =
proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFuturePause(child) f.setFuturePause(child)
onFutureStop = onFutureStop =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFuturePause(nil) f.setFuturePause(nil)
f.setFutureDuration() f.setFutureDuration()

View File

@ -16,7 +16,6 @@ when defined(metrics):
init: bool init: bool
lastSample: Time lastSample: Time
collections*: uint collections*: uint
threadId: int
PerfSampler = proc (): MetricsSummary {.raises: [].} PerfSampler = proc (): MetricsSummary {.raises: [].}
@ -71,8 +70,9 @@ when defined(metrics):
"the largest chronos_single_exec_time_max of all procs", "the largest chronos_single_exec_time_max of all procs",
) )
# Keeps track of the thread initializing the module. This is the only thread
# that will be allowed to interact with the metrics collector.
let moduleInitThread = getThreadId() let moduleInitThread = getThreadId()
echo "MODULE INIT THREAD: ", getThreadId()
proc newCollector*( proc newCollector*(
AsyncProfilerInfo: typedesc, AsyncProfilerInfo: typedesc,
@ -87,7 +87,6 @@ when defined(metrics):
sampleInterval: sampleInterval, sampleInterval: sampleInterval,
init: true, init: true,
lastSample: low(Time), lastSample: low(Time),
threadId: getThreadId(),
) )
proc collectSlowestProcs( proc collectSlowestProcs(
@ -145,17 +144,16 @@ when defined(metrics):
chronos_largest_exec_time_max.set(largestMaxExecTime.nanoseconds) chronos_largest_exec_time_max.set(largestMaxExecTime.nanoseconds)
proc collect*(self: AsyncProfilerInfo, force: bool = false): void = proc collect*(self: AsyncProfilerInfo, force: bool = false): void =
if getThreadId() != self.threadId: # Calling this method from the wrong thread has happened a lot in the past,
raise (ref Defect)(msg: "AsyncProfilerInfo.collect() called from a different thread" & # so this makes sure we're not doing anything funny.
" than the one it was initialized with.") assert getThreadId() == moduleInitThread, "You cannot call collect() from" &
" a thread other than the one that initialized the metricscolletor module"
let now = self.clock() let now = self.clock()
if not force and (now - self.lastSample < self.sampleInterval): if not force and (now - self.lastSample < self.sampleInterval):
return return
self.collections += 1 self.collections += 1
var currentMetrics = self. var currentMetrics = self.
perfSampler(). perfSampler().
pairs. pairs.
@ -164,6 +162,13 @@ when defined(metrics):
proc (pair: (ptr SrcLoc, OverallMetrics)): ProfilerMetric = proc (pair: (ptr SrcLoc, OverallMetrics)): ProfilerMetric =
(pair[0][], pair[1]) (pair[0][], pair[1])
). ).
# We don't scoop metrics with 0 exec time as we have a limited number of
# prometheus slots, and those are less likely to be useful in debugging
# Chronos performance issues.
filter(
proc (pair: ProfilerMetric): bool =
pair[1].totalExecTime.nanoseconds > 0
).
sorted( sorted(
proc (a, b: ProfilerMetric): int = proc (a, b: ProfilerMetric): int =
cmp(a[1].totalExecTime, b[1].totalExecTime), cmp(a[1].totalExecTime, b[1].totalExecTime),
@ -193,12 +198,9 @@ when defined(metrics):
var asyncProfilerInfo* {.global.}: AsyncProfilerInfo var asyncProfilerInfo* {.global.}: AsyncProfilerInfo
proc initDefault*(AsyncProfilerInfo: typedesc, k: int) = proc initDefault*(AsyncProfilerInfo: typedesc, k: int) =
assert getThreadId() == moduleInitThread, "You cannot call " &
echo "INIT DEFAULT THREAD: ", getThreadId() "initDefault() from a thread other than the one that initialized the " &
"metricscolletor module."
if moduleInitThread != getThreadId():
raise (ref Defect)(msg: "AsyncProfilerInfo.initDefault() called from a different thread" &
" than the one it was initialized with.")
asyncProfilerInfo = AsyncProfilerInfo.newCollector( asyncProfilerInfo = AsyncProfilerInfo.newCollector(
perfSampler = getFutureSummaryMetrics, perfSampler = getFutureSummaryMetrics,

View File

@ -14,6 +14,7 @@ suite "asyncprofiler metrics collector":
SrcLoc(procedure: "start", file: "discovery.nim", line: 192), SrcLoc(procedure: "start", file: "discovery.nim", line: 192),
SrcLoc(procedure: "query", file: "manager.nim", line: 323), SrcLoc(procedure: "query", file: "manager.nim", line: 323),
SrcLoc(procedure: "update", file: "sqliteds.nim", line: 107), SrcLoc(procedure: "update", file: "sqliteds.nim", line: 107),
SrcLoc(procedure: "idle", file: "idle.nim", line: 100),
] ]
let sample = { let sample = {
@ -49,28 +50,40 @@ suite "asyncprofiler metrics collector":
maxSingleTime: timer.nanoseconds(41257), maxSingleTime: timer.nanoseconds(41257),
count: 3 count: 3
), ),
(addr locations[4]): OverallMetrics(
totalExecTime: timer.nanoseconds(0),
totalRunTime: timer.nanoseconds(156214),
totalWallTime: timer.nanoseconds(60813),
minSingleTime: timer.nanoseconds(0),
maxSingleTime: timer.nanoseconds(0),
count: 3
)
}.toTable }.toTable
var wallTime = getTime() var wallTime = getTime()
var collector: AsyncProfilerInfo var collector: AsyncProfilerInfo
setup: proc setupCollector(k: int = high(int)): void =
collector = AsyncProfilerInfo.newCollector( collector = AsyncProfilerInfo.newCollector(
perfSampler = proc (): MetricsSummary = sample, perfSampler = proc (): MetricsSummary = sample,
clock = proc (): Time = wallTime, clock = proc (): Time = wallTime,
sampleInterval = times.initDuration(minutes = 5), sampleInterval = times.initDuration(minutes = 5),
k = 3, k = k,
) )
collector.reset() collector.reset()
collector.collect() collector.collect()
test "should keep track of basic worst-case exec time stats": test "should keep track of basic worst-case exec time stats":
setupCollector(k = 3)
check chronos_largest_exec_time_total.value == 91660 check chronos_largest_exec_time_total.value == 91660
check chronos_largest_exec_time_max.value == 81660 check chronos_largest_exec_time_max.value == 81660
test "should create labeled series for the k slowest procs in terms of totalExecTime": test "should create labeled series for the k slowest procs in terms of totalExecTime":
setupCollector(k = 3)
check chronos_exec_time_total.value( check chronos_exec_time_total.value(
labelValues = @["start", "discovery.nim", "192"]) == 91660 labelValues = @["start", "discovery.nim", "192"]) == 91660
check chronos_exec_time_total.value( check chronos_exec_time_total.value(
@ -84,6 +97,8 @@ suite "asyncprofiler metrics collector":
labelValues = @["query", "manager.nim", "323"]) labelValues = @["query", "manager.nim", "323"])
test "should not collect metrics again unless enough time has elapsed from last collection": test "should not collect metrics again unless enough time has elapsed from last collection":
setupCollector()
check collector.collections == 1 check collector.collections == 1
collector.collect() collector.collect()
check collector.collections == 1 check collector.collections == 1
@ -92,3 +107,10 @@ suite "asyncprofiler metrics collector":
collector.collect() collector.collect()
check collector.collections == 2 check collector.collections == 2
test "should not collect metrics for futures with zero total exec time":
setupCollector()
expect system.KeyError:
discard chronos_exec_time_total.value(
labelValues = @["idle", "idle.nim", "100"])