add harness and initial test

This commit is contained in:
gmega 2023-11-22 19:31:42 -03:00
parent d0ed08c4a3
commit d4809d5a98
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
8 changed files with 250 additions and 289 deletions

View File

@ -133,8 +133,8 @@ proc finish(fut: FutureBase, state: FutureState) =
# 2. `fut.state` is checked by `checkFinished()`.
fut.internalState = state
when chronosFuturesInstrumentation:
if not(isNil(futures.onFutureStop)):
futures.onFutureStop(fut)
if not isNil(onFutureEvent):
onFutureEvent(fut, state)
when chronosStrictFutureAccess:
doAssert fut.internalCancelcb == nil or state != FutureState.Cancelled
fut.internalCancelcb = nil # release cancellation callback memory
@ -215,9 +215,6 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
if future.finished():
return false
when chronosFuturesInstrumentation:
if not(isNil(onFutureStop)): onFutureStop(future)
if not(isNil(future.internalChild)):
# If you hit this assertion, you should have used the `CancelledError`
# mechanism and/or use a regular `addCallback`
@ -320,14 +317,22 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
template iterate =
while true:
when chronosFuturesInstrumentation:
if not(isNil(futures.onFutureRunning)):
futures.onFutureRunning(fut)
if not isNil(onFutureExecEvent):
onFutureExecEvent(fut, Running)
# Call closure to make progress on `fut` until it reaches `yield` (inside
# `await` typically) or completes / fails / is cancelled
next = fut.internalClosure(fut)
if fut.internalClosure.finished(): # Reached the end of the transformed proc
break
# If we got thus far it means the future still has work to do, so we
# issue a pause.
when chronosFuturesInstrumentation:
if not isNil(onFutureExecEvent):
onFutureExecEvent(fut, Paused)
if next == nil:
raiseAssert "Async procedure (" & ($fut.location[LocationKind.Create]) &
") yielded `nil`, are you await'ing a `nil` Future?"

View File

@ -298,10 +298,6 @@ template await*[T](f: Future[T]): untyped =
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f
when chronosFuturesInstrumentation:
if not(isNil(futures.onFuturePause)):
futures.onFuturePause(chronosInternalRetFuture, chronosInternalRetFuture.internalChild)
# `futureContinue` calls the iterator generated by the `async`
# transformation - `yield` gives control back to `futureContinue` which is
# responsible for resuming execution once the yielded future is finished
@ -321,11 +317,6 @@ template await*[T](f: Future[T]): untyped =
template awaitne*[T](f: Future[T]): Future[T] =
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f
when chronosFuturesInstrumentation:
if not(isNil(futures.onFuturePause)):
futures.onFuturePause(chronosInternalRetFuture, chronosInternalRetFuture.internalChild)
yield chronosInternalRetFuture.internalChild
if chronosInternalRetFuture.internalMustCancel:
raise newCancelledError()

View File

