From 5d0c2018824f67cd0523981d224c945f8410dea0 Mon Sep 17 00:00:00 2001 From: gmega Date: Thu, 16 Nov 2023 12:30:46 -0300 Subject: [PATCH] refactor asyncprofiler to make it more testable --- codex/rest/api.nim | 30 ++--- codex/utils/asyncprofiler/asyncprofiler.nim | 114 ++++++++++-------- .../utils/asyncprofiler/metricscollector.nim | 4 +- 3 files changed, 77 insertions(+), 71 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 84e5261b..918d62e6 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -31,16 +31,11 @@ import pkg/codexdht/discv5/spr as spr import ../node import ../blocktype import ../conf -<<<<<<< HEAD import ../contracts import ../manifest import ../streams/asyncstreamwrapper import ../stores/blockstore -======= -import ../contracts except `%*`, `%` # imported from contracts/marketplace (exporting ethers) -import ../streams import ../utils/asyncprofiler ->>>>>>> f7c385f (add simple profiling API) import ./coders import ./json @@ -191,18 +186,6 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) = return RestApiResponse.response($json, contentType="application/json") - when chronosFuturesInstrumentation: - router.api( - MethodGet, - "/api/codex/v1/debug/performance") do () -> RestApiResponse: - # Returns profiling information, highest totalExecTime first - - without metrics =? sortBy(%(getFutureSummaryMetrics()), - "totalExecTime").catch, error: - return RestApiResponse.error(Http500, error.msg) - - RestApiResponse.response($(metrics), contentType="application/json") - proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = router.api( MethodGet, @@ -472,10 +455,23 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) = let json = %RestPeerRecord.init(peerRecord) trace "debug/peer returning peer record" return RestApiResponse.response($json) + except CatchableError as exc: trace "Excepting processing request", exc = exc.msg return RestApiResponse.error(Http500) + when chronosFuturesInstrumentation: + router.api( + MethodGet, + "/api/codex/v1/debug/performance") do () -> RestApiResponse: + # Returns profiling information, highest totalExecTime first + + without metrics =? sortBy(%(profiler.getFutureSummaryMetrics()), + "totalExecTime").catch, error: + return RestApiResponse.error(Http500, error.msg) + + RestApiResponse.response($(metrics), contentType="application/json") + proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = var router = RestRouter.init(validate) diff --git a/codex/utils/asyncprofiler/asyncprofiler.nim b/codex/utils/asyncprofiler/asyncprofiler.nim index 7fe380f6..2f4b05ff 100644 --- a/codex/utils/asyncprofiler/asyncprofiler.nim +++ b/codex/utils/asyncprofiler/asyncprofiler.nim @@ -8,6 +8,11 @@ import ../json export tables, options, hashes, timer, chronos, SrcLoc type + AsyncProfiler*[TFutureBase] = object + perFutureMetrics: PartialMetrics + futureSummaryMetrics: MetricsSummary + onChange: ChangeListener + FutureMetrics* = object ## Tracks timing information for a single future (typically an async ## proc). Created when a future starts, and discarded when a future ends. @@ -28,27 +33,29 @@ type maxSingleTime* {.serialize.}: Duration count* {.serialize.}: int64 + PartialMetrics* = Table[uint, FutureMetrics] + MetricsSummary* = Table[ptr SrcLoc, OverallMetrics] ChangeListener* = proc (): void {.raises: [].} var - perFutureMetrics {.threadvar.}: Table[uint, FutureMetrics] - futureSummaryMetrics {.threadvar.}: MetricsSummary - onChange {.threadvar.}: ChangeListener - # FIXME this is a HACK because it's currently not simple to disable - # the profiler on a thread-by-thread basis. - enabledForCurrentThread {.threadvar.}: bool + profiler* {.threadvar.}: AsyncProfiler[FutureBase] + enabled {.threadvar.}: bool -proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} = +proc getPerFutureMetrics*(self: AsyncProfiler): PartialMetrics {.gcsafe.} = + {.cast(gcsafe).}: + self.perFutureMetrics + +proc getFutureSummaryMetrics*(self: AsyncProfiler): MetricsSummary {.gcsafe.} = ## get a copy of the table of summary metrics for all futures. {.cast(gcsafe).}: - futureSummaryMetrics + self.futureSummaryMetrics -proc setChangeCallback*(callback: ChangeListener): void = +proc setChangeCallback*(self: var AsyncProfiler, callback: ChangeListener): void = ## Allows registration of a single callback which gets called whenever ## a the summary metric table changes. - onChange = callback + self.onChange = callback proc addRun(self: var OverallMetrics, run: FutureMetrics) = ## Adds metrics for a single run of a given async proc to its OverallMetrics. @@ -59,37 +66,35 @@ proc addRun(self: var OverallMetrics, run: FutureMetrics) = self.minSingleTime = min(self.minSingleTime, run.duration) self.maxSingleTime = max(self.maxSingleTime, run.duration) - if not isNil(onChange): - onChange() - -proc setFutureCreate(fut: FutureBase) {.raises: [].} = - ## used for setting the duration +proc handleFutureCreate*[TFutureBase](self: var AsyncProfiler[TFutureBase], + fut: TFutureBase) {.raises: [].} = {.cast(gcsafe).}: - perFutureMetrics[fut.id] = FutureMetrics() - perFutureMetrics.withValue(fut.id, metric): + self.perFutureMetrics[fut.id] = FutureMetrics() + self.perFutureMetrics.withValue(fut.id, metric): metric.created = Moment.now() -proc setFutureStart(fut: FutureBase) {.raises: [].} = - ## used for setting the duration +proc handleFutureStart*[TFutureBase](self: var AsyncProfiler[TFutureBase], + fut: TFutureBase) {.raises: [].} = {.cast(gcsafe).}: - assert perFutureMetrics.hasKey(fut.id) - perFutureMetrics.withValue(fut.id, metric): + assert self.perFutureMetrics.hasKey(fut.id) + self.perFutureMetrics.withValue(fut.id, metric): let ts = Moment.now() metric.start = some ts metric.blocks.inc() -proc setFuturePause(fut, child: FutureBase) {.raises: [].} = +proc handleFuturePause*[TFutureBase](self: var AsyncProfiler[TFutureBase], + fut, child: TFutureBase) {.raises: [].} = {.cast(gcsafe).}: - ## used for setting the duration let childLoc = if child.isNil: nil else: child.internalLocation[Create] var durationChildren = ZeroDuration var initDurationChildren = ZeroDuration if childLoc != nil: - perFutureMetrics.withValue(child.id, metric): + self.perFutureMetrics.withValue(child.id, metric): durationChildren = metric.duration initDurationChildren = metric.initDuration - assert perFutureMetrics.hasKey(fut.id) - perFutureMetrics.withValue(fut.id, metric): + + assert self.perFutureMetrics.hasKey(fut.id) + self.perFutureMetrics.withValue(fut.id, metric): if metric.start.isSome: let ts = Moment.now() metric.duration += ts - metric.start.get() @@ -103,45 +108,50 @@ proc setFuturePause(fut, child: FutureBase) {.raises: [].} = metric.durationChildren += durationChildren metric.start = none Moment -proc setFutureDuration(fut: FutureBase) {.raises: [].} = +proc handleFutureComplete*[TFutureBase](self: var AsyncProfiler[TFutureBase], + fut: TFutureBase) {.raises: [].} = {.cast(gcsafe).}: ## used for setting the duration let loc = fut.internalLocation[Create] # assert "set duration: " & $loc var runMetrics: FutureMetrics - perFutureMetrics.withValue(fut.id, metric): + self.perFutureMetrics.withValue(fut.id, metric): runMetrics = metric[] - discard futureSummaryMetrics.hasKeyOrPut(loc, + discard self.futureSummaryMetrics.hasKeyOrPut(loc, OverallMetrics(minSingleTime: InfiniteDuration)) - futureSummaryMetrics.withValue(loc, metric): + + self.futureSummaryMetrics.withValue(loc, metric): metric[].addRun(runMetrics) + if not isNil(self.onChange): + self.onChange() + +onFutureCreate = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + if enabled: + profiler.handleFutureCreate(f) + +onFutureRunning = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + if enabled: + profiler.handleFutureStart(f) + +onFuturePause = + proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = + if enabled: + profiler.handleFuturePause(f, child) + +onFutureStop = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + if enabled: + profiler.handleFuturePause(f, nil) + profiler.handleFutureComplete(f) + proc enableChronosProfiling* (): void = ## This gates for which chronos event loops we want to collect metrics. This ## method should be called for each event loop, before chronos is started. - enabledForCurrentThread = true - -onFutureCreate = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - if enabledForCurrentThread: - f.setFutureCreate() - -onFutureRunning = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - if enabledForCurrentThread: - f.setFutureStart() - -onFuturePause = - proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = - if enabledForCurrentThread: - f.setFuturePause(child) - -onFutureStop = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - if enabledForCurrentThread: - f.setFuturePause(nil) - f.setFutureDuration() + enabled = true when isMainModule: import std/unittest diff --git a/codex/utils/asyncprofiler/metricscollector.nim b/codex/utils/asyncprofiler/metricscollector.nim index 113f5acf..8daafb6b 100644 --- a/codex/utils/asyncprofiler/metricscollector.nim +++ b/codex/utils/asyncprofiler/metricscollector.nim @@ -203,11 +203,11 @@ when defined(metrics): "metricscolletor module." asyncProfilerInfo = AsyncProfilerInfo.newCollector( - perfSampler = getFutureSummaryMetrics, + perfSampler = proc (): MetricsSummary = profiler.getFutureSummaryMetrics(), k = k, # We want to collect metrics every 5 seconds. sampleInterval = initDuration(seconds = 5), clock = proc (): Time = getTime(), ) - setChangeCallback(proc (): void = asyncProfilerInfo.collect()) + profiler.setChangeCallback(proc (): void = asyncProfilerInfo.collect())