From 91d186b717859e55053b3b8482302cea8767be82 Mon Sep 17 00:00:00 2001 From: gmega Date: Thu, 2 Nov 2023 12:07:15 -0300 Subject: [PATCH] WiP --- .gitignore | 1 + .gitmodules | 2 +- codex/codex.nim | 4 + codex/conf.nim | 1 + codex/utils/asyncprofiler.nim | 168 +---------------- codex/utils/asyncprofiler/asyncprofiler.nim | 169 ++++++++++++++++++ codex/utils/asyncprofiler/utils.nim | 19 ++ tests/codex/utils/asyncprofiler/testutils.nim | 60 +++++++ 8 files changed, 260 insertions(+), 164 deletions(-) create mode 100644 codex/utils/asyncprofiler/asyncprofiler.nim create mode 100644 codex/utils/asyncprofiler/utils.nim create mode 100644 tests/codex/utils/asyncprofiler/testutils.nim diff --git a/.gitignore b/.gitignore index c85aa931..e780e705 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ nimble.paths # vscode .vscode +.history # JetBrain's IDEs .idea diff --git a/.gitmodules b/.gitmodules index 1f6994f7..562d2bb4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -77,7 +77,7 @@ path = vendor/nim-chronos url = https://github.com/codex-storage/nim-chronos.git ignore = untracked - branch = master + branch = future-instrumentation-codex [submodule "vendor/nim-faststreams"] path = vendor/nim-faststreams url = https://github.com/status-im/nim-faststreams.git diff --git a/codex/codex.nim b/codex/codex.nim index 4fc0f922..7ba4ebe2 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -265,6 +265,10 @@ proc new*( store = NetworkStore.new(engine, repoStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery) + + trace "create everything but rest server" + + let restServer = RestServerRef.new( codexNode.initRestApi(config), initTAddress(config.apiBindAddress , config.apiPort), diff --git a/codex/conf.nim b/codex/conf.nim index 9dc3cb8d..2c0e75d5 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -46,6 +46,7 @@ const codex_enable_api_debug_peers* {.booldefine.} = false codex_enable_proof_failures* {.booldefine.} = false codex_enable_log_counter* {.booldefine.} = false + chronosFuturesInstrumentation* {.booldefine.} = false type StartUpCommand* {.pure.} = enum diff --git a/codex/utils/asyncprofiler.nim b/codex/utils/asyncprofiler.nim index 832392c6..94d14cc8 100644 --- a/codex/utils/asyncprofiler.nim +++ b/codex/utils/asyncprofiler.nim @@ -1,164 +1,6 @@ +import ../conf -import std/[tables, macros, options, hashes] -import pkg/chronos -import pkg/chronos/timer - - -type - FutureMetric = object - ## Holds average timing information for a given closure - closureLoc*: ptr SrcLoc - created*: Moment - start*: Option[Moment] - duration*: Duration - blocks*: int - initDuration*: Duration - durationChildren*: Duration - - CallbackMetric = object - totalExecTime*: Duration - totalRunTime*: Duration - totalWallTime*: Duration - minSingleTime*: Duration - maxSingleTime*: Duration - count*: int64 - -var - perFutureMetrics: Table[uint, FutureMetric] - futureSummaryMetrics: Table[ptr SrcLoc, CallbackMetric] - -proc getFutureSummaryMetrics*(): Table[ptr SrcLoc, CallbackMetric] {.gcsafe.} = - ## get a copy of the table of summary metrics for all futures - {.cast(gcsafe).}: - futureSummaryMetrics - -proc setFutureCreate(fut: FutureBase) {.raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - perFutureMetrics[fut.id] = FutureMetric() - perFutureMetrics.withValue(fut.id, metric): - metric.created = Moment.now() - # echo loc, "; future create " - -proc setFutureStart(fut: FutureBase) {.raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - assert perFutureMetrics.hasKey(fut.id) - perFutureMetrics.withValue(fut.id, metric): - let ts = Moment.now() - metric.start = some ts - metric.blocks.inc() - # echo loc, "; future start: ", metric.initDuration - -proc setFuturePause(fut, child: FutureBase) {.raises: [].} = - {.cast(gcsafe).}: - ## used for setting the duration - let loc = fut.internalLocation[Create] - let childLoc = if child.isNil: nil else: child.internalLocation[Create] - var durationChildren = ZeroDuration - var initDurationChildren = ZeroDuration - if childLoc != nil: - perFutureMetrics.withValue(child.id, metric): - durationChildren = metric.duration - initDurationChildren = metric.initDuration - assert perFutureMetrics.hasKey(fut.id) - perFutureMetrics.withValue(fut.id, metric): - if metric.start.isSome: - let ts = Moment.now() - metric.duration += ts - metric.start.get() - metric.duration -= initDurationChildren - if metric.blocks == 1: - metric.initDuration = ts - metric.created # tricky, - # the first block of a child iterator also - # runs on the parents clock, so we track our first block - # time so any parents can get it - # echo loc, "; child firstBlock time: ", initDurationChildren - - metric.durationChildren += durationChildren - metric.start = none Moment - # echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc - -proc setFutureDuration(fut: FutureBase) {.raises: [].} = - {.cast(gcsafe).}: - ## used for setting the duration - let loc = fut.internalLocation[Create] - # assert "set duration: " & $loc - var fm: FutureMetric - # assert perFutureMetrics.pop(fut.id, fm) - perFutureMetrics.withValue(fut.id, metric): - fm = metric[] - - discard futureSummaryMetrics.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration)) - futureSummaryMetrics.withValue(loc, metric): - # echo loc, " set duration: ", futureSummaryMetrics.hasKey(loc) - metric.totalExecTime += fm.duration - metric.totalWallTime += Moment.now() - fm.created - metric.totalRunTime += metric.totalExecTime + fm.durationChildren - # echo loc, " child duration: ", fm.durationChildren - metric.count.inc - metric.minSingleTime = min(metric.minSingleTime, fm.duration) - metric.maxSingleTime = max(metric.maxSingleTime, fm.duration) - # handle overflow - if metric.count == metric.count.typeof.high: - metric.totalExecTime = ZeroDuration - metric.count = 0 - -onFutureCreate = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureCreate() -onFutureRunning = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureStart() -onFuturePause = - proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = - # echo "onFuturePause: ", f.pointer.repr, " ch: ", child.pointer.repr - f.setFuturePause(child) -onFutureStop = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFuturePause(nil) - f.setFutureDuration() - -when isMainModule: - import std/unittest - import std/os - - suite "async profiling": - test "basic profiling": - proc simpleAsyncChild() {.async.} = - echo "child sleep..." - os.sleep(25) - - proc simpleAsync1() {.async.} = - for i in 0..1: - await sleepAsync(40.milliseconds) - await simpleAsyncChild() - echo "sleep..." - os.sleep(50) - - waitFor(simpleAsync1()) - - let metrics = futureSummaryMetrics - echo "\n=== metrics ===" - echo "execTime:\ttime to execute non-async portions of async proc" - echo "runTime:\texecution time + execution time of children" - echo "wallTime:\twall time elapsed for future's lifetime" - for (k,v) in metrics.pairs(): - let count = v.count - if count > 0: - echo "" - echo "metric: ", $k - echo "count: ", count - echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime - echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime - echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime - if k.procedure == "simpleAsync1": - echo "v: ", v - check v.totalExecTime >= 100.milliseconds() - check v.totalExecTime <= 180.milliseconds() - - check v.totalRunTime >= 150.milliseconds() - check v.totalRunTime <= 240.milliseconds() - discard - echo "" +when chronosFuturesInstrumentation: + import ./asyncprofiler/asyncprofiler + import ./asyncprofiler/utils + export asyncprofiler, utils diff --git a/codex/utils/asyncprofiler/asyncprofiler.nim b/codex/utils/asyncprofiler/asyncprofiler.nim new file mode 100644 index 00000000..0b1478b6 --- /dev/null +++ b/codex/utils/asyncprofiler/asyncprofiler.nim @@ -0,0 +1,169 @@ + +import std/[tables, macros, options, hashes] +import pkg/chronos +import pkg/chronos/timer + +import ../json + +export tables, options, hashes, timer, chronos, SrcLoc + +type + FutureMetric* = object + ## Holds average timing information for a given closure + closureLoc*: ptr SrcLoc + created*: Moment + start*: Option[Moment] + duration*: Duration + blocks*: int + initDuration*: Duration + durationChildren*: Duration + + CallbackMetric* = object + totalExecTime* {.serialize.}: Duration + totalRunTime* {.serialize.}: Duration + totalWallTime* {.serialize.}: Duration + minSingleTime* {.serialize.}: Duration + maxSingleTime* {.serialize.}: Duration + count* {.serialize.}: int64 + + MetricsSummary* = Table[ptr SrcLoc, CallbackMetric] + +var + perFutureMetrics: Table[uint, FutureMetric] + futureSummaryMetrics: MetricsSummary + +proc getFutureSummaryMetrics*(): MetricsSummary {.gcsafe.} = + ## get a copy of the table of summary metrics for all futures + {.cast(gcsafe).}: + futureSummaryMetrics + +proc setFutureCreate(fut: FutureBase) {.raises: [].} = + ## used for setting the duration + {.cast(gcsafe).}: + let loc = fut.internalLocation[Create] + perFutureMetrics[fut.id] = FutureMetric() + perFutureMetrics.withValue(fut.id, metric): + metric.created = Moment.now() + # echo loc, "; future create " + +proc setFutureStart(fut: FutureBase) {.raises: [].} = + ## used for setting the duration + {.cast(gcsafe).}: + let loc = fut.internalLocation[Create] + assert perFutureMetrics.hasKey(fut.id) + perFutureMetrics.withValue(fut.id, metric): + let ts = Moment.now() + metric.start = some ts + metric.blocks.inc() + # echo loc, "; future start: ", metric.initDuration + +proc setFuturePause(fut, child: FutureBase) {.raises: [].} = + {.cast(gcsafe).}: + ## used for setting the duration + let loc = fut.internalLocation[Create] + let childLoc = if child.isNil: nil else: child.internalLocation[Create] + var durationChildren = ZeroDuration + var initDurationChildren = ZeroDuration + if childLoc != nil: + perFutureMetrics.withValue(child.id, metric): + durationChildren = metric.duration + initDurationChildren = metric.initDuration + assert perFutureMetrics.hasKey(fut.id) + perFutureMetrics.withValue(fut.id, metric): + if metric.start.isSome: + let ts = Moment.now() + metric.duration += ts - metric.start.get() + metric.duration -= initDurationChildren + if metric.blocks == 1: + metric.initDuration = ts - metric.created # tricky, + # the first block of a child iterator also + # runs on the parents clock, so we track our first block + # time so any parents can get it + # echo loc, "; child firstBlock time: ", initDurationChildren + + metric.durationChildren += durationChildren + metric.start = none Moment + # echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc + +proc setFutureDuration(fut: FutureBase) {.raises: [].} = + {.cast(gcsafe).}: + ## used for setting the duration + let loc = fut.internalLocation[Create] + # assert "set duration: " & $loc + var fm: FutureMetric + # assert perFutureMetrics.pop(fut.id, fm) + perFutureMetrics.withValue(fut.id, metric): + fm = metric[] + + discard futureSummaryMetrics.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration)) + futureSummaryMetrics.withValue(loc, metric): + # echo loc, " set duration: ", futureSummaryMetrics.hasKey(loc) + metric.totalExecTime += fm.duration + metric.totalWallTime += Moment.now() - fm.created + metric.totalRunTime += metric.totalExecTime + fm.durationChildren + # echo loc, " child duration: ", fm.durationChildren + metric.count.inc + metric.minSingleTime = min(metric.minSingleTime, fm.duration) + metric.maxSingleTime = max(metric.maxSingleTime, fm.duration) + # handle overflow + if metric.count == metric.count.typeof.high: + metric.totalExecTime = ZeroDuration + metric.count = 0 + +onFutureCreate = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + f.setFutureCreate() +onFutureRunning = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + f.setFutureStart() +onFuturePause = + proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = + # echo "onFuturePause: ", f.pointer.repr, " ch: ", child.pointer.repr + f.setFuturePause(child) +onFutureStop = + proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = + f.setFuturePause(nil) + f.setFutureDuration() + +when isMainModule: + import std/unittest + import std/os + + suite "async profiling": + test "basic profiling": + proc simpleAsyncChild() {.async.} = + echo "child sleep..." + os.sleep(25) + + proc simpleAsync1() {.async.} = + for i in 0..1: + await sleepAsync(40.milliseconds) + await simpleAsyncChild() + echo "sleep..." + os.sleep(50) + + waitFor(simpleAsync1()) + + let metrics = futureSummaryMetrics + echo "\n=== metrics ===" + echo "execTime:\ttime to execute non-async portions of async proc" + echo "runTime:\texecution time + execution time of children" + echo "wallTime:\twall time elapsed for future's lifetime" + for (k,v) in metrics.pairs(): + let count = v.count + if count > 0: + echo "" + echo "metric: ", $k + echo "count: ", count + echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime + echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime + echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime + if k.procedure == "simpleAsync1": + echo "v: ", v + check v.totalExecTime >= 100.milliseconds() + check v.totalExecTime <= 180.milliseconds() + + check v.totalRunTime >= 150.milliseconds() + check v.totalRunTime <= 240.milliseconds() + discard + echo "" diff --git a/codex/utils/asyncprofiler/utils.nim b/codex/utils/asyncprofiler/utils.nim new file mode 100644 index 00000000..4e787024 --- /dev/null +++ b/codex/utils/asyncprofiler/utils.nim @@ -0,0 +1,19 @@ +import asyncprofiler + +import ../json + + +proc `%`*(o: MetricsSummary): JsonNode = + var rows = newJArray() + for (location, metric) in o.pairs: + var row = %(metric) + row["location"] = %(location[]) + rows.add(row) + + rows + +proc `%`*(o: Duration): JsonNode = + %(o.nanoseconds) + +proc `%`*(o: cstring): JsonNode = + %($(o)) diff --git a/tests/codex/utils/asyncprofiler/testutils.nim b/tests/codex/utils/asyncprofiler/testutils.nim new file mode 100644 index 00000000..a2ee5c28 --- /dev/null +++ b/tests/codex/utils/asyncprofiler/testutils.nim @@ -0,0 +1,60 @@ +import std/tables +import std/unittest +import std/json + +import codex/utils/asyncprofiler + +import ../../helpers + +checksuite "asyncprofiler utils": + + var fooLoc = SrcLoc( + procedure: "foo", + file: "foo.nim", + line: 1 + ) + + let metric = CallbackMetric( + totalExecTime: 2.seconds, + totalRunTime: 2.seconds, + totalWallTime: 2.seconds, + minSingleTime: 100.nanoseconds, + maxSingleTime: 1500.milliseconds, + count: 10 + ) + + test "should serialize CallbackMetrics": + check %metric == %*{ + "totalExecTime": 2000000000, + "totalRunTime": 2000000000, + "totalWallTime": 2000000000, + "minSingleTime": 100, + "maxSingleTime":1500000000, + "count": 10 + } + + test "should serialize SrcLoc": + check %fooLoc == %*{ + "procedure": "foo", + "file": "foo.nim", + "line": 1 + } + + test "should serialize MetricsSummary": + var summary: MetricsSummary = { + (addr fooLoc): metric + }.toTable + + check %summary == [%*{ + "location": %*{ + "procedure": "foo", + "file": "foo.nim", + "line": 1, + }, + "totalExecTime": 2000000000, + "totalRunTime": 2000000000, + "totalWallTime": 2000000000, + "minSingleTime": 100, + "maxSingleTime":1500000000, + "count": 10 + }]