From d4809d5a98fc6cda9ff4ed8a8cda7750ebc5b134 Mon Sep 17 00:00:00 2001 From: gmega Date: Wed, 22 Nov 2023 19:31:42 -0300 Subject: [PATCH] add harness and initial test --- chronos/asyncfutures2.nim | 19 +- chronos/asyncmacro2.nim | 9 - chronos/futures.nim | 15 +- chronos/profiler/events.nim | 68 +++++++ tests/config.nims | 2 +- tests/profiler/testasyncprofiler.nim | 109 +++++++++++ tests/profiler/utils.nim | 53 ++++++ tests/testutils.nim | 264 --------------------------- 8 files changed, 250 insertions(+), 289 deletions(-) create mode 100644 chronos/profiler/events.nim create mode 100644 tests/profiler/testasyncprofiler.nim create mode 100644 tests/profiler/utils.nim delete mode 100644 tests/testutils.nim diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 9f6533f..f3deae8 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -133,8 +133,8 @@ proc finish(fut: FutureBase, state: FutureState) = # 2. `fut.state` is checked by `checkFinished()`. fut.internalState = state when chronosFuturesInstrumentation: - if not(isNil(futures.onFutureStop)): - futures.onFutureStop(fut) + if not isNil(onFutureEvent): + onFutureEvent(fut, state) when chronosStrictFutureAccess: doAssert fut.internalCancelcb == nil or state != FutureState.Cancelled fut.internalCancelcb = nil # release cancellation callback memory @@ -215,9 +215,6 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = if future.finished(): return false - when chronosFuturesInstrumentation: - if not(isNil(onFutureStop)): onFutureStop(future) - if not(isNil(future.internalChild)): # If you hit this assertion, you should have used the `CancelledError` # mechanism and/or use a regular `addCallback` @@ -320,14 +317,22 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} = template iterate = while true: when chronosFuturesInstrumentation: - if not(isNil(futures.onFutureRunning)): - futures.onFutureRunning(fut) + if not isNil(onFutureExecEvent): + onFutureExecEvent(fut, Running) + # Call closure to make progress on `fut` until it reaches `yield` (inside # `await` typically) or completes / fails / is cancelled next = fut.internalClosure(fut) + if fut.internalClosure.finished(): # Reached the end of the transformed proc break + # If we got thus far it means the future still has work to do, so we + # issue a pause. + when chronosFuturesInstrumentation: + if not isNil(onFutureExecEvent): + onFutureExecEvent(fut, Paused) + if next == nil: raiseAssert "Async procedure (" & ($fut.location[LocationKind.Create]) & ") yielded `nil`, are you await'ing a `nil` Future?" diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index 975d76a..64c6f07 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -298,10 +298,6 @@ template await*[T](f: Future[T]): untyped = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f - when chronosFuturesInstrumentation: - if not(isNil(futures.onFuturePause)): - futures.onFuturePause(chronosInternalRetFuture, chronosInternalRetFuture.internalChild) - # `futureContinue` calls the iterator generated by the `async` # transformation - `yield` gives control back to `futureContinue` which is # responsible for resuming execution once the yielded future is finished @@ -321,11 +317,6 @@ template await*[T](f: Future[T]): untyped = template awaitne*[T](f: Future[T]): Future[T] = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f - - when chronosFuturesInstrumentation: - if not(isNil(futures.onFuturePause)): - futures.onFuturePause(chronosInternalRetFuture, chronosInternalRetFuture.internalChild) - yield chronosInternalRetFuture.internalChild if chronosInternalRetFuture.internalMustCancel: raise newCancelledError() diff --git a/chronos/futures.nim b/chronos/futures.nim index e67433a..c93e64c 100644 --- a/chronos/futures.nim +++ b/chronos/futures.nim @@ -94,11 +94,11 @@ when chronosFutureTracking: var futureList* {.threadvar.}: FutureList when chronosFuturesInstrumentation: - var - onFutureCreate*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].} - onFutureRunning*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].} - onFuturePause*: proc (fut, child: FutureBase) {.gcsafe, nimcall, raises: [].} - onFutureStop*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].} + type FutureExecutionState* {.pure.} = enum + Running, Paused + + var onFutureEvent* {.threadvar.}: proc (fut: FutureBase, state: FutureState): void {.nimcall, gcsafe, raises: [].} + var onFutureExecEvent* {.threadvar.}: proc(fut: FutureBase, state: FutureExecutionState): void {.nimcall, gcsafe, raises: [].} # Internal utilities - these are not part of the stable API proc internalInitFutureBase*( @@ -129,9 +129,8 @@ proc internalInitFutureBase*( futureList.count.inc() when chronosFuturesInstrumentation: - if not(isNil(onFutureCreate)): - onFutureCreate(fut) - + if not isNil(onFutureEvent): + onFutureEvent(fut, state) # Public API template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] = diff --git a/chronos/profiler/events.nim b/chronos/profiler/events.nim new file mode 100644 index 0000000..a18805b --- /dev/null +++ b/chronos/profiler/events.nim @@ -0,0 +1,68 @@ +import ".."/futures +import ".."/srcloc + +type + ExtendedFutureState* {.pure.} = enum + Pending, + Running, + Paused, + Completed, + Cancelled, + Failed, + + Event* = object + futureId*: uint + location*: SrcLoc + newState*: ExtendedFutureState + + RunningFuture = object + event: Event + notNil: bool + +var running* {.threadvar.}: RunningFuture +var handleFutureEvent* {.threadvar.}: proc (event: Event) {.nimcall, gcsafe, raises: [].} + +proc dispatch(future: FutureBase, state: ExtendedFutureState) = + let event = Event( + futureId: future.id, + location: future.internalLocation[LocationKind.Create][], + newState: state + ) + + if state != ExtendedFutureState.Running: + handleFutureEvent(event) + return + + # If we have a running future, then it means this is a child. Emits synthetic + # pause event to keep things consistent with thread occupancy semantics. + if running.notNil: + handleFutureEvent(Event( + futureId: running.event.futureId, + location: running.event.location, + newState: Paused + )) + + running = RunningFuture(event: event, notNil: true) + + handleFutureEvent(event) + +onFutureEvent = proc (future: FutureBase, state: FutureState): void {.nimcall.} = + {.cast(gcsafe).}: + let extendedState = case state: + of FutureState.Pending: ExtendedFutureState.Pending + of FutureState.Completed: ExtendedFutureState.Completed + of FutureState.Cancelled: ExtendedFutureState.Cancelled + of FutureState.Failed: ExtendedFutureState.Failed + + dispatch(future, extendedState) + +onFutureExecEvent = proc (future: FutureBase, state: FutureExecutionState): void {.nimcall.} = + {.cast(gcsafe).}: + let extendedState = case state: + of FutureExecutionState.Running: ExtendedFutureState.Running + of FutureExecutionState.Paused: ExtendedFutureState.Paused + + dispatch(future, extendedState) + + + diff --git a/tests/config.nims b/tests/config.nims index 00648b8..9a3d66f 100644 --- a/tests/config.nims +++ b/tests/config.nims @@ -9,5 +9,5 @@ switch("threads", "on") switch("define", "nimRawSetjmp") ## REMOVE BEFORE MERGE! -# --d:chronosFuturesInstrumentation +--d:chronosFuturesInstrumentation # --d:chronosFutureTracking diff --git a/tests/profiler/testasyncprofiler.nim b/tests/profiler/testasyncprofiler.nim new file mode 100644 index 0000000..2d1893e --- /dev/null +++ b/tests/profiler/testasyncprofiler.nim @@ -0,0 +1,109 @@ +import std/os + +import unittest2 + +import ".."/".."/chronos +import ".."/".."/chronos/profiler/events + +import ./utils + +suite "profiler hooks test suite": + + setup: + installCallbacks() + + teardown: + clearRecording() + revertCallbacks() + + test "should emit correct events for a simple future": + + proc simple() {.async.} = + os.sleep(1) + + waitFor simple() + + check getRecording().forProcs("simple") == @[ + SimpleEvent(state: Pending, procedure: "simple"), + SimpleEvent(state: ExtendedFutureState.Running, procedure: "simple"), + SimpleEvent(state: Completed, procedure: "simple"), + ] + + # test "should emit correct events for a future with children": + # proc child1() {.async.} = + # os.sleep(1) + + # proc withChildren() {.async.} = + # await child1() + + # waitFor withChildren() + + # check getRecording().forProcs("withChildren", "child1") == @[ + # Event(kind: EventKind.Create, procedure: "withChildren"), + # Event(kind: EventKind.Run, procedure: "withChildren"), + # Event(kind: EventKind.Create, procedure: "child1"), + # Event(kind: EventKind.Pause, procedure: "withChildren"), + # Event(kind: EventKind.Run, procedure: "child1"), + # Event(kind: EventKind.Complete, procedure: "child1"), + # Event(kind: EventKind.Run, procedure: "withChildren"), + # Event(kind: EventKind.Complete, procedure: "withChildren"), + # ] + + # test "should emit correct events for a future with timers": + # proc withChildren() {.async.} = + # await sleepAsync(1.milliseconds) + + # waitFor withChildren() + + # check getRecording().forProcs( + # "withChildren", "chronos.sleepAsync(Duration)") == @[ + # Event(kind: EventKind.Create, procedure: "withChildren"), + # Event(kind: EventKind.Run, procedure: "withChildren"), + # Event(kind: EventKind.Pause, procedure: "withChildren"), + # Event(kind: EventKind.Create, procedure: "chronos.sleepAsync(Duration)"), + # # Timers don't "run" + # Event(kind: EventKind.Complete, procedure: "chronos.sleepAsync(Duration)"), + # Event(kind: EventKind.Run, procedure: "withChildren"), + # Event(kind: EventKind.Complete, procedure: "withChildren"), + # ] + + # test "should emit correct events when futures are canceled": + # proc withCancellation() {.async.} = + # let f = sleepyHead() + # f.cancel() + + # proc sleepyHead() {.async.} = + # await sleepAsync(10.minutes) + + # waitFor withCancellation() + + # check getRecording().forProcs("sleepyHead", "withCancellation") == @[ + # Event(kind: EventKind.Create, procedure: "withCancellation"), + # Event(kind: EventKind.Create, procedure: "sleepyHead"), + # Event(kind: EventKind.Run, procedure: "sleepyHead"), + # ] + +# type +# FakeFuture = object +# id: uint +# internalLocation*: array[LocationKind, ptr SrcLoc] + +# suite "asyncprofiler metrics": + +# test "should not keep metrics for a pending future in memory after it completes": + +# var fakeLoc = SrcLoc(procedure: "foo", file: "foo.nim", line: 1) +# let future = FakeFuture( +# id: 1, +# internalLocation: [ +# LocationKind.Create: addr fakeLoc, +# LocationKind.Finish: addr fakeLoc, +# ]) + +# var profiler = AsyncProfiler[FakeFuture]() + +# profiler.handleFutureCreate(future) +# profiler.handleFutureComplete(future) + +# check len(profiler.getPerFutureMetrics()) == 0 + diff --git a/tests/profiler/utils.nim b/tests/profiler/utils.nim new file mode 100644 index 0000000..b100113 --- /dev/null +++ b/tests/profiler/utils.nim @@ -0,0 +1,53 @@ +import std/sequtils +import std/sugar + +import ".."/".."/chronos +import ".."/".."/chronos/profiler/events + +type + SimpleEvent* = object + procedure*: string + state*: ExtendedFutureState + +# XXX this is sort of bad cause we get global state all over, but the fact we +# can't use closures on callbacks and that callbacks themselves are just +# global vars means we can't really do much better for now. + +var recording: seq[SimpleEvent] + +proc forProcs*(self: seq[SimpleEvent], procs: varargs[string]): seq[SimpleEvent] = + collect: + for e in self: + if e.procedure in procs: + e + +# FIXME bad, this needs to be refactored into a callback interface for the profiler. +var oldHandleFutureEvent: proc(event: Event) {.nimcall, gcsafe, raises: [].} = nil +var installed: bool = false + +proc recordEvent(event: Event) {.nimcall, gcsafe, raises: [].} = + {.cast(gcsafe).}: + recording.add( + SimpleEvent( + procedure: $(event.location.procedure), + state: event.newState + ) + ) + +proc getRecording*(): seq[SimpleEvent] = {.cast(gcsafe).}: recording + +proc clearRecording*(): void = recording = @[] + +proc installCallbacks*() = + assert not installed, "Callbacks already installed" + oldHandleFutureEvent = handleFutureEvent + handleFutureEvent = recordEvent + + installed = true + +proc revertCallbacks*() = + assert installed, "Callbacks already uninstalled" + + handleFutureEvent = oldHandleFutureEvent + installed = false + diff --git a/tests/testutils.nim b/tests/testutils.nim deleted file mode 100644 index c6045f2..0000000 --- a/tests/testutils.nim +++ /dev/null @@ -1,264 +0,0 @@ -# Chronos Test Suite -# (c) Copyright 2020-Present -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -import unittest2 -import ../chronos, ../chronos/config - -{.used.} - -when chronosFuturesInstrumentation: - import std/[tables, os, options, hashes] - import ../chronos/timer - -type - FutureMetric = object - ## Holds average timing information for a given closure - closureLoc*: ptr SrcLoc - created*: Moment - start*: Option[Moment] - duration*: Duration - blocks*: int - initDuration*: Duration - durationChildren*: Duration - - CallbackMetric = object - totalExecTime*: Duration - totalWallTime*: Duration - totalRunTime*: Duration - minSingleTime*: Duration - maxSingleTime*: Duration - count*: int64 - -var - futureDurations: Table[uint, FutureMetric] - callbackDurations: Table[ptr SrcLoc, CallbackMetric] - -suite "Asynchronous utilities test suite": - when chronosFutureTracking: - proc getCount(): uint = - # This procedure counts number of Future[T] in double-linked list via list - # iteration. - var res = 0'u - for item in pendingFutures(): - inc(res) - res - - test "Future clean and leaks test": - when chronosFutureTracking: - if pendingFuturesCount(WithoutCompleted) == 0'u: - if pendingFuturesCount(OnlyCompleted) > 0'u: - poll() - check pendingFuturesCount() == 0'u - else: - echo dumpPendingFutures() - check false - else: - skip() - - test "FutureList basics test": - when chronosFutureTracking: - var fut1 = newFuture[void]() - check: - getCount() == 1'u - pendingFuturesCount() == 1'u - var fut2 = newFuture[void]() - check: - getCount() == 2'u - pendingFuturesCount() == 2'u - var fut3 = newFuture[void]() - check: - getCount() == 3'u - pendingFuturesCount() == 3'u - fut1.complete() - poll() - check: - getCount() == 2'u - pendingFuturesCount() == 2'u - fut2.fail(newException(ValueError, "")) - poll() - check: - getCount() == 1'u - pendingFuturesCount() == 1'u - fut3.cancel() - poll() - check: - getCount() == 0'u - pendingFuturesCount() == 0'u - else: - skip() - - test "FutureList async procedure test": - when chronosFutureTracking: - proc simpleProc() {.async.} = - await sleepAsync(10.milliseconds) - - var fut = simpleProc() - check: - getCount() == 2'u - pendingFuturesCount() == 2'u - - waitFor fut - check: - getCount() == 1'u - pendingFuturesCount() == 1'u - - poll() - check: - getCount() == 0'u - pendingFuturesCount() == 0'u - else: - skip() - - test "check empty futures instrumentation runs": - - when chronosFuturesInstrumentation: - - proc simpleAsyncChild() {.async.} = - echo "child sleep..." - os.sleep(25) - - proc simpleAsync1() {.async.} = - for i in 0..1: - await sleepAsync(40.milliseconds) - await simpleAsyncChild() - echo "sleep..." - os.sleep(50) - - waitFor(simpleAsync1()) - check true - - - test "Example of using Future hooks to gather metrics": - - when chronosFuturesInstrumentation: - - proc setFutureCreate(fut: FutureBase) {.nimcall, gcsafe, raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - futureDurations[fut.id] = FutureMetric() - futureDurations.withValue(fut.id, metric): - metric.created = Moment.now() - echo loc, "; future create " - - proc setFutureStart(fut: FutureBase) {.nimcall, gcsafe, raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - assert futureDurations.hasKey(fut.id) - futureDurations.withValue(fut.id, metric): - let ts = Moment.now() - metric.start = some ts - metric.blocks.inc() - echo loc, "; future start: ", metric.initDuration - - proc setFuturePause(fut, child: FutureBase) {.nimcall, gcsafe, raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - let childLoc = if child.isNil: nil else: child.internalLocation[Create] - var durationChildren = ZeroDuration - var initDurationChildren = ZeroDuration - if childLoc != nil: - futureDurations.withValue(child.id, metric): - durationChildren = metric.duration - initDurationChildren = metric.initDuration - assert futureDurations.hasKey(fut.id) - futureDurations.withValue(fut.id, metric): - if metric.start.isSome: - let ts = Moment.now() - metric.duration += ts - metric.start.get() - metric.duration -= initDurationChildren - if metric.blocks == 1: - metric.initDuration = ts - metric.created # tricky, - # the first block of a child iterator also - # runs on the parents clock, so we track our first block - # time so any parents can get it - echo loc, "; child firstBlock time: ", initDurationChildren - - metric.durationChildren += durationChildren - metric.start = none Moment - echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc - - proc setFutureDuration(fut: FutureBase) {.nimcall, gcsafe, raises: [].} = - ## used for setting the duration - {.cast(gcsafe).}: - let loc = fut.internalLocation[Create] - # assert "set duration: " & $loc - var fm: FutureMetric - # assert futureDurations.pop(fut.id, fm) - futureDurations.withValue(fut.id, metric): - fm = metric[] - - discard callbackDurations.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration)) - callbackDurations.withValue(loc, metric): - echo loc, " set duration: ", callbackDurations.hasKey(loc) - metric.totalExecTime += fm.duration - metric.totalWallTime += Moment.now() - fm.created - metric.totalRunTime += metric.totalExecTime + fm.durationChildren - echo loc, " child duration: ", fm.durationChildren - metric.count.inc - metric.minSingleTime = min(metric.minSingleTime, fm.duration) - metric.maxSingleTime = max(metric.maxSingleTime, fm.duration) - # handle overflow - if metric.count == metric.count.typeof.high: - metric.totalExecTime = ZeroDuration - metric.count = 0 - - onFutureCreate = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureCreate() - onFutureRunning = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFutureStart() - onFuturePause = - proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFuturePause(child) - onFutureStop = - proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} = - f.setFuturePause(nil) - f.setFutureDuration() - - proc simpleAsyncChild() {.async.} = - echo "child sleep..." - os.sleep(25) - - proc simpleAsync1() {.async.} = - for i in 0..1: - await sleepAsync(40.milliseconds) - await simpleAsyncChild() - echo "sleep..." - os.sleep(50) - - waitFor(simpleAsync1()) - - let metrics = callbackDurations - echo "\n=== metrics ===" - echo "execTime:\ttime to execute non-async portions of async proc" - echo "runTime:\texecution time + execution time of children" - echo "wallTime:\twall time elapsed for future's lifetime" - for (k,v) in metrics.pairs(): - let count = v.count - if count > 0: - echo "" - echo "metric: ", $k - echo "count: ", count - echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime - echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime - echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime - if k.procedure == "simpleAsync1": - echo "v: ", v - check v.totalExecTime >= 100.milliseconds() - check v.totalExecTime <= 180.milliseconds() - - check v.totalRunTime >= 150.milliseconds() - check v.totalRunTime <= 240.milliseconds() - discard - echo "" - - else: - skip()