some metrics for monitoring futures (#85)

This commit is contained in:
Ștefan Talpalaru 2020-08-06 19:30:53 +02:00 committed by GitHub
parent 0d4d0002b0
commit e45ef32b5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 136 additions and 22 deletions

View File

@ -13,10 +13,10 @@ requires "nim > 1.2.0",
task test, "Run all tests":
var commands = [
"nim c -r -d:useSysAssert -d:useGcAssert tests/",
"nim c -r -d:chronosStackTrace tests/",
"nim c -r -d:release tests/",
"nim c -r -d:release -d:chronosFutureTracking tests/"
"nim c -r --skipUserCfg --skipParentCfg -d:useSysAssert -d:useGcAssert tests/",
"nim c -r --skipUserCfg --skipParentCfg -d:chronosStackTrace tests/",
"nim c -r --skipUserCfg --skipParentCfg -d:release tests/",
"nim c -r --skipUserCfg --skipParentCfg -d:release -d:chronosFutureTracking tests/"
]
for testname in ["testall"]:
for cmd in commands:

View File

@ -8,8 +8,10 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import os, tables, strutils, heapqueue, options, deques, cstrutils
import srcloc
import os, tables, strutils, heapqueue, options, deques, cstrutils, sets, hashes
when defined(metrics):
import metrics, locks
import ./srcloc
export srcloc
const
@ -73,10 +75,40 @@ type
var currentID* {.threadvar.}: int
currentID = 0
when defined(metrics):
declareCounter chronos_new_future, "new Future being created"
when defined(chronosFutureTracking):
var futureList* {.threadvar.}: FutureList
futureList = FutureList()
when defined(chronosFutureTracking):
proc registerPendingFuture(future: var FutureBase) =
future.next = nil
future.prev = futureList.tail
if not(isNil(futureList.tail)):
futureList.tail.next = future
futureList.tail = future
if isNil(futureList.head):
futureList.head = future
futureList.count.inc()
when defined(metrics):
chronos_new_future.inc()
{.gcsafe.}:
withLock(pendingFuturesTableLock):
pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) + 1
proc unregisterPendingFuture(future: var FutureBase) =
if future == futureList.tail: futureList.tail = future.prev
if future == futureList.head: futureList.head = future.next
if not(isNil(future.next)): future.next.prev = future.prev
if not(isNil(future.prev)): future.prev.next = future.next
futureList.count.dec()
when defined(metrics):
{.gcsafe.}:
withLock(pendingFuturesTableLock):
pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) - 1
template setupFutureBase(loc: ptr SrcLoc) =
new(result)
result.state = FutureState.Pending
@ -87,14 +119,7 @@ template setupFutureBase(loc: ptr SrcLoc) =
currentID.inc()
when defined(chronosFutureTracking):
result.next = nil
result.prev = futureList.tail
if not(isNil(futureList.tail)):
futureList.tail.next = result
futureList.tail = result
if isNil(futureList.head):
futureList.head = result
futureList.count.inc()
registerPendingFuture(result.FutureBase)
proc newFuture[T](loc: ptr SrcLoc): Future[T] =
setupFutureBase(loc)
@ -165,12 +190,8 @@ when defined(chronosFutureTracking):
proc futureDestructor(udata: pointer) {.gcsafe.} =
## This procedure will be called when Future[T] got finished, cancelled or
## failed and all Future[T].callbacks are already scheduled and processed.
let future = cast[FutureBase](udata)
if future == futureList.tail: futureList.tail = future.prev
if future == futureList.head: futureList.head = future.next
if not(isNil(future.next)): future.next.prev = future.prev
if not(isNil(future.prev)): future.prev.next = future.next
futureList.count.dec()
var future = cast[FutureBase](udata)
unregisterPendingFuture(future)
proc scheduleDestructor(future: FutureBase) {.inline.} =
callSoon(futureDestructor, cast[pointer](future))
@ -337,6 +358,13 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
when defined(metrics):
{.gcsafe.}:
if future.location[0] != nil:
withLock(callbacksByFutureLock):
callbacksByFuture.inc($future.location[LocCreateIndex])
doAssert(not isNil(cb))
if future.finished():
callSoon(cb, udata)

View File

