Add callIdle() primitive. (#148)

* Add callIdle primitive.

* Make single idle callback to be processed by single poll() step.
Add idleAsync() primitive to allow wait for "idle" time.
Refactor some `result` usage.
This commit is contained in:
Eugene Kabanov 2021-01-19 14:48:39 +02:00 committed by GitHub
parent 9a420c6b05
commit 491213dfa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 85 additions and 22 deletions

View File

@ -200,6 +200,7 @@ type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[TimerCallback]
callbacks*: Deque[AsyncCallback]
idlers*: Deque[AsyncCallback]
trackers*: Table[string, TrackerBase]
proc `<`(a, b: TimerCallback): bool =
@ -235,17 +236,18 @@ template processTimersGetTimeout(loop, timeout: untyped) =
break
loop.callbacks.addLast(loop.timers.pop().function)
if loop.timers.len > 0:
timeout = (lastFinish - curTime).getAsyncTimestamp()
if timeout == 0:
if len(loop.callbacks) == 0:
if (len(loop.callbacks) == 0) and (len(loop.idlers) == 0):
when defined(windows):
timeout = INFINITE
else:
timeout = -1
else:
if len(loop.callbacks) != 0:
if (len(loop.callbacks) != 0) or (len(loop.idlers) != 0):
timeout = 0
template processTimers(loop: untyped) =
@ -259,6 +261,10 @@ template processTimers(loop: untyped) =
break
loop.callbacks.addLast(loop.timers.pop().function)
template processIdlers(loop: untyped) =
if len(loop.idlers) > 0:
loop.callbacks.addLast(loop.idlers.popFirst())
template processCallbacks(loop: untyped) =
var count = len(loop.callbacks)
for i in 0..<count:
@ -359,23 +365,25 @@ when defined(windows) or defined(nimdoc):
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
var res = PDispatcher()
res.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
when declared(initHashSet):
# After 0.20.0 Nim's stdlib version
result.handles = initHashSet[AsyncFD]()
res.handles = initHashSet[AsyncFD]()
else:
# Pre 0.20.0 Nim's stdlib version
result.handles = initSet[AsyncFD]()
res.handles = initSet[AsyncFD]()
when declared(initHeapQueue):
# After 0.20.0 Nim's stdlib version
result.timers = initHeapQueue[TimerCallback]()
res.timers = initHeapQueue[TimerCallback]()
else:
# Pre 0.20.0 Nim's stdlib version
result.timers = newHeapQueue[TimerCallback]()
result.callbacks = initDeque[AsyncCallback](64)
result.trackers = initTable[string, TrackerBase]()
initAPI(result)
res.timers = newHeapQueue[TimerCallback]()
res.callbacks = initDeque[AsyncCallback](64)
res.idlers = initDeque[AsyncCallback]()
res.trackers = initTable[string, TrackerBase]()
initAPI(res)
res
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
@ -404,6 +412,7 @@ when defined(windows) or defined(nimdoc):
let loop = getThreadDispatcher()
var curTime = Moment.now()
var curTimeout = DWORD(0)
var noNetworkEvents = false
# Moving expired timers to `loop.callbacks` and calculate timeout
loop.processTimersGetTimeout(curTimeout)
@ -434,10 +443,17 @@ when defined(windows) or defined(nimdoc):
else:
if int32(errCode) != WAIT_TIMEOUT:
raiseOSError(errCode)
else:
noNetworkEvents = true
# Moving expired timers to `loop.callbacks`.
loop.processTimers()
# We move idle callbacks to `loop.callbacks` only if there no pending
# network events.
if noNetworkEvents:
loop.processIdlers()
# All callbacks which will be added in process will be processed on next
# poll() call.
loop.processCallbacks()
@ -502,18 +518,20 @@ elif unixPlatform:
proc newDispatcher*(): PDispatcher =
## Create new dispatcher.
new result
result.selector = newSelector[SelectorData]()
var res = PDispatcher()
res.selector = newSelector[SelectorData]()
when declared(initHeapQueue):
# After 0.20.0 Nim's stdlib version
result.timers = initHeapQueue[TimerCallback]()
res.timers = initHeapQueue[TimerCallback]()
else:
# Before 0.20.0 Nim's stdlib version
result.timers.newHeapQueue()
result.callbacks = initDeque[AsyncCallback](64)
result.keys = newSeq[ReadyKey](64)
result.trackers = initTable[string, TrackerBase]()
initAPI(result)
res.timers.newHeapQueue()
res.callbacks = initDeque[AsyncCallback](64)
res.idlers = initDeque[AsyncCallback]()
res.keys = newSeq[ReadyKey](64)
res.trackers = initTable[string, TrackerBase]()
initAPI(res)
res
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
@ -685,7 +703,7 @@ elif unixPlatform:
loop.processTimersGetTimeout(curTimeout)
# Processing IO descriptors and all hardware events.
var count = loop.selector.selectInto(curTimeout, loop.keys)
let count = loop.selector.selectInto(curTimeout, loop.keys)
for i in 0..<count:
let fd = loop.keys[i].fd
let events = loop.keys[i].events
@ -711,6 +729,11 @@ elif unixPlatform:
# Moving expired timers to `loop.callbacks`.
loop.processTimers()
# We move idle callbacks to `loop.callbacks` only if there no pending
# network events.
if count == 0:
loop.processIdlers()
# All callbacks which will be added in process, will be processed on next
# poll() call.
loop.processCallbacks()
@ -816,10 +839,32 @@ proc callSoon*(cbproc: CallbackFunc, data: pointer) {.
doAssert(not isNil(cbproc))
callSoon(AsyncCallback(function: cbproc, udata: data))
proc callSoon*(cbproc: CallbackFunc) {.
gcsafe, raises: [Defect].} =
proc callSoon*(cbproc: CallbackFunc) {.gcsafe, raises: [Defect].} =
callSoon(cbproc, nil)
proc callIdle*(acb: AsyncCallback) {.gcsafe, raises: [Defect].} =
## Schedule ``cbproc`` to be called when there no pending network events
## available.
##
## **WARNING!** Despite the name, "idle" callbacks called on every loop
## iteration if there no network events available, not when the loop is
## actually "idle".
getThreadDispatcher().idlers.addLast(acb)
proc callIdle*(cbproc: CallbackFunc, data: pointer) {.
gcsafe, raises: [Defect].} =
## Schedule ``cbproc`` to be called when there no pending network events
## available.
##
## **WARNING!** Despite the name, "idle" callbacks called on every loop
## iteration if there no network events available, not when the loop is
## actually "idle".
doAssert(not isNil(cbproc))
callIdle(AsyncCallback(function: cbproc, udata: data))
proc callIdle*(cbproc: CallbackFunc) {.gcsafe, raises: [Defect].} =
callIdle(cbproc, nil)
include asyncfutures2
proc sleepAsync*(duration: Duration): Future[void] =
@ -877,6 +922,24 @@ proc stepsAsync*(number: int): Future[void] =
retFuture
proc idleAsync*(): Future[void] =
## Suspends the execution of the current asynchronous task until "idle" time.
##
## "idle" time its moment of time, when no network events were processed by
## ``poll()`` call.
var retFuture = newFuture[void]("chronos.idleAsync()")
proc continuation(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
discard
retFuture.cancelCallback = cancellation
callIdle(continuation, nil)
retFuture
proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
## Returns a future which will complete once ``fut`` completes or after
## ``timeout`` milliseconds has elapsed.