refactor asyncprofiler to make it more testable

This commit is contained in:
gmega 2023-11-16 12:30:46 -03:00
parent 1821a47261
commit 5d0c201882
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
3 changed files with 77 additions and 71 deletions

View File

@ -31,16 +31,11 @@ import pkg/codexdht/discv5/spr as spr
import ../node
import ../blocktype
import ../conf
<<<<<<< HEAD
import ../contracts
import ../manifest
import ../streams/asyncstreamwrapper
import ../stores/blockstore
=======
import ../contracts except `%*`, `%` # imported from contracts/marketplace (exporting ethers)
import ../streams
import ../utils/asyncprofiler
>>>>>>> f7c385f (add simple profiling API)
import ./coders
import ./json
@ -191,18 +186,6 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
return RestApiResponse.response($json, contentType="application/json")
when chronosFuturesInstrumentation:
router.api(
MethodGet,
"/api/codex/v1/debug/performance") do () -> RestApiResponse:
# Returns profiling information, highest totalExecTime first
without metrics =? sortBy(%(getFutureSummaryMetrics()),
"totalExecTime").catch, error:
return RestApiResponse.error(Http500, error.msg)
RestApiResponse.response($(metrics), contentType="application/json")
proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
router.api(
MethodGet,
@ -472,10 +455,23 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
let json = %RestPeerRecord.init(peerRecord)
trace "debug/peer returning peer record"
return RestApiResponse.response($json)
except CatchableError as exc:
trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500)
when chronosFuturesInstrumentation:
router.api(
MethodGet,
"/api/codex/v1/debug/performance") do () -> RestApiResponse:
# Returns profiling information, highest totalExecTime first
without metrics =? sortBy(%(profiler.getFutureSummaryMetrics()),
"totalExecTime").catch, error:
return RestApiResponse.error(Http500, error.msg)
RestApiResponse.response($(metrics), contentType="application/json")
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
var router = RestRouter.init(validate)

View File

