mirror of
https://github.com/logos-storage/nim-chronos.git
synced 2026-01-07 16:03:09 +00:00
initial port
This commit is contained in:
parent
28a100b135
commit
4b7d88f5de
@ -79,6 +79,11 @@ const
|
|||||||
""
|
""
|
||||||
## OS polling engine type which is going to be used by chronos.
|
## OS polling engine type which is going to be used by chronos.
|
||||||
|
|
||||||
|
chronosProfiling* {.booldefine.} = defined(chronosProfiling)
|
||||||
|
## Enable instrumentation callbacks which are called at
|
||||||
|
## the start, pause, or end of a Future's lifetime.
|
||||||
|
## Useful for implementing metrics or other instrumentation.
|
||||||
|
|
||||||
when defined(chronosStrictException):
|
when defined(chronosStrictException):
|
||||||
{.warning: "-d:chronosStrictException has been deprecated in favor of handleException".}
|
{.warning: "-d:chronosStrictException has been deprecated in favor of handleException".}
|
||||||
# In chronos v3, this setting was used as the opposite of
|
# In chronos v3, this setting was used as the opposite of
|
||||||
|
|||||||
@ -98,6 +98,14 @@ when chronosFutureTracking:
|
|||||||
|
|
||||||
var futureList* {.threadvar.}: FutureList
|
var futureList* {.threadvar.}: FutureList
|
||||||
|
|
||||||
|
when chronosProfiling:
|
||||||
|
type FutureExecutionEvent* {.pure.} = enum
|
||||||
|
Init, Run, Pause, Finish
|
||||||
|
|
||||||
|
var onFutureEvent* {.threadvar.}:
|
||||||
|
proc (fut: FutureBase,
|
||||||
|
state: FutureExecutionEvent): void {.nimcall, gcsafe, raises: [].}
|
||||||
|
|
||||||
# Internal utilities - these are not part of the stable API
|
# Internal utilities - these are not part of the stable API
|
||||||
proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc,
|
proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc,
|
||||||
state: FutureState, flags: FutureFlags) =
|
state: FutureState, flags: FutureFlags) =
|
||||||
@ -125,6 +133,10 @@ proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc,
|
|||||||
futureList.head = fut
|
futureList.head = fut
|
||||||
futureList.count.inc()
|
futureList.count.inc()
|
||||||
|
|
||||||
|
when chronosProfiling:
|
||||||
|
if not isNil(onFutureEvent):
|
||||||
|
onFutureEvent(fut, Init)
|
||||||
|
|
||||||
# Public API
|
# Public API
|
||||||
template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] =
|
template init*[T](F: type Future[T], fromProc: static[string] = ""): Future[T] =
|
||||||
## Creates a new pending future.
|
## Creates a new pending future.
|
||||||
|
|||||||
@ -193,6 +193,9 @@ proc finish(fut: FutureBase, state: FutureState) =
|
|||||||
# 2. `fut.state` is checked by `checkFinished()`.
|
# 2. `fut.state` is checked by `checkFinished()`.
|
||||||
fut.internalState = state
|
fut.internalState = state
|
||||||
fut.internalCancelcb = nil # release cancellation callback memory
|
fut.internalCancelcb = nil # release cancellation callback memory
|
||||||
|
when chronosProfiling:
|
||||||
|
if not isNil(onFutureEvent):
|
||||||
|
onFutureEvent(fut, Finish)
|
||||||
for item in fut.internalCallbacks.mitems():
|
for item in fut.internalCallbacks.mitems():
|
||||||
if not(isNil(item.function)):
|
if not(isNil(item.function)):
|
||||||
callSoon(item)
|
callSoon(item)
|
||||||
@ -373,6 +376,9 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
|
|||||||
# Every call to an `{.async.}` proc is redirected to call this function
|
# Every call to an `{.async.}` proc is redirected to call this function
|
||||||
# instead with its original body captured in `fut.closure`.
|
# instead with its original body captured in `fut.closure`.
|
||||||
while true:
|
while true:
|
||||||
|
when chronosProfiling:
|
||||||
|
if not isNil(onFutureExecEvent):
|
||||||
|
onFutureEvent(fut, Run)
|
||||||
# Call closure to make progress on `fut` until it reaches `yield` (inside
|
# Call closure to make progress on `fut` until it reaches `yield` (inside
|
||||||
# `await` typically) or completes / fails / is cancelled
|
# `await` typically) or completes / fails / is cancelled
|
||||||
let next: FutureBase = fut.internalClosure(fut)
|
let next: FutureBase = fut.internalClosure(fut)
|
||||||
@ -389,6 +395,11 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
|
|||||||
GC_ref(fut)
|
GC_ref(fut)
|
||||||
next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))
|
next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))
|
||||||
|
|
||||||
|
# If we got thus far, means the future is paused.
|
||||||
|
when chronosProfiling:
|
||||||
|
if not isNil(onFutureExecEvent):
|
||||||
|
onFutureEvent(fut, Pause)
|
||||||
|
|
||||||
# return here so that we don't remove the closure below
|
# return here so that we don't remove the closure below
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
28
chronos/profiler.nim
Normal file
28
chronos/profiler.nim
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import ./config
|
||||||
|
|
||||||
|
when chronosProfiling:
|
||||||
|
import futures
|
||||||
|
import ./profiler/[events, metrics]
|
||||||
|
|
||||||
|
export futures, events, metrics
|
||||||
|
|
||||||
|
when not chronosFutureId:
|
||||||
|
{.error: "chronosProfiling requires chronosFutureId to be enabled".}
|
||||||
|
|
||||||
|
var futureMetrics {.threadvar.}: ProfilerMetrics
|
||||||
|
|
||||||
|
proc getMetrics*(): ProfilerMetrics =
|
||||||
|
## Returns metrics for the current event loop.
|
||||||
|
result = futureMetrics
|
||||||
|
|
||||||
|
proc enableEventCallbacks*(): void =
|
||||||
|
onFutureEvent = handleFutureEventCB
|
||||||
|
onFutureExecEvent = handleFutureExecEventCB
|
||||||
|
|
||||||
|
proc enableProfiling*() =
|
||||||
|
## Enables profiling on the current event loop.
|
||||||
|
if not isNil(handleFutureEvent): return
|
||||||
|
|
||||||
|
enableEventCallbacks()
|
||||||
|
handleFutureEvent = proc (e: Event) {.nimcall.} =
|
||||||
|
futureMetrics.processEvent(e)
|
||||||
62
chronos/profiler/events.nim
Normal file
62
chronos/profiler/events.nim
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
## This module defines the lower-level callback implementations that hook into
|
||||||
|
## the Chronos scheduler when profiling is enabled. The main goal is to provide
|
||||||
|
## timestamped events changes for futures while allowing a simpler implementation
|
||||||
|
## (only one event object type) for the remainder of the profiler.
|
||||||
|
|
||||||
|
import ".."/timer
|
||||||
|
import ".."/futures
|
||||||
|
import ".."/srcloc
|
||||||
|
|
||||||
|
type
|
||||||
|
ExtendedFutureState* {.pure.} = enum
|
||||||
|
Pending,
|
||||||
|
Running,
|
||||||
|
Paused,
|
||||||
|
Completed,
|
||||||
|
Cancelled,
|
||||||
|
Failed,
|
||||||
|
|
||||||
|
Event* = object
|
||||||
|
future: FutureBase
|
||||||
|
newState*: ExtendedFutureState
|
||||||
|
timestamp*: Moment
|
||||||
|
|
||||||
|
var handleFutureEvent* {.threadvar.}: proc (event: Event) {.nimcall, gcsafe, raises: [].}
|
||||||
|
|
||||||
|
proc `location`*(self: Event): SrcLoc =
|
||||||
|
self.future.internalLocation[Create][]
|
||||||
|
|
||||||
|
proc `futureId`*(self: Event): uint =
|
||||||
|
self.future.id
|
||||||
|
|
||||||
|
proc mkEvent(future: FutureBase, state: ExtendedFutureState): Event =
|
||||||
|
Event(
|
||||||
|
future: future,
|
||||||
|
newState: state,
|
||||||
|
timestamp: Moment.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
proc handleFutureEventCB*(future: FutureBase,
|
||||||
|
state: FutureState): void {.nimcall.} =
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
let extendedState = case state:
|
||||||
|
of FutureState.Pending: ExtendedFutureState.Pending
|
||||||
|
of FutureState.Completed: ExtendedFutureState.Completed
|
||||||
|
of FutureState.Cancelled: ExtendedFutureState.Cancelled
|
||||||
|
of FutureState.Failed: ExtendedFutureState.Failed
|
||||||
|
|
||||||
|
if not isNil(handleFutureEvent):
|
||||||
|
handleFutureEvent(mkEvent(future, extendedState))
|
||||||
|
|
||||||
|
proc handleFutureExecEventCB*(future: FutureBase,
|
||||||
|
state: FutureExecutionState): void {.nimcall.} =
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
let extendedState = case state:
|
||||||
|
of FutureExecutionState.Running: ExtendedFutureState.Running
|
||||||
|
of FutureExecutionState.Paused: ExtendedFutureState.Paused
|
||||||
|
|
||||||
|
if not isNil(handleFutureEvent):
|
||||||
|
handleFutureEvent(mkEvent(future, extendedState))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
133
chronos/profiler/metrics.nim
Normal file
133
chronos/profiler/metrics.nim
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import std/tables
|
||||||
|
import std/options
|
||||||
|
|
||||||
|
import ./events
|
||||||
|
import ../[timer, srcloc]
|
||||||
|
|
||||||
|
export timer, tables, srcloc
|
||||||
|
|
||||||
|
type
|
||||||
|
AggregateFutureMetrics* = object
|
||||||
|
execTime*: Duration
|
||||||
|
execTimeMax*: Duration
|
||||||
|
childrenExecTime*: Duration
|
||||||
|
wallClockTime*: Duration
|
||||||
|
callCount*: uint
|
||||||
|
|
||||||
|
RunningFuture* = object
|
||||||
|
state*: ExtendedFutureState
|
||||||
|
created*: Moment
|
||||||
|
lastStarted*: Moment
|
||||||
|
timeToFirstPause*: Duration
|
||||||
|
partialExecTime*: Duration
|
||||||
|
partialChildrenExecTime*: Duration
|
||||||
|
partialChildrenExecOverlap*: Duration
|
||||||
|
parent*: Option[uint]
|
||||||
|
pauses*: uint
|
||||||
|
|
||||||
|
MetricsTotals* = Table[SrcLoc, AggregateFutureMetrics]
|
||||||
|
|
||||||
|
ProfilerMetrics* = object
|
||||||
|
callStack: seq[uint]
|
||||||
|
partials: Table[uint, RunningFuture]
|
||||||
|
totals*: MetricsTotals
|
||||||
|
|
||||||
|
proc `execTimeWithChildren`*(self: AggregateFutureMetrics): Duration =
|
||||||
|
self.execTime + self.childrenExecTime
|
||||||
|
|
||||||
|
proc push(self: var seq[uint], value: uint): void = self.add(value)
|
||||||
|
|
||||||
|
proc pop(self: var seq[uint]): uint =
|
||||||
|
let value = self[^1]
|
||||||
|
self.setLen(self.len - 1)
|
||||||
|
value
|
||||||
|
|
||||||
|
proc peek(self: var seq[uint]): Option[uint] =
|
||||||
|
if self.len == 0: none(uint) else: self[^1].some
|
||||||
|
|
||||||
|
proc `$`(location: SrcLoc): string =
|
||||||
|
$location.procedure & "[" & $location.file & ":" & $location.line & "]"
|
||||||
|
|
||||||
|
proc futureCreated(self: var ProfilerMetrics, event: Event): void =
|
||||||
|
assert not self.partials.hasKey(event.futureId), $event.location
|
||||||
|
|
||||||
|
self.partials[event.futureId] = RunningFuture(
|
||||||
|
created: event.timestamp,
|
||||||
|
state: Pending,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc bindParent(self: var ProfilerMetrics, metrics: ptr RunningFuture): void =
|
||||||
|
let current = self.callStack.peek()
|
||||||
|
if current.isNone:
|
||||||
|
return
|
||||||
|
|
||||||
|
if metrics.parent.isSome:
|
||||||
|
assert metrics.parent.get == current.get
|
||||||
|
metrics.parent = current
|
||||||
|
|
||||||
|
proc futureRunning(self: var ProfilerMetrics, event: Event): void =
|
||||||
|
assert self.partials.hasKey(event.futureId), $event.location
|
||||||
|
|
||||||
|
self.partials.withValue(event.futureId, metrics):
|
||||||
|
assert metrics.state == Pending or metrics.state == Paused,
|
||||||
|
$event.location
|
||||||
|
|
||||||
|
self.bindParent(metrics)
|
||||||
|
self.callStack.push(event.futureId)
|
||||||
|
|
||||||
|
metrics.lastStarted = event.timestamp
|
||||||
|
metrics.state = Running
|
||||||
|
|
||||||
|
proc futurePaused(self: var ProfilerMetrics, event: Event): void =
|
||||||
|
assert self.partials.hasKey(event.futureId), $event.location
|
||||||
|
assert event.futureId == self.callStack.pop(), $event.location
|
||||||
|
|
||||||
|
self.partials.withValue(event.futureId, metrics):
|
||||||
|
assert metrics.state == Running, $event.location
|
||||||
|
|
||||||
|
let segmentExecTime = event.timestamp - metrics.lastStarted
|
||||||
|
|
||||||
|
if metrics.pauses == 0:
|
||||||
|
metrics.timeToFirstPause = segmentExecTime
|
||||||
|
metrics.partialExecTime += segmentExecTime
|
||||||
|
metrics.pauses += 1
|
||||||
|
metrics.state = Paused
|
||||||
|
|
||||||
|
proc futureCompleted(self: var ProfilerMetrics, event: Event): void =
|
||||||
|
self.partials.withValue(event.futureId, metrics):
|
||||||
|
if metrics.state == Running:
|
||||||
|
self.futurePaused(event)
|
||||||
|
|
||||||
|
let location = event.location
|
||||||
|
if not self.totals.hasKey(location):
|
||||||
|
self.totals[location] = AggregateFutureMetrics()
|
||||||
|
|
||||||
|
self.totals.withValue(location, aggMetrics):
|
||||||
|
let execTime = metrics.partialExecTime - metrics.partialChildrenExecOverlap
|
||||||
|
|
||||||
|
aggMetrics.callCount.inc()
|
||||||
|
aggMetrics.execTime += execTime
|
||||||
|
aggMetrics.execTimeMax = max(aggMetrics.execTimeMax, execTime)
|
||||||
|
aggMetrics.childrenExecTime += metrics.partialChildrenExecTime
|
||||||
|
aggMetrics.wallClockTime += event.timestamp - metrics.created
|
||||||
|
|
||||||
|
if metrics.parent.isSome:
|
||||||
|
self.partials.withValue(metrics.parent.get, parentMetrics):
|
||||||
|
parentMetrics.partialChildrenExecTime += metrics.partialExecTime
|
||||||
|
parentMetrics.partialChildrenExecOverlap += metrics.timeToFirstPause
|
||||||
|
|
||||||
|
self.partials.del(event.futureId)
|
||||||
|
|
||||||
|
proc processEvent*(self: var ProfilerMetrics, event: Event): void =
|
||||||
|
case event.newState:
|
||||||
|
of Pending: self.futureCreated(event)
|
||||||
|
of Running: self.futureRunning(event)
|
||||||
|
of Paused: self.futurePaused(event)
|
||||||
|
# Completion, failure and cancellation are currently handled the same way.
|
||||||
|
of Completed: self.futureCompleted(event)
|
||||||
|
of Failed: self.futureCompleted(event)
|
||||||
|
of Cancelled: self.futureCompleted(event)
|
||||||
|
|
||||||
|
proc processAllEvents*(self: var ProfilerMetrics, events: seq[Event]): void =
|
||||||
|
for event in events:
|
||||||
|
self.processEvent(event)
|
||||||
Loading…
x
Reference in New Issue
Block a user