diff --git a/chronos/config.nim b/chronos/config.nim index 21c3132..7be3e98 100644 --- a/chronos/config.nim +++ b/chronos/config.nim @@ -79,6 +79,11 @@ const "" ## OS polling engine type which is going to be used by chronos. + chronosProfiling* {.booldefine.} = defined(chronosProfiling) + ## Enable instrumentation callbacks which are called at + ## the start, pause, or end of a Future's lifetime. + ## Useful for implementing metrics or other instrumentation. + when defined(chronosStrictException): {.warning: "-d:chronosStrictException has been deprecated in favor of handleException".} # In chronos v3, this setting was used as the opposite of diff --git a/chronos/futures.nim b/chronos/futures.nim index 6fb9592..90f4fa5 100644 --- a/chronos/futures.nim +++ b/chronos/futures.nim @@ -98,6 +98,14 @@ when chronosFutureTracking: var futureList* {.threadvar.}: FutureList +when chronosProfiling: + type FutureExecutionEvent* {.pure.} = enum + Init, Run, Pause, Finish + + var onFutureEvent* {.threadvar.}: + proc (fut: FutureBase, + state: FutureExecutionEvent): void {.nimcall, gcsafe, raises: [].} + # Internal utilities - these are not part of the stable API proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc, state: FutureState, flags: FutureFlags) = @@ -125,6 +133,10 @@ proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc, futureList.head = fut futureList.count.inc() + when chronosProfiling: + if not isNil(onFutureEvent): + onFutureEvent(fut, Init) + # Public API template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] = ## Creates a new pending future. diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index a7fd961..ee58eb5 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -193,6 +193,9 @@ proc finish(fut: FutureBase, state: FutureState) = # 2. `fut.state` is checked by `checkFinished()`. fut.internalState = state fut.internalCancelcb = nil # release cancellation callback memory + when chronosProfiling: + if not isNil(onFutureEvent): + onFutureEvent(fut, Finish) for item in fut.internalCallbacks.mitems(): if not(isNil(item.function)): callSoon(item) @@ -373,6 +376,9 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} = # Every call to an `{.async.}` proc is redirected to call this function # instead with its original body captured in `fut.closure`. while true: + when chronosProfiling: + if not isNil(onFutureExecEvent): + onFutureEvent(fut, Run) # Call closure to make progress on `fut` until it reaches `yield` (inside # `await` typically) or completes / fails / is cancelled let next: FutureBase = fut.internalClosure(fut) @@ -389,6 +395,11 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} = GC_ref(fut) next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut)) + # If we got thus far, means the future is paused. + when chronosProfiling: + if not isNil(onFutureExecEvent): + onFutureEvent(fut, Pause) + # return here so that we don't remove the closure below return diff --git a/chronos/profiler.nim b/chronos/profiler.nim new file mode 100644 index 0000000..1f43f11 --- /dev/null +++ b/chronos/profiler.nim @@ -0,0 +1,28 @@ +import ./config + +when chronosProfiling: + import futures + import ./profiler/[events, metrics] + + export futures, events, metrics + + when not chronosFutureId: + {.error: "chronosProfiling requires chronosFutureId to be enabled".} + + var futureMetrics {.threadvar.}: ProfilerMetrics + + proc getMetrics*(): ProfilerMetrics = + ## Returns metrics for the current event loop. + result = futureMetrics + + proc enableEventCallbacks*(): void = + onFutureEvent = handleFutureEventCB + onFutureExecEvent = handleFutureExecEventCB + + proc enableProfiling*() = + ## Enables profiling on the current event loop. + if not isNil(handleFutureEvent): return + + enableEventCallbacks() + handleFutureEvent = proc (e: Event) {.nimcall.} = + futureMetrics.processEvent(e) diff --git a/chronos/profiler/events.nim b/chronos/profiler/events.nim new file mode 100644 index 0000000..c9e7346 --- /dev/null +++ b/chronos/profiler/events.nim @@ -0,0 +1,62 @@ +## This module defines the lower-level callback implementations that hook into +## the Chronos scheduler when profiling is enabled. The main goal is to provide +## timestamped events changes for futures while allowing a simpler implementation +## (only one event object type) for the remainder of the profiler. + +import ".."/timer +import ".."/futures +import ".."/srcloc + +type + ExtendedFutureState* {.pure.} = enum + Pending, + Running, + Paused, + Completed, + Cancelled, + Failed, + + Event* = object + future: FutureBase + newState*: ExtendedFutureState + timestamp*: Moment + +var handleFutureEvent* {.threadvar.}: proc (event: Event) {.nimcall, gcsafe, raises: [].} + +proc `location`*(self: Event): SrcLoc = + self.future.internalLocation[Create][] + +proc `futureId`*(self: Event): uint = + self.future.id + +proc mkEvent(future: FutureBase, state: ExtendedFutureState): Event = + Event( + future: future, + newState: state, + timestamp: Moment.now(), + ) + +proc handleFutureEventCB*(future: FutureBase, + state: FutureState): void {.nimcall.} = + {.cast(gcsafe).}: + let extendedState = case state: + of FutureState.Pending: ExtendedFutureState.Pending + of FutureState.Completed: ExtendedFutureState.Completed + of FutureState.Cancelled: ExtendedFutureState.Cancelled + of FutureState.Failed: ExtendedFutureState.Failed + + if not isNil(handleFutureEvent): + handleFutureEvent(mkEvent(future, extendedState)) + +proc handleFutureExecEventCB*(future: FutureBase, + state: FutureExecutionState): void {.nimcall.} = + {.cast(gcsafe).}: + let extendedState = case state: + of FutureExecutionState.Running: ExtendedFutureState.Running + of FutureExecutionState.Paused: ExtendedFutureState.Paused + + if not isNil(handleFutureEvent): + handleFutureEvent(mkEvent(future, extendedState)) + + + diff --git a/chronos/profiler/metrics.nim b/chronos/profiler/metrics.nim new file mode 100644 index 0000000..4c22bd1 --- /dev/null +++ b/chronos/profiler/metrics.nim @@ -0,0 +1,133 @@ +import std/tables +import std/options + +import ./events +import ../[timer, srcloc] + +export timer, tables, srcloc + +type + AggregateFutureMetrics* = object + execTime*: Duration + execTimeMax*: Duration + childrenExecTime*: Duration + wallClockTime*: Duration + callCount*: uint + + RunningFuture* = object + state*: ExtendedFutureState + created*: Moment + lastStarted*: Moment + timeToFirstPause*: Duration + partialExecTime*: Duration + partialChildrenExecTime*: Duration + partialChildrenExecOverlap*: Duration + parent*: Option[uint] + pauses*: uint + + MetricsTotals* = Table[SrcLoc, AggregateFutureMetrics] + + ProfilerMetrics* = object + callStack: seq[uint] + partials: Table[uint, RunningFuture] + totals*: MetricsTotals + +proc `execTimeWithChildren`*(self: AggregateFutureMetrics): Duration = + self.execTime + self.childrenExecTime + +proc push(self: var seq[uint], value: uint): void = self.add(value) + +proc pop(self: var seq[uint]): uint = + let value = self[^1] + self.setLen(self.len - 1) + value + +proc peek(self: var seq[uint]): Option[uint] = + if self.len == 0: none(uint) else: self[^1].some + +proc `$`(location: SrcLoc): string = + $location.procedure & "[" & $location.file & ":" & $location.line & "]" + +proc futureCreated(self: var ProfilerMetrics, event: Event): void = + assert not self.partials.hasKey(event.futureId), $event.location + + self.partials[event.futureId] = RunningFuture( + created: event.timestamp, + state: Pending, + ) + +proc bindParent(self: var ProfilerMetrics, metrics: ptr RunningFuture): void = + let current = self.callStack.peek() + if current.isNone: + return + + if metrics.parent.isSome: + assert metrics.parent.get == current.get + metrics.parent = current + +proc futureRunning(self: var ProfilerMetrics, event: Event): void = + assert self.partials.hasKey(event.futureId), $event.location + + self.partials.withValue(event.futureId, metrics): + assert metrics.state == Pending or metrics.state == Paused, + $event.location + + self.bindParent(metrics) + self.callStack.push(event.futureId) + + metrics.lastStarted = event.timestamp + metrics.state = Running + +proc futurePaused(self: var ProfilerMetrics, event: Event): void = + assert self.partials.hasKey(event.futureId), $event.location + assert event.futureId == self.callStack.pop(), $event.location + + self.partials.withValue(event.futureId, metrics): + assert metrics.state == Running, $event.location + + let segmentExecTime = event.timestamp - metrics.lastStarted + + if metrics.pauses == 0: + metrics.timeToFirstPause = segmentExecTime + metrics.partialExecTime += segmentExecTime + metrics.pauses += 1 + metrics.state = Paused + +proc futureCompleted(self: var ProfilerMetrics, event: Event): void = + self.partials.withValue(event.futureId, metrics): + if metrics.state == Running: + self.futurePaused(event) + + let location = event.location + if not self.totals.hasKey(location): + self.totals[location] = AggregateFutureMetrics() + + self.totals.withValue(location, aggMetrics): + let execTime = metrics.partialExecTime - metrics.partialChildrenExecOverlap + + aggMetrics.callCount.inc() + aggMetrics.execTime += execTime + aggMetrics.execTimeMax = max(aggMetrics.execTimeMax, execTime) + aggMetrics.childrenExecTime += metrics.partialChildrenExecTime + aggMetrics.wallClockTime += event.timestamp - metrics.created + + if metrics.parent.isSome: + self.partials.withValue(metrics.parent.get, parentMetrics): + parentMetrics.partialChildrenExecTime += metrics.partialExecTime + parentMetrics.partialChildrenExecOverlap += metrics.timeToFirstPause + + self.partials.del(event.futureId) + +proc processEvent*(self: var ProfilerMetrics, event: Event): void = + case event.newState: + of Pending: self.futureCreated(event) + of Running: self.futureRunning(event) + of Paused: self.futurePaused(event) + # Completion, failure and cancellation are currently handled the same way. + of Completed: self.futureCompleted(event) + of Failed: self.futureCompleted(event) + of Cancelled: self.futureCompleted(event) + +proc processAllEvents*(self: var ProfilerMetrics, events: seq[Event]): void = + for event in events: + self.processEvent(event) \ No newline at end of file