@ -8,6 +8,11 @@ import ../json
export tables, options, hashes, timer, chronos, SrcLoc
type
AsyncProfiler*[TFutureBase] = object
perFutureMetrics: PartialMetrics
futureSummaryMetrics: MetricsSummary
onChange: ChangeListener
FutureMetrics* = object
## Tracks timing information for a single future (typically an async
## proc). Created when a future starts, and discarded when a future ends.
@ -28,27 +33,29 @@ type
maxSingleTime* {.serialize.}: Duration
count* {.serialize.}: int64
PartialMetrics* = Table[uint, FutureMetrics]
MetricsSummary* = Table[ptr SrcLoc, OverallMetrics]
ChangeListener* = proc (): void {.raises: [].}
var
perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics]
futureSummaryMetrics {.threadvar.}: MetricsSummary
onChange {.threadvar.}: ChangeListener
# FIXME this is a HACK because it's currently not simple to disable
# the profiler on a thread-by-thread basis.
enabledForCurrentThread {.threadvar.}: bool
profiler* {.threadvar.}: AsyncProfiler[FutureBase]
enabled {.threadvar.}: bool
proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} =
proc getPerFutureMetrics*(self: AsyncProfiler): PartialMetrics {.gcsafe.} =
{.cast(gcsafe).}:
self.perFutureMetrics
proc getFutureSummaryMetrics*(self: AsyncProfiler): MetricsSummary {.gcsafe.} =
## get a copy of the table of summary metrics for all futures.
{.cast(gcsafe).}:
futureSummaryMetrics
self.futureSummaryMetrics
proc setChangeCallback*(callback: ChangeListener): void =
proc setChangeCallback*(self: var AsyncProfiler, callback: ChangeListener): void =
## Allows registration of a single callback which gets called whenever
## a the summary metric table changes.
onChange = callback
self.onChange = callback
proc addRun(self: var OverallMetrics, run: FutureMetrics) =
## Adds metrics for a single run of a given async proc to its OverallMetrics.
@ -59,37 +66,35 @@ proc addRun(self: var OverallMetrics, run: FutureMetrics) =
self.minSingleTime = min(self.minSingleTime, run.duration)
self.maxSingleTime = max(self.maxSingleTime, run.duration)
if not isNil(onChange):
onChange()
proc setFutureCreate(fut: FutureBase) {.raises: [].} =
## used for setting the duration
proc handleFutureCreate*[TFutureBase](self: var AsyncProfiler[TFutureBase],
fut: TFutureBase) {.raises: [].} =
{.cast(gcsafe).}:
perFutureMetrics[fut.id] = FutureMetrics()
perFutureMetrics.withValue(fut.id, metric):
self.perFutureMetrics[fut.id] = FutureMetrics()
self.perFutureMetrics.withValue(fut.id, metric):
metric.created = Moment.now()
proc setFutureStart(fut: FutureBase) {.raises: [].} =
## used for setting the duration
proc handleFutureStart*[TFutureBase](self: var AsyncProfiler[TFutureBase],
fut: TFutureBase) {.raises: [].} =
{.cast(gcsafe).}:
assert perFutureMetrics.hasKey(fut.id)
perFutureMetrics.withValue(fut.id, metric):
assert self.perFutureMetrics.hasKey(fut.id)
self.perFutureMetrics.withValue(fut.id, metric):
let ts = Moment.now()
metric.start = some ts
metric.blocks.inc()
proc setFuturePause(fut, child: FutureBase) {.raises: [].} =
proc handleFuturePause*[TFutureBase](self: var AsyncProfiler[TFutureBase],
fut, child: TFutureBase) {.raises: [].} =
{.cast(gcsafe).}:
## used for setting the duration
let childLoc = if child.isNil: nil else: child.internalLocation[Create]
var durationChildren = ZeroDuration
var initDurationChildren = ZeroDuration
if childLoc != nil:
perFutureMetrics.withValue(child.id, metric):
self.perFutureMetrics.withValue(child.id, metric):
durationChildren = metric.duration
initDurationChildren = metric.initDuration
assert perFutureMetrics.hasKey(fut.id)
perFutureMetrics.withValue(fut.id, metric):
assert self.perFutureMetrics.hasKey(fut.id)
self.perFutureMetrics.withValue(fut.id, metric):
if metric.start.isSome:
let ts = Moment.now()
metric.duration += ts - metric.start.get()
@ -103,45 +108,50 @@ proc setFuturePause(fut, child: FutureBase) {.raises: [].} =
metric.durationChildren += durationChildren
metric.start = none Moment
proc setFutureDuration(fut: FutureBase) {.raises: [].} =
proc handleFutureComplete*[TFutureBase](self: var AsyncProfiler[TFutureBase],
fut: TFutureBase) {.raises: [].} =
{.cast(gcsafe).}:
## used for setting the duration
let loc = fut.internalLocation[Create]
# assert "set duration: " & $loc
var runMetrics: FutureMetrics
perFutureMetrics.withValue(fut.id, metric):
self.perFutureMetrics.withValue(fut.id, metric):
runMetrics = metric[]
discard futureSummaryMetrics.hasKeyOrPut(loc,
discard self.futureSummaryMetrics.hasKeyOrPut(loc,
OverallMetrics(minSingleTime: InfiniteDuration))
futureSummaryMetrics.withValue(loc, metric):
self.futureSummaryMetrics.withValue(loc, metric):
metric[].addRun(runMetrics)
if not isNil(self.onChange):
self.onChange()
onFutureCreate =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabled:
profiler.handleFutureCreate(f)
onFutureRunning =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabled:
profiler.handleFutureStart(f)
onFuturePause =
proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabled:
profiler.handleFuturePause(f, child)
onFutureStop =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabled:
profiler.handleFuturePause(f, nil)
profiler.handleFutureComplete(f)
proc enableChronosProfiling* (): void =
## This gates for which chronos event loops we want to collect metrics. This
## method should be called for each event loop, before chronos is started.
enabledForCurrentThread = true
onFutureCreate =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFutureCreate()
onFutureRunning =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFutureStart()
onFuturePause =
proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFuturePause(child)
onFutureStop =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
if enabledForCurrentThread:
f.setFuturePause(nil)
f.setFutureDuration()
enabled = true
when isMainModule:
import std/unittest

View File

@ -203,11 +203,11 @@ when defined(metrics):
"metricscolletor module."
asyncProfilerInfo = AsyncProfilerInfo.newCollector(
perfSampler = getFutureSummaryMetrics,
perfSampler = proc (): MetricsSummary = profiler.getFutureSummaryMetrics(),
k = k,
# We want to collect metrics every 5 seconds.
sampleInterval = initDuration(seconds = 5),
clock = proc (): Time = getTime(),
)
setChangeCallback(proc (): void = asyncProfilerInfo.collect())
profiler.setChangeCallback(proc (): void = asyncProfilerInfo.collect())