From 5629b3c41f2a6c86cee53a5718b1e246c87ccd26 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Mon, 6 Jul 2020 09:33:13 +0300 Subject: [PATCH] [WIP] Zero-cost unattended Future[T] tracking mechanism. (#106) * Zero-cost unattended Future[T] tracking mechanism with tests and tracking of test suite. --- chronos.nim | 4 +-- chronos.nimble | 19 +++++----- chronos/asyncfutures2.nim | 41 +++++++++++++++++++-- chronos/asyncloop.nim | 13 +++++++ chronos/debugutils.nim | 58 ++++++++++++++++++++++++++++++ tests/testall.nim | 2 +- tests/testutils.nim | 76 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 198 insertions(+), 15 deletions(-) create mode 100644 chronos/debugutils.nim create mode 100644 tests/testutils.nim diff --git a/chronos.nim b/chronos.nim index 62db347..8295924 100644 --- a/chronos.nim +++ b/chronos.nim @@ -5,5 +5,5 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import chronos/[asyncloop, asyncsync, handles, transport, timer] -export asyncloop, asyncsync, handles, transport, timer +import chronos/[asyncloop, asyncsync, handles, transport, timer, debugutils] +export asyncloop, asyncsync, handles, transport, timer, debugutils diff --git a/chronos.nimble b/chronos.nimble index 621a243..e9a95c2 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.4.1" +version = "2.4.2" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" @@ -12,13 +12,12 @@ requires "nim > 0.19.4", task test, "Run all tests": var commands = [ - "nim c -r -d:useSysAssert -d:useGcAssert tests/testall", - "nim c -r tests/testall", - "nim c -r -d:release tests/testall" + "nim c -r -d:useSysAssert -d:useGcAssert tests/", + "nim c -r tests/", + "nim c -r -d:release tests/" ] - echo "\n" & commands[0] - exec commands[0] - echo "\n" & commands[1] - exec commands[1] - echo "\n" & commands[2] - exec commands[2] + for testname in ["testall"]: + for cmd in commands: + let curcmd = cmd & testname + echo "\n" & curcmd + exec curcmd diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index ce003ec..e82c89d 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -13,8 +13,8 @@ import srcloc export srcloc const - LocCreateIndex = 0 - LocCompleteIndex = 1 + LocCreateIndex* = 0 + LocCompleteIndex* = 1 type # ZAH: This can probably be stored with a cheaper representation @@ -36,6 +36,8 @@ type stackTrace: StackTrace ## For debugging purposes only. mustCancel*: bool id*: int + next*: FutureBase + prev*: FutureBase # ZAH: we have discussed some possible optimizations where # the future can be stored within the caller's stack frame. @@ -61,8 +63,15 @@ type CancelledError* = object of FutureError + FutureList* = object + head*: FutureBase + tail*: FutureBase + count*: int + var currentID* {.threadvar.}: int +var futureList* {.threadvar.}: FutureList currentID = 0 +futureList = FutureList() template setupFutureBase(loc: ptr SrcLoc) = new(result) @@ -72,6 +81,15 @@ template setupFutureBase(loc: ptr SrcLoc) = result.location[LocCreateIndex] = loc currentID.inc() + result.next = nil + result.prev = futureList.tail + if not(isNil(futureList.tail)): + futureList.tail.next = result + futureList.tail = result + if isNil(futureList.head): + futureList.head = result + futureList.count.inc() + proc newFuture[T](loc: ptr SrcLoc): Future[T] = setupFutureBase(loc) @@ -137,6 +155,16 @@ proc failed*(future: FutureBase): bool {.inline.} = ## Determines whether ``future`` completed with an error. result = (future.state == FutureState.Failed) +proc futureDestructor(udata: pointer) {.gcsafe.} = + ## This procedure will be called when Future[T] got finished, cancelled or + ## failed and all Future[T].callbacks are already scheduled and processed. + let future = cast[FutureBase](udata) + if future == futureList.tail: futureList.tail = future.prev + if future == futureList.head: futureList.head = future.next + if not(isNil(future.next)): future.next.prev = future.prev + if not(isNil(future.prev)): future.prev.next = future.next + futureList.count.dec() + proc checkFinished(future: FutureBase, loc: ptr SrcLoc) = ## Checks whether `future` is finished. If it is then raises a ## ``FutureDefect``. @@ -170,6 +198,9 @@ proc call(callbacks: var Deque[AsyncCallback]) = callSoon(item.function, item.udata) dec(count) +proc scheduleDestructor(future: FutureBase) {.inline.} = + callSoon(futureDestructor, cast[pointer](future)) + proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = if len(callbacks) == 0: callbacks = initDeque[AsyncCallback]() @@ -187,6 +218,7 @@ proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) = future.value = val future.state = FutureState.Finished future.callbacks.call() + scheduleDestructor(FutureBase(future)) template complete*[T](future: Future[T], val: T) = ## Completes ``future`` with value ``val``. @@ -198,6 +230,7 @@ proc complete(future: Future[void], loc: ptr SrcLoc) = doAssert(isNil(future.error)) future.state = FutureState.Finished future.callbacks.call() + scheduleDestructor(FutureBase(future)) template complete*(future: Future[void]) = ## Completes a void ``future``. @@ -210,6 +243,7 @@ proc complete[T](future: FutureVar[T], loc: ptr SrcLoc) = doAssert(isNil(fut.error)) fut.state = FutureState.Finished fut.callbacks.call() + scheduleDestructor(FutureBase(future)) template complete*[T](futvar: FutureVar[T]) = ## Completes a ``FutureVar``. @@ -223,6 +257,7 @@ proc complete[T](futvar: FutureVar[T], val: T, loc: ptr SrcLoc) = fut.state = FutureState.Finished fut.value = val fut.callbacks.call() + scheduleDestructor(FutureBase(fut)) template complete*[T](futvar: FutureVar[T], val: T) = ## Completes a ``FutureVar`` with value ``val``. @@ -238,6 +273,7 @@ proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) = future.errorStackTrace = if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) future.callbacks.call() + scheduleDestructor(FutureBase(future)) template fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. @@ -250,6 +286,7 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = future.error = newException(CancelledError, "") future.errorStackTrace = getStackTrace() future.callbacks.call() + scheduleDestructor(future) template cancelAndSchedule*[T](future: Future[T]) = cancelAndSchedule(FutureBase(future), getSrcLocation()) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 96dfee8..a21680a 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -959,5 +959,18 @@ proc getTracker*(id: string): TrackerBase = let loop = getGlobalDispatcher() result = loop.trackers.getOrDefault(id, nil) +iterator pendingFutures*(): FutureBase = + ## Iterates over the list of pending Futures (Future[T] objects which not yet + ## completed, cancelled or failed). + var slider = futureList.head + while not(isNil(slider)): + yield slider + slider = slider.next + +proc pendingFuturesCount*(): int = + ## Returns number of pending Futures (Future[T] objects which not yet + ## completed, cancelled or failed). + futureList.count + # Perform global per-module initialization. globalInit() diff --git a/chronos/debugutils.nim b/chronos/debugutils.nim new file mode 100644 index 0000000..ceaa2c4 --- /dev/null +++ b/chronos/debugutils.nim @@ -0,0 +1,58 @@ +# +# Chronos Debugging Utilities +# +# (c) Copyright 2020-Present Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import asyncloop + +const + AllFutureStates* = {FutureState.Pending, FutureState.Cancelled, + FutureState.Finished, FutureState.Failed} + WithoutFinished* = {FutureState.Pending, FutureState.Cancelled, + FutureState.Failed} + OnlyPending* = {FutureState.Pending} + OnlyFinished* = {FutureState.Finished} + +proc dumpPendingFutures*(filter = AllFutureStates): string = + ## Dump all `pending` Future[T] objects. + ## + ## This list will contain: + ## 1. Future[T] objects with ``FutureState.Pending`` state (this Futures are + ## not yet finished). + ## 2. Future[T] objects with ``FutureState.Finished/Cancelled/Failed`` state + ## which callbacks are scheduled, but not yet fully processed. + var count = 0 + var res = "" + for item in pendingFutures(): + if item.state in filter: + inc(count) + let loc = item.location[LocCreateIndex][] + let procedure = $loc.procedure + let filename = $loc.file + let procname = if len(procedure) == 0: + "\"unspecified\"" + else: + "\"" & procedure & "\"" + let item = "Future[" & $item.id & "] with name " & $procname & + " created at " & "<" & filename & ":" & $loc.line & ">" & + " and state = " & $item.state & "\n" + res.add(item) + result = $count & " pending Future[T] objects found:\n" & $res + +proc pendingFuturesCount*(filter: set[FutureState]): int = + ## Returns number of `pending` Future[T] objects which satisfy the ``filter`` + ## condition. + ## + ## If ``filter`` is equal to ``AllFutureStates`` Operation's complexity is + ## O(1), otherwise operation's complexity is O(n). + if filter == AllFutureStates: + pendingFuturesCount() + else: + var res = 0 + for item in pendingFutures(): + if item.state in filter: + inc(res) + res diff --git a/tests/testall.nim b/tests/testall.nim index cc3bb9e..f56cbc7 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -7,4 +7,4 @@ # MIT license (LICENSE-MIT) import testmacro, testsync, testsoon, testtime, testfut, testsignal, testaddress, testdatagram, teststream, testserver, testbugs, testnet, - testasyncstream + testasyncstream, testutils diff --git a/tests/testutils.nim b/tests/testutils.nim new file mode 100644 index 0000000..ca76dfe --- /dev/null +++ b/tests/testutils.nim @@ -0,0 +1,76 @@ +# Chronos Test Suite +# (c) Copyright 2020-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import unittest, strutils +import ../chronos + +when defined(nimHasUsed): {.used.} + +suite "Asynchronous utilities test suite": + proc getCount(): int = + # This procedure counts number of Future[T] in double-linked list via list + # iteration. + result = 0 + for item in pendingFutures(): + inc(result) + + test "Future clean and leaks test": + if pendingFuturesCount(WithoutFinished) == 0: + if pendingFuturesCount(OnlyFinished) > 0: + poll() + check pendingFuturesCount() == 0 + else: + echo dumpPendingFutures() + check false + + test "FutureList basics test": + var fut1 = newFuture[void]() + check: + getCount() == 1 + pendingFuturesCount() == 1 + var fut2 = newFuture[void]() + check: + getCount() == 2 + pendingFuturesCount() == 2 + var fut3 = newFuture[void]() + check: + getCount() == 3 + pendingFuturesCount() == 3 + fut1.complete() + poll() + check: + getCount() == 2 + pendingFuturesCount() == 2 + fut2.fail(newException(ValueError, "")) + poll() + check: + getCount() == 1 + pendingFuturesCount() == 1 + fut3.cancel() + poll() + check: + getCount() == 0 + pendingFuturesCount() == 0 + + test "FutureList async procedure test": + proc simpleProc() {.async.} = + await sleepAsync(10.milliseconds) + + var fut = simpleProc() + check: + getCount() == 2 + pendingFuturesCount() == 2 + + waitFor fut + check: + getCount() == 1 + pendingFuturesCount() == 1 + + poll() + check: + getCount() == 0 + pendingFuturesCount() == 0