From e45ef32b5b7b1ed49ffcc52f39f42b7c30fab564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C8=98tefan=20Talpalaru?= Date: Thu, 6 Aug 2020 19:30:53 +0200 Subject: [PATCH] some metrics for monitoring futures (#85) --- chronos.nimble | 8 ++-- chronos/asyncfutures2.nim | 60 +++++++++++++++++++------- chronos/asyncloop.nim | 90 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 136 insertions(+), 22 deletions(-) diff --git a/chronos.nimble b/chronos.nimble index 84c8aade..dbb12a70 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -13,10 +13,10 @@ requires "nim > 1.2.0", task test, "Run all tests": var commands = [ - "nim c -r -d:useSysAssert -d:useGcAssert tests/", - "nim c -r -d:chronosStackTrace tests/", - "nim c -r -d:release tests/", - "nim c -r -d:release -d:chronosFutureTracking tests/" + "nim c -r --skipUserCfg --skipParentCfg -d:useSysAssert -d:useGcAssert tests/", + "nim c -r --skipUserCfg --skipParentCfg -d:chronosStackTrace tests/", + "nim c -r --skipUserCfg --skipParentCfg -d:release tests/", + "nim c -r --skipUserCfg --skipParentCfg -d:release -d:chronosFutureTracking tests/" ] for testname in ["testall"]: for cmd in commands: diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index d24cc1b4..ad949437 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -8,8 +8,10 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, tables, strutils, heapqueue, options, deques, cstrutils -import srcloc +import os, tables, strutils, heapqueue, options, deques, cstrutils, sets, hashes +when defined(metrics): + import metrics, locks +import ./srcloc export srcloc const @@ -73,10 +75,40 @@ type var currentID* {.threadvar.}: int currentID = 0 +when defined(metrics): + declareCounter chronos_new_future, "new Future being created" + when defined(chronosFutureTracking): var futureList* {.threadvar.}: FutureList futureList = FutureList() +when defined(chronosFutureTracking): + proc registerPendingFuture(future: var FutureBase) = + future.next = nil + future.prev = futureList.tail + if not(isNil(futureList.tail)): + futureList.tail.next = future + futureList.tail = future + if isNil(futureList.head): + futureList.head = future + futureList.count.inc() + when defined(metrics): + chronos_new_future.inc() + {.gcsafe.}: + withLock(pendingFuturesTableLock): + pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) + 1 + + proc unregisterPendingFuture(future: var FutureBase) = + if future == futureList.tail: futureList.tail = future.prev + if future == futureList.head: futureList.head = future.next + if not(isNil(future.next)): future.next.prev = future.prev + if not(isNil(future.prev)): future.prev.next = future.next + futureList.count.dec() + when defined(metrics): + {.gcsafe.}: + withLock(pendingFuturesTableLock): + pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) - 1 + template setupFutureBase(loc: ptr SrcLoc) = new(result) result.state = FutureState.Pending @@ -87,14 +119,7 @@ template setupFutureBase(loc: ptr SrcLoc) = currentID.inc() when defined(chronosFutureTracking): - result.next = nil - result.prev = futureList.tail - if not(isNil(futureList.tail)): - futureList.tail.next = result - futureList.tail = result - if isNil(futureList.head): - futureList.head = result - futureList.count.inc() + registerPendingFuture(result.FutureBase) proc newFuture[T](loc: ptr SrcLoc): Future[T] = setupFutureBase(loc) @@ -165,12 +190,8 @@ when defined(chronosFutureTracking): proc futureDestructor(udata: pointer) {.gcsafe.} = ## This procedure will be called when Future[T] got finished, cancelled or ## failed and all Future[T].callbacks are already scheduled and processed. - let future = cast[FutureBase](udata) - if future == futureList.tail: futureList.tail = future.prev - if future == futureList.head: futureList.head = future.next - if not(isNil(future.next)): future.next.prev = future.prev - if not(isNil(future.prev)): future.prev.next = future.next - futureList.count.dec() + var future = cast[FutureBase](udata) + unregisterPendingFuture(future) proc scheduleDestructor(future: FutureBase) {.inline.} = callSoon(futureDestructor, cast[pointer](future)) @@ -337,6 +358,13 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. + + when defined(metrics): + {.gcsafe.}: + if future.location[0] != nil: + withLock(callbacksByFutureLock): + callbacksByFuture.inc($future.location[LocCreateIndex]) + doAssert(not isNil(cb)) if future.finished(): callSoon(cb, udata) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 84685ab6..405c302f 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -12,7 +12,9 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, deques -import timer +when defined(metrics): + import metrics, locks, algorithm, sequtils +import ./timer, ./srcloc export Port, SocketFlag export timer @@ -253,8 +255,69 @@ template processTimersGetTimeout(loop, timeout: untyped) = if len(loop.callbacks) != 0: timeout = 0 +when defined(metrics): + var + callbacksByFuture* = initCountTable[string]() + callbacksByFutureLock*: Lock + pendingFuturesTable* = initTable[string, int]() + pendingFuturesTableLock*: Lock + initLock(callbacksByFutureLock) + initLock(pendingFuturesTableLock) + + declareCounter chronos_loop_timers, "loop timers" + declareGauge chronos_loop_timers_queue, "loop timers queue" + declareCounter chronos_poll_ticks, "Chronos event loop ticks" + declareHistogram chronos_poll_duration_seconds, "Chronos event loop - duration of poll()", + buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] + declareCounter chronos_poll_events, "Chronos poll events", ["event"] + declareCounter chronos_future_callbacks, "Future callbacks", ["location"] + declareCounter chronos_processed_callbacks, "total number of processed callbacks" + declareGauge chronos_pending_futures, "pending futures", ["location"] + + + proc processFutureMetrics() {.gcsafe.} = + # Wait until we have a decent amount of data and pick the most frequently + # seen futures. + const + ticksBetweenChecks = 50 + minimumCallbacksPerCheck = 1000 + maximumPicksPerCheck = 5 + + if chronos_poll_ticks.value.int64 mod ticksBetweenChecks == 0: + {.gcsafe.}: + withLock(callbacksByFutureLock): + var sum = 0 + for val in callbacksByFuture.values: + sum += val + if sum >= minimumCallbacksPerCheck: + callbacksByFuture.sort() + var i = 0 + for futureLocation, val in callbacksByFuture: + if i == maximumPicksPerCheck: + break + chronos_future_callbacks.inc(val.int64, labelValues = [futureLocation]) + i.inc() + + {.gcsafe.}: + # buggy compiler is buggy + callbacksByFuture.clear() + + {.gcsafe.}: + withLock(pendingFuturesTableLock): + const minimumPendingFutures = 10 + var i = 0 + for futureLocation in sorted(toSeq(pendingFuturesTable.keys()), + proc (x, y: string): int = cmp(pendingFuturesTable[x], pendingFuturesTable[y]), + SortOrder.Descending): + if i == maximumPicksPerCheck or pendingFuturesTable[futureLocation] < minimumPendingFutures: + break + chronos_pending_futures.set(pendingFuturesTable[futureLocation].int64, labelValues = [futureLocation]) + i.inc() + template processTimers(loop: untyped) = var curTime = Moment.now() + when defined(metrics): + chronos_loop_timers_queue.set(loop.timers.len.int64) while loop.timers.len > 0: if loop.timers[0].deleted: discard loop.timers.pop() @@ -263,6 +326,8 @@ template processTimers(loop: untyped) = if curTime < loop.timers[0].finishAt: break loop.callbacks.addLast(loop.timers.pop().function) + when defined(metrics): + chronos_loop_timers.inc() template processCallbacks(loop: untyped) = var count = len(loop.callbacks) @@ -276,6 +341,8 @@ template processCallbacks(loop: untyped) = let callable = loop.callbacks.popFirst() if not isNil(callable.function): callable.function(callable.udata) + when defined(metrics): + chronos_processed_callbacks.inc(count.int64) when defined(windows) or defined(nimdoc): type @@ -415,6 +482,9 @@ when defined(windows) or defined(nimdoc): var curTime = Moment.now() var curTimeout = DWORD(0) + when defined(metrics): + chronos_poll_ticks.inc() + # Moving expired timers to `loop.callbacks` and calculate timeout loop.processTimersGetTimeout(curTimeout) @@ -452,6 +522,10 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() + when defined(metrics): + processFutureMetrics() + chronos_poll_duration_seconds.observe((Moment.now() - curTime).milliseconds.float64 / 1000) + proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -692,6 +766,9 @@ elif unixPlatform: var curTime = Moment.now() var curTimeout = 0 + when defined(metrics): + chronos_poll_ticks.inc() + when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, Event.Vnode} @@ -704,6 +781,9 @@ elif unixPlatform: for i in 0..