don't scan timers heap on each remove

This commit is contained in:
Dmitriy Ryajov 2020-01-06 23:26:18 -06:00
parent 546cc36d79
commit b9c534724c
1 changed files with 75 additions and 13 deletions

View File

@ -11,7 +11,7 @@
include "system/inclrtl"
import os, tables, strutils, heapqueue, lists, options, nativesockets, net,
deques
deques, sequtils
import timer
export Port, SocketFlag
@ -190,9 +190,12 @@ type
AsyncTimeoutError* = object of AsyncError
## Timeout exception
TimerCallback* = object
TimerHandle* = distinct Natural
TimerCallback* = ref object
finishAt*: Moment
function*: AsyncCallback
removed: bool
TrackerBase* = ref object of RootRef
id*: string
@ -203,6 +206,7 @@ type
timers*: HeapQueue[TimerCallback]
callbacks*: Deque[AsyncCallback]
trackers*: Table[string, TrackerBase]
timerHandles: seq[TimerCallback]
proc `<`(a, b: TimerCallback): bool =
result = a.finishAt < b.finishAt
@ -227,14 +231,27 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} =
result = cast[int32](res)
result += min(1, cast[int32](mid))
template removeTimers(loop) =
var timers = cast[seq[TimerCallback]](loop.timers)
timers.keepItIf( not(it.removed) )
loop.timers = cast[HeapQueue[TimerCallback]](timers)
template processTimersGetTimeout(loop, timeout: untyped) =
var count = len(loop.timers)
if count > 0:
var lastFinish = curTime
while count > 0:
if loop.timers.len < 1:
break
if loop.timers[0].removed:
discard loop.timers.pop()
continue
lastFinish = loop.timers[0].finishAt
if curTime < lastFinish:
break
loop.callbacks.addLast(loop.timers.pop().function)
dec(count)
if count > 0:
@ -255,6 +272,13 @@ template processTimers(loop: untyped) =
var count = len(loop.timers)
if count > 0:
while count > 0:
if loop.timers.len < 1:
break
if loop.timers[0].removed:
discard loop.timers.pop()
continue
if curTime < loop.timers[0].finishAt:
break
loop.callbacks.addLast(loop.timers.pop().function)
@ -721,11 +745,44 @@ elif unixPlatform:
# poll() call.
loop.processCallbacks()
# Cleanup canceled timers
loop.removeTimers()
else:
proc initAPI() = discard
proc globalInit() = discard
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =
proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher()
var timer = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
loop.timerHandles.add(timer)
result = TimerHandle(loop.timerHandles.len - 1)
# try to optimize for the case where the top
# element is already removed and just replace it
if loop.timers.len > 0 and loop.timers[0].removed:
discard loop.timers.replace(timer)
else:
loop.timers.push(timer)
proc clearTimer*(handle: TimerHandle) =
var loop = getGlobalDispatcher()
var timer = loop.timerHandles[handle.Natural]
loop.timerHandles[handle.Natural] = nil
# optimize for handle being at the top
if timer == loop.timers[0]:
discard loop.timers.pop()
else:
timer.removed = true
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {.
deprecated: "Use setTimer/clearTimer pairs"
.} =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher()
@ -770,6 +827,7 @@ proc sleepAsync*(duration: Duration): Future[void] =
## ``duration`` time.
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
let moment = Moment.fromNow(duration)
var timer: TimerHandle
proc completion(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
@ -777,10 +835,10 @@ proc sleepAsync*(duration: Duration): Future[void] =
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
removeTimer(moment, completion, cast[pointer](retFuture))
clearTimer(timer)
retFuture.cancelCallback = cancel
addTimer(moment, completion, cast[pointer](retFuture))
timer = setTimer(moment, completion, cast[pointer](retFuture))
return retFuture
proc sleepAsync*(ms: int): Future[void] {.
@ -797,6 +855,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
var retFuture = newFuture[bool]("chronos.`withTimeout`")
var moment: Moment
var timerPresent = false
var timer: TimerHandle
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
@ -808,13 +867,13 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
else:
# Future `fut` completed/failed/cancelled first.
if timerPresent:
removeTimer(moment, continuation, nil)
clearTimer(timer)
retFuture.complete(true)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if timerPresent:
removeTimer(moment, continuation, nil)
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
@ -831,7 +890,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
timerPresent = true
moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel
addTimer(moment, continuation, nil)
timer = setTimer(moment, continuation, nil)
fut.addCallback(continuation)
return retFuture
@ -851,7 +910,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
## should return, because it can't be cancelled too.
var retFuture = newFuture[T]("chronos.wait()")
var moment: Moment
var timerPresent = false
var timerPresent: bool
var timer: TimerHandle
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
@ -859,11 +919,13 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
# Timer exceeded first.
fut.removeCallback(continuation)
fut.cancel()
if timerPresent:
clearTimer(timer)
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
# Future `fut` completed/failed/cancelled first.
if timerPresent:
removeTimer(moment, continuation, nil)
clearTimer(timer)
if fut.failed():
retFuture.fail(fut.error)
@ -876,7 +938,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if timerPresent:
removeTimer(moment, continuation, nil)
clearTimer(timer)
if not(fut.finished()):
fut.removeCallback(continuation)
fut.cancel()
@ -896,10 +958,10 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
retFuture.cancelCallback = cancel
fut.addCallback(continuation)
else:
timerPresent = true
moment = Moment.fromNow(timeout)
retFuture.cancelCallback = cancel
addTimer(moment, continuation, nil)
timer = setTimer(moment, continuation, nil)
timerPresent = true
fut.addCallback(continuation)
return retFuture