@ -12,7 +12,9 @@ include "system/inclrtl"
import os, tables, strutils, heapqueue, lists, options, nativesockets, net,
deques
import timer
when defined(metrics):
import metrics, locks, algorithm, sequtils
import ./timer, ./srcloc
export Port, SocketFlag
export timer
@ -253,8 +255,69 @@ template processTimersGetTimeout(loop, timeout: untyped) =
if len(loop.callbacks) != 0:
timeout = 0
when defined(metrics):
var
callbacksByFuture* = initCountTable[string]()
callbacksByFutureLock*: Lock
pendingFuturesTable* = initTable[string, int]()
pendingFuturesTableLock*: Lock
initLock(callbacksByFutureLock)
initLock(pendingFuturesTableLock)
declareCounter chronos_loop_timers, "loop timers"
declareGauge chronos_loop_timers_queue, "loop timers queue"
declareCounter chronos_poll_ticks, "Chronos event loop ticks"
declareHistogram chronos_poll_duration_seconds, "Chronos event loop - duration of poll()",
buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
declareCounter chronos_poll_events, "Chronos poll events", ["event"]
declareCounter chronos_future_callbacks, "Future callbacks", ["location"]
declareCounter chronos_processed_callbacks, "total number of processed callbacks"
declareGauge chronos_pending_futures, "pending futures", ["location"]
proc processFutureMetrics() {.gcsafe.} =
# Wait until we have a decent amount of data and pick the most frequently
# seen futures.
const
ticksBetweenChecks = 50
minimumCallbacksPerCheck = 1000
maximumPicksPerCheck = 5
if chronos_poll_ticks.value.int64 mod ticksBetweenChecks == 0:
{.gcsafe.}:
withLock(callbacksByFutureLock):
var sum = 0
for val in callbacksByFuture.values:
sum += val
if sum >= minimumCallbacksPerCheck:
callbacksByFuture.sort()
var i = 0
for futureLocation, val in callbacksByFuture:
if i == maximumPicksPerCheck:
break
chronos_future_callbacks.inc(val.int64, labelValues = [futureLocation])
i.inc()
{.gcsafe.}:
# buggy compiler is buggy
callbacksByFuture.clear()
{.gcsafe.}:
withLock(pendingFuturesTableLock):
const minimumPendingFutures = 10
var i = 0
for futureLocation in sorted(toSeq(pendingFuturesTable.keys()),
proc (x, y: string): int = cmp(pendingFuturesTable[x], pendingFuturesTable[y]),
SortOrder.Descending):
if i == maximumPicksPerCheck or pendingFuturesTable[futureLocation] < minimumPendingFutures:
break
chronos_pending_futures.set(pendingFuturesTable[futureLocation].int64, labelValues = [futureLocation])
i.inc()
template processTimers(loop: untyped) =
var curTime = Moment.now()
when defined(metrics):
chronos_loop_timers_queue.set(loop.timers.len.int64)
while loop.timers.len > 0:
if loop.timers[0].deleted:
discard loop.timers.pop()
@ -263,6 +326,8 @@ template processTimers(loop: untyped) =
if curTime < loop.timers[0].finishAt:
break
loop.callbacks.addLast(loop.timers.pop().function)
when defined(metrics):
chronos_loop_timers.inc()
template processCallbacks(loop: untyped) =
var count = len(loop.callbacks)
@ -276,6 +341,8 @@ template processCallbacks(loop: untyped) =
let callable = loop.callbacks.popFirst()
if not isNil(callable.function):
callable.function(callable.udata)
when defined(metrics):
chronos_processed_callbacks.inc(count.int64)
when defined(windows) or defined(nimdoc):
type
@ -415,6 +482,9 @@ when defined(windows) or defined(nimdoc):
var curTime = Moment.now()
var curTimeout = DWORD(0)
when defined(metrics):
chronos_poll_ticks.inc()
# Moving expired timers to `loop.callbacks` and calculate timeout
loop.processTimersGetTimeout(curTimeout)
@ -452,6 +522,10 @@ when defined(windows) or defined(nimdoc):
# poll() call.
loop.processCallbacks()
when defined(metrics):
processFutureMetrics()
chronos_poll_duration_seconds.observe((Moment.now() - curTime).milliseconds.float64 / 1000)
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher()
@ -692,6 +766,9 @@ elif unixPlatform:
var curTime = Moment.now()
var curTimeout = 0
when defined(metrics):
chronos_poll_ticks.inc()
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
@ -704,6 +781,9 @@ elif unixPlatform:
for i in 0..<count:
let fd = loop.keys[i].fd
let events = loop.keys[i].events
when defined(metrics):
for event in events:
chronos_poll_events.inc(labelValues = [$event])
withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}:
@ -730,6 +810,10 @@ elif unixPlatform:
# poll() call.
loop.processCallbacks()
when defined(metrics):
processFutureMetrics()
chronos_poll_duration_seconds.observe((Moment.now() - curTime).milliseconds.float64 / 1000)
else:
proc initAPI() = discard
proc globalInit() = discard
@ -787,7 +871,9 @@ include asyncfutures2
proc sleepAsync*(duration: Duration): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``duration`` time.
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
# It won't compile with a string argument.
var retFuture = newFuture[void](getSrcLocation("chronos.sleepAsync(chronos.timer.Duration)"))
let moment = Moment.fromNow(duration)
var timer: TimerCallback