[WIP] Zero-cost unattended Future[T] tracking mechanism. (#106)
* Zero-cost unattended Future[T] tracking mechanism with tests and tracking of test suite.
This commit is contained in:
parent
16ed169f25
commit
5629b3c41f
|
@ -5,5 +5,5 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
import chronos/[asyncloop, asyncsync, handles, transport, timer]
|
||||
export asyncloop, asyncsync, handles, transport, timer
|
||||
import chronos/[asyncloop, asyncsync, handles, transport, timer, debugutils]
|
||||
export asyncloop, asyncsync, handles, transport, timer, debugutils
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
packageName = "chronos"
|
||||
version = "2.4.1"
|
||||
version = "2.4.2"
|
||||
author = "Status Research & Development GmbH"
|
||||
description = "Chronos"
|
||||
license = "Apache License 2.0 or MIT"
|
||||
|
@ -12,13 +12,12 @@ requires "nim > 0.19.4",
|
|||
|
||||
task test, "Run all tests":
|
||||
var commands = [
|
||||
"nim c -r -d:useSysAssert -d:useGcAssert tests/testall",
|
||||
"nim c -r tests/testall",
|
||||
"nim c -r -d:release tests/testall"
|
||||
"nim c -r -d:useSysAssert -d:useGcAssert tests/",
|
||||
"nim c -r tests/",
|
||||
"nim c -r -d:release tests/"
|
||||
]
|
||||
echo "\n" & commands[0]
|
||||
exec commands[0]
|
||||
echo "\n" & commands[1]
|
||||
exec commands[1]
|
||||
echo "\n" & commands[2]
|
||||
exec commands[2]
|
||||
for testname in ["testall"]:
|
||||
for cmd in commands:
|
||||
let curcmd = cmd & testname
|
||||
echo "\n" & curcmd
|
||||
exec curcmd
|
||||
|
|
|
@ -13,8 +13,8 @@ import srcloc
|
|||
export srcloc
|
||||
|
||||
const
|
||||
LocCreateIndex = 0
|
||||
LocCompleteIndex = 1
|
||||
LocCreateIndex* = 0
|
||||
LocCompleteIndex* = 1
|
||||
|
||||
type
|
||||
# ZAH: This can probably be stored with a cheaper representation
|
||||
|
@ -36,6 +36,8 @@ type
|
|||
stackTrace: StackTrace ## For debugging purposes only.
|
||||
mustCancel*: bool
|
||||
id*: int
|
||||
next*: FutureBase
|
||||
prev*: FutureBase
|
||||
|
||||
# ZAH: we have discussed some possible optimizations where
|
||||
# the future can be stored within the caller's stack frame.
|
||||
|
@ -61,8 +63,15 @@ type
|
|||
|
||||
CancelledError* = object of FutureError
|
||||
|
||||
FutureList* = object
|
||||
head*: FutureBase
|
||||
tail*: FutureBase
|
||||
count*: int
|
||||
|
||||
var currentID* {.threadvar.}: int
|
||||
var futureList* {.threadvar.}: FutureList
|
||||
currentID = 0
|
||||
futureList = FutureList()
|
||||
|
||||
template setupFutureBase(loc: ptr SrcLoc) =
|
||||
new(result)
|
||||
|
@ -72,6 +81,15 @@ template setupFutureBase(loc: ptr SrcLoc) =
|
|||
result.location[LocCreateIndex] = loc
|
||||
currentID.inc()
|
||||
|
||||
result.next = nil
|
||||
result.prev = futureList.tail
|
||||
if not(isNil(futureList.tail)):
|
||||
futureList.tail.next = result
|
||||
futureList.tail = result
|
||||
if isNil(futureList.head):
|
||||
futureList.head = result
|
||||
futureList.count.inc()
|
||||
|
||||
proc newFuture[T](loc: ptr SrcLoc): Future[T] =
|
||||
setupFutureBase(loc)
|
||||
|
||||
|
@ -137,6 +155,16 @@ proc failed*(future: FutureBase): bool {.inline.} =
|
|||
## Determines whether ``future`` completed with an error.
|
||||
result = (future.state == FutureState.Failed)
|
||||
|
||||
proc futureDestructor(udata: pointer) {.gcsafe.} =
|
||||
## This procedure will be called when Future[T] got finished, cancelled or
|
||||
## failed and all Future[T].callbacks are already scheduled and processed.
|
||||
let future = cast[FutureBase](udata)
|
||||
if future == futureList.tail: futureList.tail = future.prev
|
||||
if future == futureList.head: futureList.head = future.next
|
||||
if not(isNil(future.next)): future.next.prev = future.prev
|
||||
if not(isNil(future.prev)): future.prev.next = future.next
|
||||
futureList.count.dec()
|
||||
|
||||
proc checkFinished(future: FutureBase, loc: ptr SrcLoc) =
|
||||
## Checks whether `future` is finished. If it is then raises a
|
||||
## ``FutureDefect``.
|
||||
|
@ -170,6 +198,9 @@ proc call(callbacks: var Deque[AsyncCallback]) =
|
|||
callSoon(item.function, item.udata)
|
||||
dec(count)
|
||||
|
||||
proc scheduleDestructor(future: FutureBase) {.inline.} =
|
||||
callSoon(futureDestructor, cast[pointer](future))
|
||||
|
||||
proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
|
||||
if len(callbacks) == 0:
|
||||
callbacks = initDeque[AsyncCallback]()
|
||||
|
@ -187,6 +218,7 @@ proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
|
|||
future.value = val
|
||||
future.state = FutureState.Finished
|
||||
future.callbacks.call()
|
||||
scheduleDestructor(FutureBase(future))
|
||||
|
||||
template complete*[T](future: Future[T], val: T) =
|
||||
## Completes ``future`` with value ``val``.
|
||||
|
@ -198,6 +230,7 @@ proc complete(future: Future[void], loc: ptr SrcLoc) =
|
|||
doAssert(isNil(future.error))
|
||||
future.state = FutureState.Finished
|
||||
future.callbacks.call()
|
||||
scheduleDestructor(FutureBase(future))
|
||||
|
||||
template complete*(future: Future[void]) =
|
||||
## Completes a void ``future``.
|
||||
|
@ -210,6 +243,7 @@ proc complete[T](future: FutureVar[T], loc: ptr SrcLoc) =
|
|||
doAssert(isNil(fut.error))
|
||||
fut.state = FutureState.Finished
|
||||
fut.callbacks.call()
|
||||
scheduleDestructor(FutureBase(future))
|
||||
|
||||
template complete*[T](futvar: FutureVar[T]) =
|
||||
## Completes a ``FutureVar``.
|
||||
|
@ -223,6 +257,7 @@ proc complete[T](futvar: FutureVar[T], val: T, loc: ptr SrcLoc) =
|
|||
fut.state = FutureState.Finished
|
||||
fut.value = val
|
||||
fut.callbacks.call()
|
||||
scheduleDestructor(FutureBase(fut))
|
||||
|
||||
template complete*[T](futvar: FutureVar[T], val: T) =
|
||||
## Completes a ``FutureVar`` with value ``val``.
|
||||
|
@ -238,6 +273,7 @@ proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) =
|
|||
future.errorStackTrace =
|
||||
if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
|
||||
future.callbacks.call()
|
||||
scheduleDestructor(FutureBase(future))
|
||||
|
||||
template fail*[T](future: Future[T], error: ref Exception) =
|
||||
## Completes ``future`` with ``error``.
|
||||
|
@ -250,6 +286,7 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
|
|||
future.error = newException(CancelledError, "")
|
||||
future.errorStackTrace = getStackTrace()
|
||||
future.callbacks.call()
|
||||
scheduleDestructor(future)
|
||||
|
||||
template cancelAndSchedule*[T](future: Future[T]) =
|
||||
cancelAndSchedule(FutureBase(future), getSrcLocation())
|
||||
|
|
|
@ -959,5 +959,18 @@ proc getTracker*(id: string): TrackerBase =
|
|||
let loop = getGlobalDispatcher()
|
||||
result = loop.trackers.getOrDefault(id, nil)
|
||||
|
||||
iterator pendingFutures*(): FutureBase =
|
||||
## Iterates over the list of pending Futures (Future[T] objects which not yet
|
||||
## completed, cancelled or failed).
|
||||
var slider = futureList.head
|
||||
while not(isNil(slider)):
|
||||
yield slider
|
||||
slider = slider.next
|
||||
|
||||
proc pendingFuturesCount*(): int =
|
||||
## Returns number of pending Futures (Future[T] objects which not yet
|
||||
## completed, cancelled or failed).
|
||||
futureList.count
|
||||
|
||||
# Perform global per-module initialization.
|
||||
globalInit()
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
#
|
||||
# Chronos Debugging Utilities
|
||||
#
|
||||
# (c) Copyright 2020-Present Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
import asyncloop
|
||||
|
||||
const
|
||||
AllFutureStates* = {FutureState.Pending, FutureState.Cancelled,
|
||||
FutureState.Finished, FutureState.Failed}
|
||||
WithoutFinished* = {FutureState.Pending, FutureState.Cancelled,
|
||||
FutureState.Failed}
|
||||
OnlyPending* = {FutureState.Pending}
|
||||
OnlyFinished* = {FutureState.Finished}
|
||||
|
||||
proc dumpPendingFutures*(filter = AllFutureStates): string =
|
||||
## Dump all `pending` Future[T] objects.
|
||||
##
|
||||
## This list will contain:
|
||||
## 1. Future[T] objects with ``FutureState.Pending`` state (this Futures are
|
||||
## not yet finished).
|
||||
## 2. Future[T] objects with ``FutureState.Finished/Cancelled/Failed`` state
|
||||
## which callbacks are scheduled, but not yet fully processed.
|
||||
var count = 0
|
||||
var res = ""
|
||||
for item in pendingFutures():
|
||||
if item.state in filter:
|
||||
inc(count)
|
||||
let loc = item.location[LocCreateIndex][]
|
||||
let procedure = $loc.procedure
|
||||
let filename = $loc.file
|
||||
let procname = if len(procedure) == 0:
|
||||
"\"unspecified\""
|
||||
else:
|
||||
"\"" & procedure & "\""
|
||||
let item = "Future[" & $item.id & "] with name " & $procname &
|
||||
" created at " & "<" & filename & ":" & $loc.line & ">" &
|
||||
" and state = " & $item.state & "\n"
|
||||
res.add(item)
|
||||
result = $count & " pending Future[T] objects found:\n" & $res
|
||||
|
||||
proc pendingFuturesCount*(filter: set[FutureState]): int =
|
||||
## Returns number of `pending` Future[T] objects which satisfy the ``filter``
|
||||
## condition.
|
||||
##
|
||||
## If ``filter`` is equal to ``AllFutureStates`` Operation's complexity is
|
||||
## O(1), otherwise operation's complexity is O(n).
|
||||
if filter == AllFutureStates:
|
||||
pendingFuturesCount()
|
||||
else:
|
||||
var res = 0
|
||||
for item in pendingFutures():
|
||||
if item.state in filter:
|
||||
inc(res)
|
||||
res
|
|
@ -7,4 +7,4 @@
|
|||
# MIT license (LICENSE-MIT)
|
||||
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
|
||||
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
|
||||
testasyncstream
|
||||
testasyncstream, testutils
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
# Chronos Test Suite
|
||||
# (c) Copyright 2020-Present
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
import unittest, strutils
|
||||
import ../chronos
|
||||
|
||||
when defined(nimHasUsed): {.used.}
|
||||
|
||||
suite "Asynchronous utilities test suite":
|
||||
proc getCount(): int =
|
||||
# This procedure counts number of Future[T] in double-linked list via list
|
||||
# iteration.
|
||||
result = 0
|
||||
for item in pendingFutures():
|
||||
inc(result)
|
||||
|
||||
test "Future clean and leaks test":
|
||||
if pendingFuturesCount(WithoutFinished) == 0:
|
||||
if pendingFuturesCount(OnlyFinished) > 0:
|
||||
poll()
|
||||
check pendingFuturesCount() == 0
|
||||
else:
|
||||
echo dumpPendingFutures()
|
||||
check false
|
||||
|
||||
test "FutureList basics test":
|
||||
var fut1 = newFuture[void]()
|
||||
check:
|
||||
getCount() == 1
|
||||
pendingFuturesCount() == 1
|
||||
var fut2 = newFuture[void]()
|
||||
check:
|
||||
getCount() == 2
|
||||
pendingFuturesCount() == 2
|
||||
var fut3 = newFuture[void]()
|
||||
check:
|
||||
getCount() == 3
|
||||
pendingFuturesCount() == 3
|
||||
fut1.complete()
|
||||
poll()
|
||||
check:
|
||||
getCount() == 2
|
||||
pendingFuturesCount() == 2
|
||||
fut2.fail(newException(ValueError, ""))
|
||||
poll()
|
||||
check:
|
||||
getCount() == 1
|
||||
pendingFuturesCount() == 1
|
||||
fut3.cancel()
|
||||
poll()
|
||||
check:
|
||||
getCount() == 0
|
||||
pendingFuturesCount() == 0
|
||||
|
||||
test "FutureList async procedure test":
|
||||
proc simpleProc() {.async.} =
|
||||
await sleepAsync(10.milliseconds)
|
||||
|
||||
var fut = simpleProc()
|
||||
check:
|
||||
getCount() == 2
|
||||
pendingFuturesCount() == 2
|
||||
|
||||
waitFor fut
|
||||
check:
|
||||
getCount() == 1
|
||||
pendingFuturesCount() == 1
|
||||
|
||||
poll()
|
||||
check:
|
||||
getCount() == 0
|
||||
pendingFuturesCount() == 0
|
Loading…
Reference in New Issue