@ -94,11 +94,11 @@ when chronosFutureTracking:
var futureList* {.threadvar.}: FutureList
when chronosFuturesInstrumentation:
var
onFutureCreate*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].}
onFutureRunning*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].}
onFuturePause*: proc (fut, child: FutureBase) {.gcsafe, nimcall, raises: [].}
onFutureStop*: proc (fut: FutureBase) {.gcsafe, nimcall, raises: [].}
type FutureExecutionState* {.pure.} = enum
Running, Paused
var onFutureEvent* {.threadvar.}: proc (fut: FutureBase, state: FutureState): void {.nimcall, gcsafe, raises: [].}
var onFutureExecEvent* {.threadvar.}: proc(fut: FutureBase, state: FutureExecutionState): void {.nimcall, gcsafe, raises: [].}
# Internal utilities - these are not part of the stable API
proc internalInitFutureBase*(
@ -129,9 +129,8 @@ proc internalInitFutureBase*(
futureList.count.inc()
when chronosFuturesInstrumentation:
if not(isNil(onFutureCreate)):
onFutureCreate(fut)
if not isNil(onFutureEvent):
onFutureEvent(fut, state)
# Public API
template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] =

View File

@ -0,0 +1,68 @@
import ".."/futures
import ".."/srcloc
type
ExtendedFutureState* {.pure.} = enum
Pending,
Running,
Paused,
Completed,
Cancelled,
Failed,
Event* = object
futureId*: uint
location*: SrcLoc
newState*: ExtendedFutureState
RunningFuture = object
event: Event
notNil: bool
var running* {.threadvar.}: RunningFuture
var handleFutureEvent* {.threadvar.}: proc (event: Event) {.nimcall, gcsafe, raises: [].}
proc dispatch(future: FutureBase, state: ExtendedFutureState) =
let event = Event(
futureId: future.id,
location: future.internalLocation[LocationKind.Create][],
newState: state
)
if state != ExtendedFutureState.Running:
handleFutureEvent(event)
return
# If we have a running future, then it means this is a child. Emits synthetic
# pause event to keep things consistent with thread occupancy semantics.
if running.notNil:
handleFutureEvent(Event(
futureId: running.event.futureId,
location: running.event.location,
newState: Paused
))
running = RunningFuture(event: event, notNil: true)
handleFutureEvent(event)
onFutureEvent = proc (future: FutureBase, state: FutureState): void {.nimcall.} =
{.cast(gcsafe).}:
let extendedState = case state:
of FutureState.Pending: ExtendedFutureState.Pending
of FutureState.Completed: ExtendedFutureState.Completed
of FutureState.Cancelled: ExtendedFutureState.Cancelled
of FutureState.Failed: ExtendedFutureState.Failed
dispatch(future, extendedState)
onFutureExecEvent = proc (future: FutureBase, state: FutureExecutionState): void {.nimcall.} =
{.cast(gcsafe).}:
let extendedState = case state:
of FutureExecutionState.Running: ExtendedFutureState.Running
of FutureExecutionState.Paused: ExtendedFutureState.Paused
dispatch(future, extendedState)

View File

@ -9,5 +9,5 @@ switch("threads", "on")
switch("define", "nimRawSetjmp")
## REMOVE BEFORE MERGE!
# --d:chronosFuturesInstrumentation
--d:chronosFuturesInstrumentation
# --d:chronosFutureTracking

View File

@ -0,0 +1,109 @@
import std/os
import unittest2
import ".."/".."/chronos
import ".."/".."/chronos/profiler/events
import ./utils
suite "profiler hooks test suite":
setup:
installCallbacks()
teardown:
clearRecording()
revertCallbacks()
test "should emit correct events for a simple future":
proc simple() {.async.} =
os.sleep(1)
waitFor simple()
check getRecording().forProcs("simple") == @[
SimpleEvent(state: Pending, procedure: "simple"),
SimpleEvent(state: ExtendedFutureState.Running, procedure: "simple"),
SimpleEvent(state: Completed, procedure: "simple"),
]
# test "should emit correct events for a future with children":
# proc child1() {.async.} =
# os.sleep(1)
# proc withChildren() {.async.} =
# await child1()
# waitFor withChildren()
# check getRecording().forProcs("withChildren", "child1") == @[
# Event(kind: EventKind.Create, procedure: "withChildren"),
# Event(kind: EventKind.Run, procedure: "withChildren"),
# Event(kind: EventKind.Create, procedure: "child1"),
# Event(kind: EventKind.Pause, procedure: "withChildren"),
# Event(kind: EventKind.Run, procedure: "child1"),
# Event(kind: EventKind.Complete, procedure: "child1"),
# Event(kind: EventKind.Run, procedure: "withChildren"),
# Event(kind: EventKind.Complete, procedure: "withChildren"),
# ]
# test "should emit correct events for a future with timers":
# proc withChildren() {.async.} =
# await sleepAsync(1.milliseconds)
# waitFor withChildren()
# check getRecording().forProcs(
# "withChildren", "chronos.sleepAsync(Duration)") == @[
# Event(kind: EventKind.Create, procedure: "withChildren"),
# Event(kind: EventKind.Run, procedure: "withChildren"),
# Event(kind: EventKind.Pause, procedure: "withChildren"),
# Event(kind: EventKind.Create, procedure: "chronos.sleepAsync(Duration)"),
# # Timers don't "run"
# Event(kind: EventKind.Complete, procedure: "chronos.sleepAsync(Duration)"),
# Event(kind: EventKind.Run, procedure: "withChildren"),
# Event(kind: EventKind.Complete, procedure: "withChildren"),
# ]
# test "should emit correct events when futures are canceled":
# proc withCancellation() {.async.} =
# let f = sleepyHead()
# f.cancel()
# proc sleepyHead() {.async.} =
# await sleepAsync(10.minutes)
# waitFor withCancellation()
# check getRecording().forProcs("sleepyHead", "withCancellation") == @[
# Event(kind: EventKind.Create, procedure: "withCancellation"),
# Event(kind: EventKind.Create, procedure: "sleepyHead"),
# Event(kind: EventKind.Run, procedure: "sleepyHead"),
# ]
# type
# FakeFuture = object
# id: uint
# internalLocation*: array[LocationKind, ptr SrcLoc]
# suite "asyncprofiler metrics":
# test "should not keep metrics for a pending future in memory after it completes":
# var fakeLoc = SrcLoc(procedure: "foo", file: "foo.nim", line: 1)
# let future = FakeFuture(
# id: 1,
# internalLocation: [
# LocationKind.Create: addr fakeLoc,
# LocationKind.Finish: addr fakeLoc,
# ])
# var profiler = AsyncProfiler[FakeFuture]()
# profiler.handleFutureCreate(future)
# profiler.handleFutureComplete(future)
# check len(profiler.getPerFutureMetrics()) == 0

53
tests/profiler/utils.nim Normal file
View File

@ -0,0 +1,53 @@
import std/sequtils
import std/sugar
import ".."/".."/chronos
import ".."/".."/chronos/profiler/events
type
SimpleEvent* = object
procedure*: string
state*: ExtendedFutureState
# XXX this is sort of bad cause we get global state all over, but the fact we
# can't use closures on callbacks and that callbacks themselves are just
# global vars means we can't really do much better for now.
var recording: seq[SimpleEvent]
proc forProcs*(self: seq[SimpleEvent], procs: varargs[string]): seq[SimpleEvent] =
collect:
for e in self:
if e.procedure in procs:
e
# FIXME bad, this needs to be refactored into a callback interface for the profiler.
var oldHandleFutureEvent: proc(event: Event) {.nimcall, gcsafe, raises: [].} = nil
var installed: bool = false
proc recordEvent(event: Event) {.nimcall, gcsafe, raises: [].} =
{.cast(gcsafe).}:
recording.add(
SimpleEvent(
procedure: $(event.location.procedure),
state: event.newState
)
)
proc getRecording*(): seq[SimpleEvent] = {.cast(gcsafe).}: recording
proc clearRecording*(): void = recording = @[]
proc installCallbacks*() =
assert not installed, "Callbacks already installed"
oldHandleFutureEvent = handleFutureEvent
handleFutureEvent = recordEvent
installed = true
proc revertCallbacks*() =
assert installed, "Callbacks already uninstalled"
handleFutureEvent = oldHandleFutureEvent
installed = false

View File

@ -1,264 +0,0 @@
# Chronos Test Suite
# (c) Copyright 2020-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest2
import ../chronos, ../chronos/config
{.used.}
when chronosFuturesInstrumentation:
import std/[tables, os, options, hashes]
import ../chronos/timer
type
FutureMetric = object
## Holds average timing information for a given closure
closureLoc*: ptr SrcLoc
created*: Moment
start*: Option[Moment]
duration*: Duration
blocks*: int
initDuration*: Duration
durationChildren*: Duration
CallbackMetric = object
totalExecTime*: Duration
totalWallTime*: Duration
totalRunTime*: Duration
minSingleTime*: Duration
maxSingleTime*: Duration
count*: int64
var
futureDurations: Table[uint, FutureMetric]
callbackDurations: Table[ptr SrcLoc, CallbackMetric]
suite "Asynchronous utilities test suite":
when chronosFutureTracking:
proc getCount(): uint =
# This procedure counts number of Future[T] in double-linked list via list
# iteration.
var res = 0'u
for item in pendingFutures():
inc(res)
res
test "Future clean and leaks test":
when chronosFutureTracking:
if pendingFuturesCount(WithoutCompleted) == 0'u:
if pendingFuturesCount(OnlyCompleted) > 0'u:
poll()
check pendingFuturesCount() == 0'u
else:
echo dumpPendingFutures()
check false
else:
skip()
test "FutureList basics test":
when chronosFutureTracking:
var fut1 = newFuture[void]()
check:
getCount() == 1'u
pendingFuturesCount() == 1'u
var fut2 = newFuture[void]()
check:
getCount() == 2'u
pendingFuturesCount() == 2'u
var fut3 = newFuture[void]()
check:
getCount() == 3'u
pendingFuturesCount() == 3'u
fut1.complete()
poll()
check:
getCount() == 2'u
pendingFuturesCount() == 2'u
fut2.fail(newException(ValueError, ""))
poll()
check:
getCount() == 1'u
pendingFuturesCount() == 1'u
fut3.cancel()
poll()
check:
getCount() == 0'u
pendingFuturesCount() == 0'u
else:
skip()
test "FutureList async procedure test":
when chronosFutureTracking:
proc simpleProc() {.async.} =
await sleepAsync(10.milliseconds)
var fut = simpleProc()
check:
getCount() == 2'u
pendingFuturesCount() == 2'u
waitFor fut
check:
getCount() == 1'u
pendingFuturesCount() == 1'u
poll()
check:
getCount() == 0'u
pendingFuturesCount() == 0'u
else:
skip()
test "check empty futures instrumentation runs":
when chronosFuturesInstrumentation:
proc simpleAsyncChild() {.async.} =
echo "child sleep..."
os.sleep(25)
proc simpleAsync1() {.async.} =
for i in 0..1:
await sleepAsync(40.milliseconds)
await simpleAsyncChild()
echo "sleep..."
os.sleep(50)
waitFor(simpleAsync1())
check true
test "Example of using Future hooks to gather metrics":
when chronosFuturesInstrumentation:
proc setFutureCreate(fut: FutureBase) {.nimcall, gcsafe, raises: [].} =
## used for setting the duration
{.cast(gcsafe).}:
let loc = fut.internalLocation[Create]
futureDurations[fut.id] = FutureMetric()
futureDurations.withValue(fut.id, metric):
metric.created = Moment.now()
echo loc, "; future create "
proc setFutureStart(fut: FutureBase) {.nimcall, gcsafe, raises: [].} =
## used for setting the duration
{.cast(gcsafe).}:
let loc = fut.internalLocation[Create]
assert futureDurations.hasKey(fut.id)
futureDurations.withValue(fut.id, metric):
let ts = Moment.now()
metric.start = some ts
metric.blocks.inc()
echo loc, "; future start: ", metric.initDuration
proc setFuturePause(fut, child: FutureBase) {.nimcall, gcsafe, raises: [].} =
## used for setting the duration
{.cast(gcsafe).}:
let loc = fut.internalLocation[Create]
let childLoc = if child.isNil: nil else: child.internalLocation[Create]
var durationChildren = ZeroDuration
var initDurationChildren = ZeroDuration
if childLoc != nil:
futureDurations.withValue(child.id, metric):
durationChildren = metric.duration
initDurationChildren = metric.initDuration
assert futureDurations.hasKey(fut.id)
futureDurations.withValue(fut.id, metric):
if metric.start.isSome:
let ts = Moment.now()
metric.duration += ts - metric.start.get()
metric.duration -= initDurationChildren
if metric.blocks == 1:
metric.initDuration = ts - metric.created # tricky,
# the first block of a child iterator also
# runs on the parents clock, so we track our first block
# time so any parents can get it
echo loc, "; child firstBlock time: ", initDurationChildren
metric.durationChildren += durationChildren
metric.start = none Moment
echo loc, "; future pause ", if childLoc.isNil: "" else: " child: " & $childLoc
proc setFutureDuration(fut: FutureBase) {.nimcall, gcsafe, raises: [].} =
## used for setting the duration
{.cast(gcsafe).}:
let loc = fut.internalLocation[Create]
# assert "set duration: " & $loc
var fm: FutureMetric
# assert futureDurations.pop(fut.id, fm)
futureDurations.withValue(fut.id, metric):
fm = metric[]
discard callbackDurations.hasKeyOrPut(loc, CallbackMetric(minSingleTime: InfiniteDuration))
callbackDurations.withValue(loc, metric):
echo loc, " set duration: ", callbackDurations.hasKey(loc)
metric.totalExecTime += fm.duration
metric.totalWallTime += Moment.now() - fm.created
metric.totalRunTime += metric.totalExecTime + fm.durationChildren
echo loc, " child duration: ", fm.durationChildren
metric.count.inc
metric.minSingleTime = min(metric.minSingleTime, fm.duration)
metric.maxSingleTime = max(metric.maxSingleTime, fm.duration)
# handle overflow
if metric.count == metric.count.typeof.high:
metric.totalExecTime = ZeroDuration
metric.count = 0
onFutureCreate =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
f.setFutureCreate()
onFutureRunning =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
f.setFutureStart()
onFuturePause =
proc (f, child: FutureBase) {.nimcall, gcsafe, raises: [].} =
f.setFuturePause(child)
onFutureStop =
proc (f: FutureBase) {.nimcall, gcsafe, raises: [].} =
f.setFuturePause(nil)
f.setFutureDuration()
proc simpleAsyncChild() {.async.} =
echo "child sleep..."
os.sleep(25)
proc simpleAsync1() {.async.} =
for i in 0..1:
await sleepAsync(40.milliseconds)
await simpleAsyncChild()
echo "sleep..."
os.sleep(50)
waitFor(simpleAsync1())
let metrics = callbackDurations
echo "\n=== metrics ==="
echo "execTime:\ttime to execute non-async portions of async proc"
echo "runTime:\texecution time + execution time of children"
echo "wallTime:\twall time elapsed for future's lifetime"
for (k,v) in metrics.pairs():
let count = v.count
if count > 0:
echo ""
echo "metric: ", $k
echo "count: ", count
echo "avg execTime:\t", v.totalExecTime div count, "\ttotal: ", v.totalExecTime
echo "avg wallTime:\t", v.totalWallTime div count, "\ttotal: ", v.totalWallTime
echo "avg runTime:\t", v.totalRunTime div count, "\ttotal: ", v.totalRunTime
if k.procedure == "simpleAsync1":
echo "v: ", v
check v.totalExecTime >= 100.milliseconds()
check v.totalExecTime <= 180.milliseconds()
check v.totalRunTime >= 150.milliseconds()
check v.totalRunTime <= 240.milliseconds()
discard
echo ""
else:
skip()