don't loop over timers heap

This commit is contained in:
Dmitriy Ryajov 2020-01-07 19:06:27 -06:00
parent d979770263
commit d8dd09a358
1 changed files with 36 additions and 73 deletions

View File

@ -11,7 +11,7 @@
include "system/inclrtl" include "system/inclrtl"
import os, tables, strutils, heapqueue, lists, options, nativesockets, net, import os, tables, strutils, heapqueue, lists, options, nativesockets, net,
deques, sequtils deques
import timer import timer
export Port, SocketFlag export Port, SocketFlag
@ -180,7 +180,7 @@ type
CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.}
CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.}
AsyncCallback* = object AsyncCallback* = ref object
function*: CallbackFunc function*: CallbackFunc
udata*: pointer udata*: pointer
deleted*: bool deleted*: bool
@ -190,12 +190,10 @@ type
AsyncTimeoutError* = object of AsyncError AsyncTimeoutError* = object of AsyncError
## Timeout exception ## Timeout exception
TimerHandle* = distinct Natural
TimerCallback* = ref object TimerCallback* = ref object
finishAt*: Moment finishAt*: Moment
function*: AsyncCallback function*: AsyncCallback
removed: bool deleted*: bool
TrackerBase* = ref object of RootRef TrackerBase* = ref object of RootRef
id*: string id*: string
@ -206,7 +204,6 @@ type
timers*: HeapQueue[TimerCallback] timers*: HeapQueue[TimerCallback]
callbacks*: Deque[AsyncCallback] callbacks*: Deque[AsyncCallback]
trackers*: Table[string, TrackerBase] trackers*: Table[string, TrackerBase]
timerHandles: seq[TimerCallback]
proc `<`(a, b: TimerCallback): bool = proc `<`(a, b: TimerCallback): bool =
result = a.finishAt < b.finishAt result = a.finishAt < b.finishAt
@ -231,20 +228,15 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} =
result = cast[int32](res) result = cast[int32](res)
result += min(1, cast[int32](mid)) result += min(1, cast[int32](mid))
template removeTimers(loop) =
var timers = cast[seq[TimerCallback]](loop.timers)
timers.keepItIf( not(it.removed) )
loop.timers = cast[HeapQueue[TimerCallback]](timers)
template processTimersGetTimeout(loop, timeout: untyped) = template processTimersGetTimeout(loop, timeout: untyped) =
var count = len(loop.timers) var count = len(loop.timers)
if count > 0: if count > 0:
var lastFinish = curTime var lastFinish = curTime
while count > 0: while count > 0:
if loop.timers.len < 1: if not(loop.timers.len > 0):
break break
if loop.timers[0].removed: if loop.timers[0].deleted:
discard loop.timers.pop() discard loop.timers.pop()
continue continue
@ -272,10 +264,10 @@ template processTimers(loop: untyped) =
var count = len(loop.timers) var count = len(loop.timers)
if count > 0: if count > 0:
while count > 0: while count > 0:
if loop.timers.len < 1: if not(loop.timers.len > 0):
break break
if loop.timers[0].removed: if loop.timers[0].deleted:
discard loop.timers.pop() discard loop.timers.pop()
continue continue
@ -472,9 +464,6 @@ when defined(windows) or defined(nimdoc):
# poll() call. # poll() call.
loop.processCallbacks() loop.processCallbacks()
# Cleanup canceled timers
loop.removeTimers()
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered. ## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
@ -592,7 +581,8 @@ elif unixPlatform:
adata.reader = acb adata.reader = acb
adata.rdata = CompletionData(fd: fd, udata: udata) adata.rdata = CompletionData(fd: fd, udata: udata)
newEvents.incl(Event.Read) newEvents.incl(Event.Read)
if not isNil(adata.writer.function): newEvents.incl(Event.Write) if not(isNil(adata.writer)) and not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do: do:
raise newException(ValueError, "File descriptor not registered.") raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents) loop.selector.updateHandle(int(fd), newEvents)
@ -605,7 +595,8 @@ elif unixPlatform:
# We need to clear `reader` data, because `selectors` don't do it # We need to clear `reader` data, because `selectors` don't do it
adata.reader.function = nil adata.reader.function = nil
# adata.rdata = CompletionData() # adata.rdata = CompletionData()
if not isNil(adata.writer.function): newEvents.incl(Event.Write) if not(isNil(adata.writer)) and not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do: do:
raise newException(ValueError, "File descriptor not registered.") raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents) loop.selector.updateHandle(int(fd), newEvents)
@ -620,7 +611,8 @@ elif unixPlatform:
adata.writer = acb adata.writer = acb
adata.wdata = CompletionData(fd: fd, udata: udata) adata.wdata = CompletionData(fd: fd, udata: udata)
newEvents.incl(Event.Write) newEvents.incl(Event.Write)
if not isNil(adata.reader.function): newEvents.incl(Event.Read) if not(isNil(adata.reader)) and not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do: do:
raise newException(ValueError, "File descriptor not registered.") raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents) loop.selector.updateHandle(int(fd), newEvents)
@ -633,7 +625,8 @@ elif unixPlatform:
# We need to clear `writer` data, because `selectors` don't do it # We need to clear `writer` data, because `selectors` don't do it
adata.writer.function = nil adata.writer.function = nil
# adata.wdata = CompletionData() # adata.wdata = CompletionData()
if not isNil(adata.reader.function): newEvents.incl(Event.Read) if not(isNil(adata.reader)) and not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do: do:
raise newException(ValueError, "File descriptor not registered.") raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents) loop.selector.updateHandle(int(fd), newEvents)
@ -656,16 +649,16 @@ elif unixPlatform:
withData(loop.selector, int(fd), adata) do: withData(loop.selector, int(fd), adata) do:
# We are scheduling reader and writer callbacks to be called # We are scheduling reader and writer callbacks to be called
# explicitly, so they can get an error and continue work. # explicitly, so they can get an error and continue work.
if not isNil(adata.reader.function): if not(isNil(adata.reader)) and not(isNil(adata.reader.function)):
if not adata.reader.deleted: if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader) loop.callbacks.addLast(adata.reader)
if not isNil(adata.writer.function): if not(isNil(adata.writer)) and not(isNil(adata.writer.function)):
if not adata.writer.deleted: if not adata.writer.deleted:
loop.callbacks.addLast(adata.writer) loop.callbacks.addLast(adata.writer)
# Mark callbacks as deleted, we don't need to get REAL notifications # Mark callbacks as deleted, we don't need to get REAL notifications
# from system queue for this reader and writer. # from system queue for this reader and writer.
adata.reader.deleted = true if not isNil(adata.reader): adata.reader.deleted = true
adata.writer.deleted = true if not isNil(adata.writer): adata.writer.deleted = true
# We can't unregister file descriptor from system queue here, because # We can't unregister file descriptor from system queue here, because
# in such case processing queue will stuck on poll() call, because there # in such case processing queue will stuck on poll() call, because there
@ -748,47 +741,23 @@ elif unixPlatform:
# poll() call. # poll() call.
loop.processCallbacks() loop.processCallbacks()
# Cleanup canceled timers
loop.removeTimers()
else: else:
proc initAPI() = discard proc initAPI() = discard
proc globalInit() = discard proc globalInit() = discard
proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback =
## Arrange for the callback ``cb`` to be called at the given absolute ## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback. ## timestamp ``at``. You can also pass ``udata`` to callback.
## Returns an opaque handle to be passed to ``clearTimer()`` to cancel
## a pending timer.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
result = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(result)
var timer = TimerCallback(finishAt: at, proc clearTimer(timer: TimerCallback) =
function: AsyncCallback(function: cb, udata: udata)) timer.deleted = true
loop.timerHandles.add(timer)
result = TimerHandle(loop.timerHandles.len - 1)
# try to optimize for the case where the top proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil)
# element is already removed and just replace it {.deprecated: "Use setTimer/clearTimer instead".} =
if loop.timers.len > 0 and loop.timers[0].removed:
discard loop.timers.replace(timer)
else:
loop.timers.push(timer)
proc clearTimer*(handle: TimerHandle) =
## Clear the timer associated with the passed handle
var loop = getGlobalDispatcher()
var timer = loop.timerHandles[handle.Natural]
loop.timerHandles[handle.Natural] = nil
# optimize for handle being at the top
if timer == loop.timers[0]:
discard loop.timers.pop()
else:
timer.removed = true
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {.
deprecated: "Use setTimer/clearTimer pairs"
.} =
## Arrange for the callback ``cb`` to be called at the given absolute ## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback. ## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
@ -833,7 +802,7 @@ proc sleepAsync*(duration: Duration): Future[void] =
## ``duration`` time. ## ``duration`` time.
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
let moment = Moment.fromNow(duration) let moment = Moment.fromNow(duration)
var timer: TimerHandle var timer: TimerCallback
proc completion(data: pointer) {.gcsafe.} = proc completion(data: pointer) {.gcsafe.} =
if not(retFuture.finished()): if not(retFuture.finished()):
@ -860,25 +829,24 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
## future will hold false. ## future will hold false.
var retFuture = newFuture[bool]("chronos.`withTimeout`") var retFuture = newFuture[bool]("chronos.`withTimeout`")
var moment: Moment var moment: Moment
var timerPresent = false var timer: TimerCallback
var timer: TimerHandle
proc continuation(udata: pointer) {.gcsafe.} = proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()): if not(retFuture.finished()):
if isNil(udata): if isNil(udata):
# Timer exceeded first. # Timer exceeded first.
fut.removeCallback(continuation) clearTimer(timer)
fut.cancel() fut.cancel()
retFuture.complete(false) retFuture.complete(false)
else: else:
# Future `fut` completed/failed/cancelled first. # Future `fut` completed/failed/cancelled first.
if timerPresent: if not isNil(timer):
clearTimer(timer) clearTimer(timer)
retFuture.complete(true) retFuture.complete(true)
proc cancel(udata: pointer) {.gcsafe.} = proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()): if not(retFuture.finished()):
if timerPresent: if not isNil(timer):
clearTimer(timer) clearTimer(timer)
if not(fut.finished()): if not(fut.finished()):
fut.removeCallback(continuation) fut.removeCallback(continuation)
@ -893,7 +861,6 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
retFuture.cancelCallback = cancel retFuture.cancelCallback = cancel
fut.addCallback(continuation) fut.addCallback(continuation)
else: else:
timerPresent = true
moment = Moment.fromNow(timeout) moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel retFuture.cancelCallback = cancel
timer = setTimer(moment, continuation, nil) timer = setTimer(moment, continuation, nil)
@ -916,21 +883,18 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
## should return, because it can't be cancelled too. ## should return, because it can't be cancelled too.
var retFuture = newFuture[T]("chronos.wait()") var retFuture = newFuture[T]("chronos.wait()")
var moment: Moment var moment: Moment
var timerPresent: bool var timer: TimerCallback
var timer: TimerHandle
proc continuation(udata: pointer) {.gcsafe.} = proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()): if not(retFuture.finished()):
if not(fut.finished()): if isNil(udata):
# Timer exceeded first. # Timer exceeded first.
fut.removeCallback(continuation) fut.removeCallback(continuation)
fut.cancel() fut.cancel()
if timerPresent:
clearTimer(timer)
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else: else:
# Future `fut` completed/failed/cancelled first. # Future `fut` completed/failed/cancelled first.
if timerPresent: if not isNil(timer):
clearTimer(timer) clearTimer(timer)
if fut.failed(): if fut.failed():
@ -943,7 +907,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
proc cancel(udata: pointer) {.gcsafe.} = proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()): if not(retFuture.finished()):
if timerPresent: if not isNil(timer):
clearTimer(timer) clearTimer(timer)
if not(fut.finished()): if not(fut.finished()):
fut.removeCallback(continuation) fut.removeCallback(continuation)
@ -967,7 +931,6 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
moment = Moment.fromNow(timeout) moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel retFuture.cancelCallback = cancel
timer = setTimer(moment, continuation, nil) timer = setTimer(moment, continuation, nil)
timerPresent = true
fut.addCallback(continuation) fut.addCallback(continuation)
return retFuture return retFuture