update branch to nim-chroprof
This commit is contained in:
parent
c70f8a11eb
commit
8635f925fe
|
@ -218,3 +218,8 @@
|
|||
[submodule "vendor/nim-zippy"]
|
||||
path = vendor/nim-zippy
|
||||
url = https://github.com/status-im/nim-zippy.git
|
||||
[submodule "vendor/nim-chroprof"]
|
||||
path = vendor/nim-chroprof
|
||||
url = https://github.com/codex-storage/nim-chroprof.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
|
|
|
@ -23,7 +23,6 @@ import ./codex/codex
|
|||
import ./codex/logutils
|
||||
import ./codex/units
|
||||
import ./codex/utils/keyutils
|
||||
import ./codex/utils/asyncprofiler
|
||||
import ./codex/codextypes
|
||||
|
||||
export codex, conf, libp2p, chronos, logutils
|
||||
|
@ -56,12 +55,6 @@ when isMainModule:
|
|||
config.setupLogging()
|
||||
config.setupMetrics()
|
||||
|
||||
# TODO this should not be here, but currently can't have it in setupMetrics
|
||||
# or we get a circular import.
|
||||
when chronosProfiling:
|
||||
enableProfiling()
|
||||
AsyncProfilerInfo.initDefault(k = config.profilerMaxMetrics)
|
||||
|
||||
if config.nat == ValidIpAddress.init(IPv4_any()):
|
||||
error "`--nat` cannot be set to the any (`0.0.0.0`) address"
|
||||
quit QuitFailure
|
||||
|
|
|
@ -18,6 +18,8 @@ import std/typetraits
|
|||
import pkg/chronos
|
||||
import pkg/chronicles/helpers
|
||||
import pkg/chronicles/topics_registry
|
||||
import pkg/chroprof
|
||||
import pkg/chroprof/collector
|
||||
import pkg/confutils/defs
|
||||
import pkg/confutils/std/net
|
||||
import pkg/toml_serialization
|
||||
|
@ -671,6 +673,9 @@ proc setupLogging*(conf: CodexConf) =
|
|||
quit QuitFailure
|
||||
|
||||
proc setupMetrics*(config: CodexConf) =
|
||||
when chronosProfiling:
|
||||
enableProfiling()
|
||||
|
||||
if config.metricsEnabled:
|
||||
let metricsAddress = config.metricsAddress
|
||||
notice "Starting metrics HTTP server",
|
||||
|
@ -681,3 +686,6 @@ proc setupMetrics*(config: CodexConf) =
|
|||
raiseAssert exc.msg
|
||||
except Exception as exc:
|
||||
raiseAssert exc.msg # TODO fix metrics
|
||||
|
||||
when chronosProfiling:
|
||||
enableProfilerMetrics(k = config.profilerMaxMetrics)
|
||||
|
|
|
@ -17,6 +17,7 @@ import std/sequtils
|
|||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
import pkg/chroprof
|
||||
import pkg/presto except toJson
|
||||
import pkg/metrics except toJson
|
||||
import pkg/stew/base10
|
||||
|
@ -35,7 +36,6 @@ import ../contracts
|
|||
import ../erasure/erasure
|
||||
import ../manifest
|
||||
import ../streams/asyncstreamwrapper
|
||||
import ../utils/asyncprofiler
|
||||
import ../stores
|
||||
import ../utils/options
|
||||
|
||||
|
@ -806,18 +806,6 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
|
|||
trace "Excepting processing request", exc = exc.msg
|
||||
return RestApiResponse.error(Http500, headers = headers)
|
||||
|
||||
when chronosProfiling:
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/debug/performance") do () -> RestApiResponse:
|
||||
# Returns profiling information, highest execTime first
|
||||
|
||||
without metrics =? sortBy(%(getMetrics().totals),
|
||||
"execTime").catch, error:
|
||||
return RestApiResponse.error(Http500, error.msg)
|
||||
|
||||
RestApiResponse.response($(metrics), contentType="application/json")
|
||||
|
||||
proc initRestApi*(
|
||||
node: CodexNodeRef,
|
||||
conf: CodexConf,
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
import ../conf
|
||||
|
||||
when chronosProfiling:
|
||||
import chronos/profiler
|
||||
|
||||
import ./asyncprofiler/serialization
|
||||
import ./asyncprofiler/metricscollector
|
||||
|
||||
export profiler, serialization, metricscollector
|
|
@ -1,187 +0,0 @@
|
|||
import std/algorithm
|
||||
import std/enumerate
|
||||
import std/sequtils
|
||||
import std/tables
|
||||
import std/times
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronos/profiler
|
||||
import pkg/metrics
|
||||
|
||||
when defined(metrics):
|
||||
type
|
||||
AsyncProfilerInfo* = ref object of RootObj
|
||||
perfSampler: PerfSampler
|
||||
sampleInterval: times.Duration
|
||||
clock: Clock
|
||||
k: int
|
||||
init: bool
|
||||
lastSample: Time
|
||||
collections*: uint
|
||||
|
||||
PerfSampler = proc (): MetricsTotals {.raises: [].}
|
||||
|
||||
Clock = proc (): Time {.raises: [].}
|
||||
|
||||
ProfilerMetric = (SrcLoc, AggregateFutureMetrics)
|
||||
|
||||
const locationLabels = ["proc", "file", "line"]
|
||||
|
||||
# Per-proc Metrics
|
||||
declarePublicGauge(
|
||||
chronos_exec_time_total,
|
||||
"total time in which this proc actively occupied the event loop thread",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_exec_time_with_children_total,
|
||||
"chronos_exec_time_with_children_total of this proc plus of all its children (procs" &
|
||||
"that this proc called and awaited for)",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_wall_time_total,
|
||||
"the amount of time elapsed from when the async proc was started to when" &
|
||||
"it completed",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
chronos_call_count_total,
|
||||
"the total number of times this async proc was called and completed",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
# Per-proc Statistics
|
||||
declarePublicGauge(
|
||||
chronos_single_exec_time_max,
|
||||
"the maximum execution time for a single call of this proc",
|
||||
labels = locationLabels,
|
||||
)
|
||||
|
||||
# Keeps track of the thread initializing the module. This is the only thread
|
||||
# that will be allowed to interact with the metrics collector.
|
||||
let moduleInitThread = getThreadId()
|
||||
|
||||
proc newCollector*(
|
||||
AsyncProfilerInfo: typedesc,
|
||||
perfSampler: PerfSampler,
|
||||
clock: Clock,
|
||||
sampleInterval: times.Duration,
|
||||
k: int = 10,
|
||||
): AsyncProfilerInfo = AsyncProfilerInfo(
|
||||
perfSampler: perfSampler,
|
||||
clock: clock,
|
||||
k: k,
|
||||
sampleInterval: sampleInterval,
|
||||
init: true,
|
||||
lastSample: low(Time),
|
||||
)
|
||||
|
||||
proc collectSlowestProcs(
|
||||
self: AsyncProfilerInfo,
|
||||
profilerMetrics: seq[ProfilerMetric],
|
||||
timestampMillis: int64,
|
||||
k: int,
|
||||
): void =
|
||||
|
||||
for (i, pair) in enumerate(profilerMetrics):
|
||||
if i == k:
|
||||
break
|
||||
|
||||
let (location, metrics) = pair
|
||||
|
||||
let locationLabels = @[
|
||||
$(location.procedure),
|
||||
$(location.file),
|
||||
$(location.line),
|
||||
]
|
||||
|
||||
chronos_exec_time_total.set(metrics.execTime.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_exec_time_with_children_total.set(
|
||||
metrics.execTimeWithChildren.nanoseconds,
|
||||
labelValues = locationLabels
|
||||
)
|
||||
|
||||
chronos_wall_time_total.set(metrics.wallClockTime.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_single_exec_time_max.set(metrics.execTimeMax.nanoseconds,
|
||||
labelValues = locationLabels)
|
||||
|
||||
chronos_call_count_total.set(metrics.callCount.int64,
|
||||
labelValues = locationLabels)
|
||||
|
||||
proc collect*(self: AsyncProfilerInfo, force: bool = false): void =
|
||||
# Calling this method from the wrong thread has happened a lot in the past,
|
||||
# so this makes sure we're not doing anything funny.
|
||||
assert getThreadId() == moduleInitThread, "You cannot call collect() from" &
|
||||
" a thread other than the one that initialized the metricscolletor module"
|
||||
|
||||
let now = self.clock()
|
||||
if not force and (now - self.lastSample < self.sampleInterval):
|
||||
return
|
||||
|
||||
self.collections += 1
|
||||
var currentMetrics = self.
|
||||
perfSampler().
|
||||
pairs.
|
||||
toSeq.
|
||||
# We don't scoop metrics with 0 exec time as we have a limited number of
|
||||
# prometheus slots, and those are less likely to be useful in debugging
|
||||
# Chronos performance issues.
|
||||
filter(
|
||||
proc (pair: ProfilerMetric): bool =
|
||||
pair[1].execTimeWithChildren.nanoseconds > 0
|
||||
).
|
||||
sorted(
|
||||
proc (a, b: ProfilerMetric): int =
|
||||
cmp(a[1].execTimeWithChildren, b[1].execTimeWithChildren),
|
||||
order = SortOrder.Descending
|
||||
)
|
||||
|
||||
self.collectSlowestProcs(currentMetrics, now.toMilliseconds(), self.k)
|
||||
|
||||
self.lastSample = now
|
||||
|
||||
proc resetMetric(gauge: Gauge): void =
|
||||
# We try to be as conservative as possible and not write directly to
|
||||
# internal state. We do need to read from it, though.
|
||||
for labelValues in gauge.metrics.keys:
|
||||
gauge.set(0.int64, labelValues = labelValues)
|
||||
|
||||
proc reset*(self: AsyncProfilerInfo): void =
|
||||
resetMetric(chronos_exec_time_total)
|
||||
resetMetric(chronos_exec_time_with_children_total)
|
||||
resetMetric(chronos_wall_time_total)
|
||||
resetMetric(chronos_call_count_total)
|
||||
resetMetric(chronos_single_exec_time_max)
|
||||
|
||||
var asyncProfilerInfo* {.global.}: AsyncProfilerInfo
|
||||
var wrappedEventHandler {.global.}: proc (e: Event) {.nimcall, gcsafe, raises: [].}
|
||||
|
||||
proc initDefault*(AsyncProfilerInfo: typedesc, k: int) =
|
||||
assert getThreadId() == moduleInitThread, "You cannot call " &
|
||||
"initDefault() from a thread other than the one that initialized the " &
|
||||
"metricscolletor module."
|
||||
|
||||
asyncProfilerInfo = AsyncProfilerInfo.newCollector(
|
||||
perfSampler = proc (): MetricsTotals = getMetrics().totals,
|
||||
k = k,
|
||||
# We want to collect metrics every 5 seconds.
|
||||
sampleInterval = initDuration(seconds = 5),
|
||||
clock = proc (): Time = getTime(),
|
||||
)
|
||||
|
||||
wrappedEventHandler = handleFutureEvent
|
||||
handleFutureEvent = proc (e: Event) {.nimcall, gcsafe.} =
|
||||
{.cast(gcsafe).}:
|
||||
wrappedEventHandler(e)
|
||||
|
||||
if e.newState == ExtendedFutureState.Completed:
|
||||
asyncProfilerInfo.collect()
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
## Utilities for serializing profiler metrics.
|
||||
import std/algorithm
|
||||
import std/json
|
||||
|
||||
import chronos/profiler
|
||||
|
||||
proc `%`*(o: Duration): JsonNode =
|
||||
%(o.nanoseconds)
|
||||
|
||||
proc `%`*(o: cstring): JsonNode =
|
||||
%($(o))
|
||||
|
||||
proc toJson*(o: Table[SrcLoc, AggregateFutureMetrics]): JsonNode =
|
||||
var rows = newJArray()
|
||||
for (location, metric) in o.pairs:
|
||||
var row = %(metric)
|
||||
row["location"] = %(location)
|
||||
rows.add(row)
|
||||
|
||||
rows
|
||||
|
||||
proc `%`*(o: Table[SrcLoc, AggregateFutureMetrics]): JsonNode = o.toJson()
|
||||
|
||||
proc sortBy*(jArray: JsonNode, metric: string): JsonNode {.raises: [ref KeyError].} =
|
||||
%(jArray.getElems.sorted(
|
||||
proc (a, b: JsonNode): int {.raises: [ref KeyError].} =
|
||||
cmp(a[metric].getInt, b[metric].getInt),
|
||||
order = SortOrder.Descending))
|
|
@ -5,6 +5,5 @@ import ./utils/testasynciter
|
|||
import ./utils/testtimer
|
||||
import ./utils/testthen
|
||||
import ./utils/testtrackedfutures
|
||||
import ./utils/testasyncprofiler
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
import std/times
|
||||
import std/unittest
|
||||
|
||||
import pkg/chronos/profiler
|
||||
import pkg/metrics
|
||||
|
||||
import codex/utils/asyncprofiler/metricscollector
|
||||
|
||||
suite "asyncprofiler metrics collector":
|
||||
|
||||
var locations = @[
|
||||
SrcLoc(procedure: "start", file: "discovery.nim", line: 174),
|
||||
SrcLoc(procedure: "start", file: "discovery.nim", line: 192),
|
||||
SrcLoc(procedure: "query", file: "manager.nim", line: 323),
|
||||
SrcLoc(procedure: "update", file: "sqliteds.nim", line: 107),
|
||||
SrcLoc(procedure: "idle", file: "idle.nim", line: 100),
|
||||
]
|
||||
|
||||
let sample = {
|
||||
locations[0]: AggregateFutureMetrics(
|
||||
execTime: timer.nanoseconds(90062),
|
||||
execTimeMax: timer.nanoseconds(80062),
|
||||
childrenExecTime: timer.nanoseconds(52044),
|
||||
wallClockTime: timer.nanoseconds(174567),
|
||||
callCount: 1
|
||||
),
|
||||
locations[1]: AggregateFutureMetrics(
|
||||
execTime: timer.nanoseconds(91660),
|
||||
execTimeMax: timer.nanoseconds(81660),
|
||||
childrenExecTime: timer.nanoseconds(52495),
|
||||
wallClockTime: timer.nanoseconds(72941),
|
||||
callCount: 1
|
||||
),
|
||||
locations[2]: AggregateFutureMetrics(
|
||||
execTime: timer.nanoseconds(60529),
|
||||
execTimeMax: timer.nanoseconds(60529),
|
||||
childrenExecTime: timer.nanoseconds(9689),
|
||||
wallClockTime: timer.nanoseconds(60784),
|
||||
callCount: 1
|
||||
),
|
||||
locations[3]: AggregateFutureMetrics(
|
||||
execTime: timer.nanoseconds(60645),
|
||||
execTimeMax: timer.nanoseconds(41257),
|
||||
childrenExecTime: timer.nanoseconds(72934),
|
||||
wallClockTime: timer.nanoseconds(60813),
|
||||
callCount: 3
|
||||
),
|
||||
locations[4]: AggregateFutureMetrics(
|
||||
execTime: timer.nanoseconds(0),
|
||||
execTimeMax: timer.nanoseconds(0),
|
||||
childrenExecTime: timer.nanoseconds(0),
|
||||
wallClockTime: timer.nanoseconds(60813),
|
||||
callCount: 3
|
||||
)
|
||||
}.toTable
|
||||
|
||||
var wallTime = getTime()
|
||||
|
||||
var collector: AsyncProfilerInfo
|
||||
|
||||
proc setupCollector(k: int = high(int)): void =
|
||||
collector = AsyncProfilerInfo.newCollector(
|
||||
perfSampler = proc (): MetricsTotals = sample,
|
||||
clock = proc (): Time = wallTime,
|
||||
sampleInterval = times.initDuration(minutes = 5),
|
||||
k = k,
|
||||
)
|
||||
|
||||
collector.reset()
|
||||
collector.collect()
|
||||
|
||||
test "should create labeled series for the k slowest procs in terms of execTime":
|
||||
setupCollector(k = 3)
|
||||
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValues = @["start", "discovery.nim", "192"]) == 144155
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValues = @["start", "discovery.nim", "174"]) == 142106
|
||||
check chronos_exec_time_with_children_total.value(
|
||||
labelValues = @["update", "sqliteds.nim", "107"]) == 133579
|
||||
|
||||
# This is out of the top-k slowest, so should not have been recorded.
|
||||
expect system.KeyError:
|
||||
discard chronos_exec_time_with_children_total.value(
|
||||
labelValues = @["query", "manager.nim", "323"])
|
||||
|
||||
test "should not collect metrics again unless enough time has elapsed from last collection":
|
||||
setupCollector()
|
||||
|
||||
check collector.collections == 1
|
||||
collector.collect()
|
||||
check collector.collections == 1
|
||||
|
||||
wallTime += 6.minutes
|
||||
|
||||
collector.collect()
|
||||
check collector.collections == 2
|
||||
|
||||
test "should not collect metrics for futures with zero total exec time":
|
||||
setupCollector()
|
||||
|
||||
expect system.KeyError:
|
||||
discard chronos_exec_time_total.value(
|
||||
labelValues = @["idle", "idle.nim", "100"])
|
|
@ -1,92 +0,0 @@
|
|||
import std/sequtils
|
||||
import std/tables
|
||||
import std/unittest
|
||||
import std/json
|
||||
|
||||
import chronos/profiler
|
||||
|
||||
import codex/utils/asyncprofiler/serialization
|
||||
|
||||
suite "asyncprofiler metrics serializer":
|
||||
|
||||
let fooLoc = SrcLoc(
|
||||
procedure: "foo",
|
||||
file: "foo.nim",
|
||||
line: 1
|
||||
)
|
||||
|
||||
let fooMetric = AggregateFutureMetrics(
|
||||
execTime: 2.seconds,
|
||||
wallClockTime: 2.seconds,
|
||||
childrenExecTime: 10.seconds,
|
||||
execTimeMax: 1500.milliseconds,
|
||||
zombieEventCount: 0,
|
||||
stillbornCount: 0,
|
||||
callCount: 10
|
||||
)
|
||||
|
||||
test "should serialize AggregateFutureMetrics":
|
||||
check %fooMetric == %*{
|
||||
"execTime": 2000000000,
|
||||
"wallClockTime": 2000000000,
|
||||
"childrenExecTime": 10000000000,
|
||||
"execTimeMax": 1500000000,
|
||||
"zombieEventCount": 0,
|
||||
"stillbornCount": 0,
|
||||
"callCount": 10
|
||||
}
|
||||
|
||||
test "should serialize SrcLoc":
|
||||
check %fooLoc == %*{
|
||||
"procedure": "foo",
|
||||
"file": "foo.nim",
|
||||
"line": 1
|
||||
}
|
||||
|
||||
test "should serialize MetricsTotals":
|
||||
var summary: MetricsTotals = {
|
||||
fooLoc: fooMetric
|
||||
}.toTable
|
||||
|
||||
check %summary == %*[%*{
|
||||
"location": %*{
|
||||
"procedure": "foo",
|
||||
"file": "foo.nim",
|
||||
"line": 1,
|
||||
},
|
||||
"execTime": 2000000000,
|
||||
"wallClockTime": 2000000000,
|
||||
"childrenExecTime": 10000000000,
|
||||
"execTimeMax": 1500000000,
|
||||
"zombieEventCount": 0,
|
||||
"stillbornCount": 0,
|
||||
"callCount": 10
|
||||
}]
|
||||
|
||||
test "should sort MetricsSummary by the required key":
|
||||
let barLoc = SrcLoc(
|
||||
procedure: "bar",
|
||||
file: "bar.nim",
|
||||
line: 1
|
||||
)
|
||||
|
||||
var barMetrics = AggregateFutureMetrics(
|
||||
execTime: 3.seconds,
|
||||
wallClockTime: 1.seconds,
|
||||
execTimeMax: 1500.milliseconds,
|
||||
childrenExecTime: 1.seconds,
|
||||
zombieEventCount: 0,
|
||||
stillbornCount: 0,
|
||||
callCount: 5
|
||||
)
|
||||
|
||||
var summary: Table[SrcLoc, AggregateFutureMetrics] = {
|
||||
fooLoc: fooMetric,
|
||||
barLoc: barMetrics
|
||||
}.toTable
|
||||
|
||||
check (%summary).sortBy("execTime").getElems.map(
|
||||
proc (x: JsonNode): string = x["location"]["procedure"].getStr) == @["bar", "foo"]
|
||||
|
||||
check (%summary).sortBy("callCount").getElems.map(
|
||||
proc (x: JsonNode): string = x["location"]["procedure"].getStr) == @["foo", "bar"]
|
|
@ -1,4 +0,0 @@
|
|||
import ./asyncprofiler/testserialization
|
||||
import ./asyncprofiler/testmetricscollector
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
|
@ -0,0 +1 @@
|
|||
Subproject commit 95da0353d276f688926926d796439b24874ccf44
|
|
@ -1 +1 @@
|
|||
Subproject commit 6142e433fc8ea9b73379770a788017ac528d46ff
|
||||
Subproject commit cacfdc12454a0804c65112b9f4f50d1375208dcd
|
Loading…
Reference in New Issue