From e6d50b773675818df8a1a2c7d3f87fa3b6592bd3 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sun, 16 Aug 2020 01:45:41 +0300 Subject: [PATCH] Revert "some metrics for monitoring futures (#85)" This reverts commit e45ef32b5b7b1ed49ffcc52f39f42b7c30fab564. Metrics implemented this way, with a lock inside the otherwise tight event loop are not consistent with the chronos architecture that for good or bad uses thread local variables to avoid them - the solution does not have rough consensus behind it, and other avenues should be explored for this generally useful functionality. --- chronos.nimble | 8 ++-- chronos/asyncfutures2.nim | 60 +++++++------------------- chronos/asyncloop.nim | 90 +-------------------------------------- 3 files changed, 22 insertions(+), 136 deletions(-) diff --git a/chronos.nimble b/chronos.nimble index dbb12a7..84c8aad 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -13,10 +13,10 @@ requires "nim > 1.2.0", task test, "Run all tests": var commands = [ - "nim c -r --skipUserCfg --skipParentCfg -d:useSysAssert -d:useGcAssert tests/", - "nim c -r --skipUserCfg --skipParentCfg -d:chronosStackTrace tests/", - "nim c -r --skipUserCfg --skipParentCfg -d:release tests/", - "nim c -r --skipUserCfg --skipParentCfg -d:release -d:chronosFutureTracking tests/" + "nim c -r -d:useSysAssert -d:useGcAssert tests/", + "nim c -r -d:chronosStackTrace tests/", + "nim c -r -d:release tests/", + "nim c -r -d:release -d:chronosFutureTracking tests/" ] for testname in ["testall"]: for cmd in commands: diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index ad94943..d24cc1b 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -8,10 +8,8 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, tables, strutils, heapqueue, options, deques, cstrutils, sets, hashes -when defined(metrics): - import metrics, locks -import ./srcloc +import os, tables, strutils, heapqueue, options, deques, cstrutils +import srcloc export srcloc const @@ -75,40 +73,10 @@ type var currentID* {.threadvar.}: int currentID = 0 -when defined(metrics): - declareCounter chronos_new_future, "new Future being created" - when defined(chronosFutureTracking): var futureList* {.threadvar.}: FutureList futureList = FutureList() -when defined(chronosFutureTracking): - proc registerPendingFuture(future: var FutureBase) = - future.next = nil - future.prev = futureList.tail - if not(isNil(futureList.tail)): - futureList.tail.next = future - futureList.tail = future - if isNil(futureList.head): - futureList.head = future - futureList.count.inc() - when defined(metrics): - chronos_new_future.inc() - {.gcsafe.}: - withLock(pendingFuturesTableLock): - pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) + 1 - - proc unregisterPendingFuture(future: var FutureBase) = - if future == futureList.tail: futureList.tail = future.prev - if future == futureList.head: futureList.head = future.next - if not(isNil(future.next)): future.next.prev = future.prev - if not(isNil(future.prev)): future.prev.next = future.next - futureList.count.dec() - when defined(metrics): - {.gcsafe.}: - withLock(pendingFuturesTableLock): - pendingFuturesTable[$future.location[LocCreateIndex]] = pendingFuturesTable.getOrDefault($future.location[LocCreateIndex]) - 1 - template setupFutureBase(loc: ptr SrcLoc) = new(result) result.state = FutureState.Pending @@ -119,7 +87,14 @@ template setupFutureBase(loc: ptr SrcLoc) = currentID.inc() when defined(chronosFutureTracking): - registerPendingFuture(result.FutureBase) + result.next = nil + result.prev = futureList.tail + if not(isNil(futureList.tail)): + futureList.tail.next = result + futureList.tail = result + if isNil(futureList.head): + futureList.head = result + futureList.count.inc() proc newFuture[T](loc: ptr SrcLoc): Future[T] = setupFutureBase(loc) @@ -190,8 +165,12 @@ when defined(chronosFutureTracking): proc futureDestructor(udata: pointer) {.gcsafe.} = ## This procedure will be called when Future[T] got finished, cancelled or ## failed and all Future[T].callbacks are already scheduled and processed. - var future = cast[FutureBase](udata) - unregisterPendingFuture(future) + let future = cast[FutureBase](udata) + if future == futureList.tail: futureList.tail = future.prev + if future == futureList.head: futureList.head = future.next + if not(isNil(future.next)): future.next.prev = future.prev + if not(isNil(future.prev)): future.prev.next = future.next + futureList.count.dec() proc scheduleDestructor(future: FutureBase) {.inline.} = callSoon(futureDestructor, cast[pointer](future)) @@ -358,13 +337,6 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. - - when defined(metrics): - {.gcsafe.}: - if future.location[0] != nil: - withLock(callbacksByFutureLock): - callbacksByFuture.inc($future.location[LocCreateIndex]) - doAssert(not isNil(cb)) if future.finished(): callSoon(cb, udata) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 405c302..84685ab 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -12,9 +12,7 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, deques -when defined(metrics): - import metrics, locks, algorithm, sequtils -import ./timer, ./srcloc +import timer export Port, SocketFlag export timer @@ -255,69 +253,8 @@ template processTimersGetTimeout(loop, timeout: untyped) = if len(loop.callbacks) != 0: timeout = 0 -when defined(metrics): - var - callbacksByFuture* = initCountTable[string]() - callbacksByFutureLock*: Lock - pendingFuturesTable* = initTable[string, int]() - pendingFuturesTableLock*: Lock - initLock(callbacksByFutureLock) - initLock(pendingFuturesTableLock) - - declareCounter chronos_loop_timers, "loop timers" - declareGauge chronos_loop_timers_queue, "loop timers queue" - declareCounter chronos_poll_ticks, "Chronos event loop ticks" - declareHistogram chronos_poll_duration_seconds, "Chronos event loop - duration of poll()", - buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] - declareCounter chronos_poll_events, "Chronos poll events", ["event"] - declareCounter chronos_future_callbacks, "Future callbacks", ["location"] - declareCounter chronos_processed_callbacks, "total number of processed callbacks" - declareGauge chronos_pending_futures, "pending futures", ["location"] - - - proc processFutureMetrics() {.gcsafe.} = - # Wait until we have a decent amount of data and pick the most frequently - # seen futures. - const - ticksBetweenChecks = 50 - minimumCallbacksPerCheck = 1000 - maximumPicksPerCheck = 5 - - if chronos_poll_ticks.value.int64 mod ticksBetweenChecks == 0: - {.gcsafe.}: - withLock(callbacksByFutureLock): - var sum = 0 - for val in callbacksByFuture.values: - sum += val - if sum >= minimumCallbacksPerCheck: - callbacksByFuture.sort() - var i = 0 - for futureLocation, val in callbacksByFuture: - if i == maximumPicksPerCheck: - break - chronos_future_callbacks.inc(val.int64, labelValues = [futureLocation]) - i.inc() - - {.gcsafe.}: - # buggy compiler is buggy - callbacksByFuture.clear() - - {.gcsafe.}: - withLock(pendingFuturesTableLock): - const minimumPendingFutures = 10 - var i = 0 - for futureLocation in sorted(toSeq(pendingFuturesTable.keys()), - proc (x, y: string): int = cmp(pendingFuturesTable[x], pendingFuturesTable[y]), - SortOrder.Descending): - if i == maximumPicksPerCheck or pendingFuturesTable[futureLocation] < minimumPendingFutures: - break - chronos_pending_futures.set(pendingFuturesTable[futureLocation].int64, labelValues = [futureLocation]) - i.inc() - template processTimers(loop: untyped) = var curTime = Moment.now() - when defined(metrics): - chronos_loop_timers_queue.set(loop.timers.len.int64) while loop.timers.len > 0: if loop.timers[0].deleted: discard loop.timers.pop() @@ -326,8 +263,6 @@ template processTimers(loop: untyped) = if curTime < loop.timers[0].finishAt: break loop.callbacks.addLast(loop.timers.pop().function) - when defined(metrics): - chronos_loop_timers.inc() template processCallbacks(loop: untyped) = var count = len(loop.callbacks) @@ -341,8 +276,6 @@ template processCallbacks(loop: untyped) = let callable = loop.callbacks.popFirst() if not isNil(callable.function): callable.function(callable.udata) - when defined(metrics): - chronos_processed_callbacks.inc(count.int64) when defined(windows) or defined(nimdoc): type @@ -482,9 +415,6 @@ when defined(windows) or defined(nimdoc): var curTime = Moment.now() var curTimeout = DWORD(0) - when defined(metrics): - chronos_poll_ticks.inc() - # Moving expired timers to `loop.callbacks` and calculate timeout loop.processTimersGetTimeout(curTimeout) @@ -522,10 +452,6 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() - when defined(metrics): - processFutureMetrics() - chronos_poll_duration_seconds.observe((Moment.now() - curTime).milliseconds.float64 / 1000) - proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -766,9 +692,6 @@ elif unixPlatform: var curTime = Moment.now() var curTimeout = 0 - when defined(metrics): - chronos_poll_ticks.inc() - when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, Event.Vnode} @@ -781,9 +704,6 @@ elif unixPlatform: for i in 0..