add callback to eliminate the need for an async timer in metric updates
This commit is contained in:
parent
29d36b51f2
commit
f048404cb7
|
@ -30,15 +30,23 @@ type
|
||||||
|
|
||||||
MetricsSummary* = Table[ptr SrcLoc, OverallMetrics]
|
MetricsSummary* = Table[ptr SrcLoc, OverallMetrics]
|
||||||
|
|
||||||
|
ChangeListener* = proc (): void {.raises: [].}
|
||||||
|
|
||||||
var
|
var
|
||||||
perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics]
|
perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics]
|
||||||
futureSummaryMetrics {.threadvar.}: MetricsSummary
|
futureSummaryMetrics {.threadvar.}: MetricsSummary
|
||||||
|
onChange {.threadvar.}: ChangeListener
|
||||||
|
|
||||||
proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} =
|
proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} =
|
||||||
## get a copy of the table of summary metrics for all futures.
|
## get a copy of the table of summary metrics for all futures.
|
||||||
{.cast(gcsafe).}:
|
{.cast(gcsafe).}:
|
||||||
futureSummaryMetrics
|
futureSummaryMetrics
|
||||||
|
|
||||||
|
proc setChangeCallback*(callback: ChangeListener): void =
|
||||||
|
## Allows registration of a single callback which gets called whenever
|
||||||
|
## a the summary metric table changes.
|
||||||
|
onChange = callback
|
||||||
|
|
||||||
proc addRun(self: var OverallMetrics, run: FutureMetrics) =
|
proc addRun(self: var OverallMetrics, run: FutureMetrics) =
|
||||||
## Adds metrics for a single run of a given async proc to its OverallMetrics.
|
## Adds metrics for a single run of a given async proc to its OverallMetrics.
|
||||||
self.totalExecTime += run.duration
|
self.totalExecTime += run.duration
|
||||||
|
@ -52,6 +60,9 @@ proc addRun(self: var OverallMetrics, run: FutureMetrics) =
|
||||||
self.totalExecTime = ZeroDuration
|
self.totalExecTime = ZeroDuration
|
||||||
self.count = 0
|
self.count = 0
|
||||||
|
|
||||||
|
if not isNil(onChange):
|
||||||
|
onChange()
|
||||||
|
|
||||||
proc setFutureCreate(fut: FutureBase) {.raises: [].} =
|
proc setFutureCreate(fut: FutureBase) {.raises: [].} =
|
||||||
## used for setting the duration
|
## used for setting the duration
|
||||||
{.cast(gcsafe).}:
|
{.cast(gcsafe).}:
|
||||||
|
|
|
@ -10,11 +10,17 @@ when defined(metrics):
|
||||||
type
|
type
|
||||||
AsyncProfilerInfo* = ref object of RootObj
|
AsyncProfilerInfo* = ref object of RootObj
|
||||||
perfSampler: PerfSampler
|
perfSampler: PerfSampler
|
||||||
sampleInterval: int
|
sampleInterval: times.Duration
|
||||||
|
clock: Clock
|
||||||
k: int
|
k: int
|
||||||
|
init: bool
|
||||||
|
lastSample: Time
|
||||||
|
collections*: uint
|
||||||
|
|
||||||
PerfSampler = proc (): MetricsSummary {.raises: [].}
|
PerfSampler = proc (): MetricsSummary {.raises: [].}
|
||||||
|
|
||||||
|
Clock = proc(): Time {.raises: [].}
|
||||||
|
|
||||||
ProfilerMetric = (SrcLoc, OverallMetrics)
|
ProfilerMetric = (SrcLoc, OverallMetrics)
|
||||||
|
|
||||||
const locationLabels = ["proc", "file", "line"]
|
const locationLabels = ["proc", "file", "line"]
|
||||||
|
@ -67,8 +73,17 @@ when defined(metrics):
|
||||||
proc newCollector*(
|
proc newCollector*(
|
||||||
AsyncProfilerInfo: typedesc,
|
AsyncProfilerInfo: typedesc,
|
||||||
perfSampler: PerfSampler,
|
perfSampler: PerfSampler,
|
||||||
|
clock: Clock,
|
||||||
|
sampleInterval: times.Duration,
|
||||||
k: int = 10,
|
k: int = 10,
|
||||||
): AsyncProfilerInfo = AsyncProfilerInfo(perfSampler: perfSampler, k: k)
|
): AsyncProfilerInfo = AsyncProfilerInfo(
|
||||||
|
perfSampler: perfSampler,
|
||||||
|
clock: clock,
|
||||||
|
k: k,
|
||||||
|
sampleInterval: sampleInterval,
|
||||||
|
init: true,
|
||||||
|
lastSample: low(Time),
|
||||||
|
)
|
||||||
|
|
||||||
proc collectSlowestProcs(
|
proc collectSlowestProcs(
|
||||||
self: AsyncProfilerInfo,
|
self: AsyncProfilerInfo,
|
||||||
|
@ -124,8 +139,13 @@ when defined(metrics):
|
||||||
chronos_largest_exec_time_total.set(largestExecTime.nanoseconds)
|
chronos_largest_exec_time_total.set(largestExecTime.nanoseconds)
|
||||||
chronos_largest_exec_time_max.set(largestMaxExecTime.nanoseconds)
|
chronos_largest_exec_time_max.set(largestMaxExecTime.nanoseconds)
|
||||||
|
|
||||||
proc collect*(self: AsyncProfilerInfo): void =
|
proc collect*(self: AsyncProfilerInfo, force: bool = false): void =
|
||||||
let now = times.getTime().toMilliseconds()
|
let now = self.clock()
|
||||||
|
|
||||||
|
if not force and (now - self.lastSample < self.sampleInterval):
|
||||||
|
return
|
||||||
|
|
||||||
|
self.collections += 1
|
||||||
|
|
||||||
var currentMetrics = self.
|
var currentMetrics = self.
|
||||||
perfSampler().
|
perfSampler().
|
||||||
|
@ -141,8 +161,10 @@ when defined(metrics):
|
||||||
order = SortOrder.Descending
|
order = SortOrder.Descending
|
||||||
)
|
)
|
||||||
|
|
||||||
self.collectOutlierMetrics(currentMetrics, now)
|
self.collectOutlierMetrics(currentMetrics, now.toMilliseconds())
|
||||||
self.collectSlowestProcs(currentMetrics, now, self.k)
|
self.collectSlowestProcs(currentMetrics, now.toMilliseconds(), self.k)
|
||||||
|
|
||||||
|
self.lastSample = now
|
||||||
|
|
||||||
proc resetMetric(gauge: Gauge): void =
|
proc resetMetric(gauge: Gauge): void =
|
||||||
# We try to be as conservative as possible and not write directly to
|
# We try to be as conservative as possible and not write directly to
|
||||||
|
@ -159,7 +181,15 @@ when defined(metrics):
|
||||||
resetMetric(chronos_largest_exec_time_total)
|
resetMetric(chronos_largest_exec_time_total)
|
||||||
resetMetric(chronos_largest_exec_time_max)
|
resetMetric(chronos_largest_exec_time_max)
|
||||||
|
|
||||||
proc run*(self: AsyncProfilerInfo) {.async.} =
|
var asyncProfilerInfo* {.threadvar.}: AsyncProfilerInfo
|
||||||
while true:
|
|
||||||
self.collect()
|
proc initDefault(AsyncProfilerInfo: typedesc): void =
|
||||||
await sleepAsync(timer.milliseconds(self.sampleInterval))
|
asyncProfilerInfo = AsyncProfilerInfo.newCollector(
|
||||||
|
perfSampler = asyncprofiler.getFutureSummaryMetrics,
|
||||||
|
k = 10,
|
||||||
|
# We want to collect metrics every 5 seconds.
|
||||||
|
sampleInterval = times.milliseconds(5 * 1000),
|
||||||
|
clock = proc (): Time = time.getTime(),
|
||||||
|
)
|
||||||
|
|
||||||
|
setChangeCallback(asyncProfilerInfo)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import std/times
|
||||||
import std/unittest
|
import std/unittest
|
||||||
|
|
||||||
import pkg/metrics
|
import pkg/metrics
|
||||||
|
@ -17,45 +18,51 @@ suite "asyncprofiler metrics collector":
|
||||||
|
|
||||||
let sample = {
|
let sample = {
|
||||||
(addr locations[0]): OverallMetrics(
|
(addr locations[0]): OverallMetrics(
|
||||||
totalExecTime: 90062.nanoseconds,
|
totalExecTime: timer.nanoseconds(90062),
|
||||||
totalRunTime: 113553.nanoseconds,
|
totalRunTime: timer.nanoseconds(113553),
|
||||||
totalWallTime: 174567.nanoseconds,
|
totalWallTime: timer.nanoseconds(174567),
|
||||||
minSingleTime: 80062.nanoseconds,
|
minSingleTime: timer.nanoseconds(80062),
|
||||||
maxSingleTime: 80062.nanoseconds,
|
maxSingleTime: timer.nanoseconds(80062),
|
||||||
count: 1
|
count: 1
|
||||||
),
|
),
|
||||||
(addr locations[1]): OverallMetrics(
|
(addr locations[1]): OverallMetrics(
|
||||||
totalExecTime: 91660.nanoseconds,
|
totalExecTime: timer.nanoseconds(91660),
|
||||||
totalRunTime: 71660.nanoseconds,
|
totalRunTime: timer.nanoseconds(71660),
|
||||||
totalWallTime: 72941.nanoseconds,
|
totalWallTime: timer.nanoseconds(72941),
|
||||||
minSingleTime: 71660.nanoseconds,
|
minSingleTime: timer.nanoseconds(71660),
|
||||||
maxSingleTime: 81660.nanoseconds,
|
maxSingleTime: timer.nanoseconds(81660),
|
||||||
count: 1
|
count: 1
|
||||||
),
|
),
|
||||||
(addr locations[2]): OverallMetrics(
|
(addr locations[2]): OverallMetrics(
|
||||||
totalExecTime: 60529.nanoseconds,
|
totalExecTime: timer.nanoseconds(60529),
|
||||||
totalRunTime: 60529.nanoseconds,
|
totalRunTime: timer.nanoseconds(60529),
|
||||||
totalWallTime: 60784.nanoseconds,
|
totalWallTime: timer.nanoseconds(60784),
|
||||||
minSingleTime: 60529.nanoseconds,
|
minSingleTime: timer.nanoseconds(60529),
|
||||||
maxSingleTime: 60529.nanoseconds,
|
maxSingleTime: timer.nanoseconds(60529),
|
||||||
count: 1
|
count: 1
|
||||||
),
|
),
|
||||||
(addr locations[3]): OverallMetrics(
|
(addr locations[3]): OverallMetrics(
|
||||||
totalExecTime: 60645.nanoseconds,
|
totalExecTime: timer.nanoseconds(60645),
|
||||||
totalRunTime: 156214.nanoseconds,
|
totalRunTime: timer.nanoseconds(156214),
|
||||||
totalWallTime: 60813.nanoseconds,
|
totalWallTime: timer.nanoseconds(60813),
|
||||||
minSingleTime: 5333.nanoseconds,
|
minSingleTime: timer.nanoseconds(5333),
|
||||||
maxSingleTime: 41257.nanoseconds,
|
maxSingleTime: timer.nanoseconds(41257),
|
||||||
count: 3
|
count: 3
|
||||||
),
|
),
|
||||||
}.toTable
|
}.toTable
|
||||||
|
|
||||||
var collector = AsyncProfilerInfo.newCollector(
|
var wallTime = getTime()
|
||||||
perfSampler = proc (): MetricsSummary = sample,
|
|
||||||
k = 3,
|
var collector: AsyncProfilerInfo
|
||||||
)
|
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
|
collector = AsyncProfilerInfo.newCollector(
|
||||||
|
perfSampler = proc (): MetricsSummary = sample,
|
||||||
|
clock = proc (): Time = wallTime,
|
||||||
|
sampleInterval = times.initDuration(minutes = 5),
|
||||||
|
k = 3,
|
||||||
|
)
|
||||||
|
|
||||||
collector.reset()
|
collector.reset()
|
||||||
collector.collect()
|
collector.collect()
|
||||||
|
|
||||||
|
@ -75,3 +82,13 @@ suite "asyncprofiler metrics collector":
|
||||||
expect system.KeyError:
|
expect system.KeyError:
|
||||||
discard chronos_exec_time_total.value(
|
discard chronos_exec_time_total.value(
|
||||||
labelValues = @["query", "manager.nim", "323"])
|
labelValues = @["query", "manager.nim", "323"])
|
||||||
|
|
||||||
|
test "should not collect metrics again unless enough time has elapsed from last collection":
|
||||||
|
check collector.collections == 1
|
||||||
|
collector.collect()
|
||||||
|
check collector.collections == 1
|
||||||
|
|
||||||
|
wallTime += 6.minutes
|
||||||
|
|
||||||
|
collector.collect()
|
||||||
|
check collector.collections == 2
|
||||||
|
|
Loading…
Reference in New Issue