diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 6fee704..7f10a9a 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, - deques + deques, sequtils import timer export Port, SocketFlag @@ -190,9 +190,12 @@ type AsyncTimeoutError* = object of AsyncError ## Timeout exception - TimerCallback* = object + TimerHandle* = distinct Natural + + TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback + removed: bool TrackerBase* = ref object of RootRef id*: string @@ -203,6 +206,7 @@ type timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] + timerHandles: seq[TimerCallback] proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt @@ -227,14 +231,27 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result = cast[int32](res) result += min(1, cast[int32](mid)) +template removeTimers(loop) = + var timers = cast[seq[TimerCallback]](loop.timers) + timers.keepItIf( not(it.removed) ) + loop.timers = cast[HeapQueue[TimerCallback]](timers) + template processTimersGetTimeout(loop, timeout: untyped) = var count = len(loop.timers) if count > 0: var lastFinish = curTime while count > 0: + if loop.timers.len < 1: + break + + if loop.timers[0].removed: + discard loop.timers.pop() + continue + lastFinish = loop.timers[0].finishAt if curTime < lastFinish: break + loop.callbacks.addLast(loop.timers.pop().function) dec(count) if count > 0: @@ -255,6 +272,13 @@ template processTimers(loop: untyped) = var count = len(loop.timers) if count > 0: while count > 0: + if loop.timers.len < 1: + break + + if loop.timers[0].removed: + discard loop.timers.pop() + continue + if curTime < loop.timers[0].finishAt: break loop.callbacks.addLast(loop.timers.pop().function) @@ -721,11 +745,44 @@ elif unixPlatform: # poll() call. loop.processCallbacks() + # Cleanup canceled timers + loop.removeTimers() + else: proc initAPI() = discard proc globalInit() = discard -proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = +proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = + ## Arrange for the callback ``cb`` to be called at the given absolute + ## timestamp ``at``. You can also pass ``udata`` to callback. + let loop = getGlobalDispatcher() + + var timer = TimerCallback(finishAt: at, + function: AsyncCallback(function: cb, udata: udata)) + loop.timerHandles.add(timer) + result = TimerHandle(loop.timerHandles.len - 1) + + # try to optimize for the case where the top + # element is already removed and just replace it + if loop.timers.len > 0 and loop.timers[0].removed: + discard loop.timers.replace(timer) + else: + loop.timers.push(timer) + +proc clearTimer*(handle: TimerHandle) = + var loop = getGlobalDispatcher() + var timer = loop.timerHandles[handle.Natural] + loop.timerHandles[handle.Natural] = nil + + # optimize for handle being at the top + if timer == loop.timers[0]: + discard loop.timers.pop() + else: + timer.removed = true + +proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {. + deprecated: "Use setTimer/clearTimer pairs" + .} = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() @@ -770,6 +827,7 @@ proc sleepAsync*(duration: Duration): Future[void] = ## ``duration`` time. var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") let moment = Moment.fromNow(duration) + var timer: TimerHandle proc completion(data: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -777,10 +835,10 @@ proc sleepAsync*(duration: Duration): Future[void] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - removeTimer(moment, completion, cast[pointer](retFuture)) + clearTimer(timer) retFuture.cancelCallback = cancel - addTimer(moment, completion, cast[pointer](retFuture)) + timer = setTimer(moment, completion, cast[pointer](retFuture)) return retFuture proc sleepAsync*(ms: int): Future[void] {. @@ -797,6 +855,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = var retFuture = newFuture[bool]("chronos.`withTimeout`") var moment: Moment var timerPresent = false + var timer: TimerHandle proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -808,13 +867,13 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = else: # Future `fut` completed/failed/cancelled first. if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) retFuture.complete(true) proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -831,7 +890,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) return retFuture @@ -851,7 +910,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## should return, because it can't be cancelled too. var retFuture = newFuture[T]("chronos.wait()") var moment: Moment - var timerPresent = false + var timerPresent: bool + var timer: TimerHandle proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -859,11 +919,13 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = # Timer exceeded first. fut.removeCallback(continuation) fut.cancel() + if timerPresent: + clearTimer(timer) retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: # Future `fut` completed/failed/cancelled first. if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if fut.failed(): retFuture.fail(fut.error) @@ -876,7 +938,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -896,10 +958,10 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) + timerPresent = true fut.addCallback(continuation) return retFuture