diff --git a/codex.nim b/codex.nim index efaf9289..924b4ffc 100644 --- a/codex.nim +++ b/codex.nim @@ -59,6 +59,7 @@ when isMainModule: # or we get a circular import. when chronosFuturesInstrumentation: AsyncProfilerInfo.initDefault(k = config.profilerMaxMetrics) + enableChronosProfiling() case config.cmd: of StartUpCommand.noCommand: diff --git a/codex/utils/asyncprofiler/asyncprofiler.nim b/codex/utils/asyncprofiler/asyncprofiler.nim index e9f56a9a..7fe380f6 100644 --- a/codex/utils/asyncprofiler/asyncprofiler.nim +++ b/codex/utils/asyncprofiler/asyncprofiler.nim @@ -33,9 +33,12 @@ type ChangeListener* = proc (): void {.raises: [].} var - perFutureMetrics: Table[uint, FutureMetrics] - futureSummaryMetrics: MetricsSummary - onChange: ChangeListener + perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics] + futureSummaryMetrics {.threadvar.}: MetricsSummary + onChange {.threadvar.}: ChangeListener + # FIXME this is a HACK because it's currently not simple to disable + # the profiler on a thread-by-thread basis. + enabledForCurrentThread {.threadvar.}: bool proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} = ## get a copy of the table of summary metrics for all futures. @@ -114,22 +117,31 @@ proc setFutureDuration(fut: FutureBase) {.raises: [].} = futureSummaryMetrics.withValue(loc, metric): metric[].addRun(runMetrics) +proc enableChronosProfiling* (): void = + ## This gates for which chronos event loops we want to collect metrics. This + ## method should be called for each event loop, before chronos is started. + enabledForCurrentThread = true + onFutureCreate = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureCreate() + if enabledForCurrentThread: + f.setFutureCreate() onFutureRunning = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureStart() + if enabledForCurrentThread: + f.setFutureStart() onFuturePause = proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFuturePause(child) + if enabledForCurrentThread: + f.setFuturePause(child) onFutureStop = proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFuturePause(nil) - f.setFutureDuration() + if enabledForCurrentThread: + f.setFuturePause(nil) + f.setFutureDuration() when isMainModule: import std/unittest diff --git a/codex/utils/asyncprofiler/metricscollector.nim b/codex/utils/asyncprofiler/metricscollector.nim index d8862c87..113f5acf 100644 --- a/codex/utils/asyncprofiler/metricscollector.nim +++ b/codex/utils/asyncprofiler/metricscollector.nim @@ -16,7 +16,6 @@ when defined(metrics): init: bool lastSample: Time collections*: uint - threadId: int PerfSampler = proc (): MetricsSummary {.raises: [].} @@ -71,8 +70,9 @@ when defined(metrics): "the largest chronos_single_exec_time_max of all procs", ) + # Keeps track of the thread initializing the module. This is the only thread + # that will be allowed to interact with the metrics collector. let moduleInitThread = getThreadId() - echo "MODULE INIT THREAD: ", getThreadId() proc newCollector*( AsyncProfilerInfo: typedesc, @@ -87,7 +87,6 @@ when defined(metrics): sampleInterval: sampleInterval, init: true, lastSample: low(Time), - threadId: getThreadId(), ) proc collectSlowestProcs( @@ -145,17 +144,16 @@ when defined(metrics): chronos_largest_exec_time_max.set(largestMaxExecTime.nanoseconds) proc collect*(self: AsyncProfilerInfo, force: bool = false): void = - if getThreadId() != self.threadId: - raise (ref Defect)(msg: "AsyncProfilerInfo.collect() called from a different thread" & - " than the one it was initialized with.") + # Calling this method from the wrong thread has happened a lot in the past, + # so this makes sure we're not doing anything funny. + assert getThreadId() == moduleInitThread, "You cannot call collect() from" & + " a thread other than the one that initialized the metricscolletor module" let now = self.clock() - if not force and (now - self.lastSample < self.sampleInterval): return self.collections += 1 - var currentMetrics = self. perfSampler(). pairs. @@ -164,6 +162,13 @@ when defined(metrics): proc (pair: (ptr SrcLoc, OverallMetrics)): ProfilerMetric = (pair[0][], pair[1]) ). + # We don't scoop metrics with 0 exec time as we have a limited number of + # prometheus slots, and those are less likely to be useful in debugging + # Chronos performance issues. + filter( + proc (pair: ProfilerMetric): bool = + pair[1].totalExecTime.nanoseconds > 0 + ). sorted( proc (a, b: ProfilerMetric): int = cmp(a[1].totalExecTime, b[1].totalExecTime), @@ -193,12 +198,9 @@ when defined(metrics): var asyncProfilerInfo* {.global.}: AsyncProfilerInfo proc initDefault*(AsyncProfilerInfo: typedesc, k: int) = - - echo "INIT DEFAULT THREAD: ", getThreadId() - - if moduleInitThread != getThreadId(): - raise (ref Defect)(msg: "AsyncProfilerInfo.initDefault() called from a different thread" & - " than the one it was initialized with.") + assert getThreadId() == moduleInitThread, "You cannot call " & + "initDefault() from a thread other than the one that initialized the " & + "metricscolletor module." asyncProfilerInfo = AsyncProfilerInfo.newCollector( perfSampler = getFutureSummaryMetrics, diff --git a/tests/codex/utils/asyncprofiler/testmetricscollector.nim b/tests/codex/utils/asyncprofiler/testmetricscollector.nim index 1908241c..e8303e24 100644 --- a/tests/codex/utils/asyncprofiler/testmetricscollector.nim +++ b/tests/codex/utils/asyncprofiler/testmetricscollector.nim @@ -14,6 +14,7 @@ suite "asyncprofiler metrics collector": SrcLoc(procedure: "start", file: "discovery.nim", line: 192), SrcLoc(procedure: "query", file: "manager.nim", line: 323), SrcLoc(procedure: "update", file: "sqliteds.nim", line: 107), + SrcLoc(procedure: "idle", file: "idle.nim", line: 100), ] let sample = { @@ -49,28 +50,40 @@ suite "asyncprofiler metrics collector": maxSingleTime: timer.nanoseconds(41257), count: 3 ), + (addr locations[4]): OverallMetrics( + totalExecTime: timer.nanoseconds(0), + totalRunTime: timer.nanoseconds(156214), + totalWallTime: timer.nanoseconds(60813), + minSingleTime: timer.nanoseconds(0), + maxSingleTime: timer.nanoseconds(0), + count: 3 + ) }.toTable var wallTime = getTime() var collector: AsyncProfilerInfo - setup: + proc setupCollector(k: int = high(int)): void = collector = AsyncProfilerInfo.newCollector( perfSampler = proc (): MetricsSummary = sample, clock = proc (): Time = wallTime, sampleInterval = times.initDuration(minutes = 5), - k = 3, + k = k, ) collector.reset() collector.collect() test "should keep track of basic worst-case exec time stats": + setupCollector(k = 3) + check chronos_largest_exec_time_total.value == 91660 check chronos_largest_exec_time_max.value == 81660 test "should create labeled series for the k slowest procs in terms of totalExecTime": + setupCollector(k = 3) + check chronos_exec_time_total.value( labelValues = @["start", "discovery.nim", "192"]) == 91660 check chronos_exec_time_total.value( @@ -84,6 +97,8 @@ suite "asyncprofiler metrics collector": labelValues = @["query", "manager.nim", "323"]) test "should not collect metrics again unless enough time has elapsed from last collection": + setupCollector() + check collector.collections == 1 collector.collect() check collector.collections == 1 @@ -92,3 +107,10 @@ suite "asyncprofiler metrics collector": collector.collect() check collector.collections == 2 + + test "should not collect metrics for futures with zero total exec time": + setupCollector() + + expect system.KeyError: + discard chronos_exec_time_total.value( + labelValues = @["idle", "idle.nim", "100"])