diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 68cd541..b72553e 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -190,9 +190,10 @@ type AsyncTimeoutError* = object of AsyncError ## Timeout exception - TimerCallback* = object + TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback + deleted*: bool TrackerBase* = ref object of RootRef id*: string @@ -228,17 +229,19 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result += min(1, cast[int32](mid)) template processTimersGetTimeout(loop, timeout: untyped) = - var count = len(loop.timers) - if count > 0: - var lastFinish = curTime - while count > 0: - lastFinish = loop.timers[0].finishAt - if curTime < lastFinish: - break - loop.callbacks.addLast(loop.timers.pop().function) - dec(count) - if count > 0: - timeout = (lastFinish - curTime).getAsyncTimestamp() + var lastFinish = curTime + while loop.timers.len > 0: + if loop.timers[0].deleted: + discard loop.timers.pop() + continue + + lastFinish = loop.timers[0].finishAt + if curTime < lastFinish: + break + + loop.callbacks.addLast(loop.timers.pop().function) + if loop.timers.len > 0: + timeout = (lastFinish - curTime).getAsyncTimestamp() if timeout == 0: if len(loop.callbacks) == 0: @@ -252,13 +255,14 @@ template processTimersGetTimeout(loop, timeout: untyped) = template processTimers(loop: untyped) = var curTime = Moment.now() - var count = len(loop.timers) - if count > 0: - while count > 0: - if curTime < loop.timers[0].finishAt: - break - loop.callbacks.addLast(loop.timers.pop().function) - dec(count) + while loop.timers.len > 0: + if loop.timers[0].deleted: + discard loop.timers.pop() + continue + + if curTime < loop.timers[0].finishAt: + break + loop.callbacks.addLast(loop.timers.pop().function) template processCallbacks(loop: untyped) = var count = len(loop.callbacks) @@ -565,7 +569,8 @@ elif unixPlatform: adata.reader = acb adata.rdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Read) - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -578,7 +583,8 @@ elif unixPlatform: # We need to clear `reader` data, because `selectors` don't do it adata.reader.function = nil # adata.rdata = CompletionData() - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -593,7 +599,8 @@ elif unixPlatform: adata.writer = acb adata.wdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Write) - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -606,7 +613,8 @@ elif unixPlatform: # We need to clear `writer` data, because `selectors` don't do it adata.writer.function = nil # adata.wdata = CompletionData() - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -629,10 +637,10 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. - if not isNil(adata.reader.function): + if not(isNil(adata.reader.function)): if not adata.reader.deleted: loop.callbacks.addLast(adata.reader) - if not isNil(adata.writer.function): + if not(isNil(adata.writer.function)): if not adata.writer.deleted: loop.callbacks.addLast(adata.writer) # Mark callbacks as deleted, we don't need to get REAL notifications @@ -725,7 +733,19 @@ else: proc initAPI() = discard proc globalInit() = discard -proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = +proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback = + ## Arrange for the callback ``cb`` to be called at the given absolute + ## timestamp ``at``. You can also pass ``udata`` to callback. + let loop = getGlobalDispatcher() + result = TimerCallback(finishAt: at, + function: AsyncCallback(function: cb, udata: udata)) + loop.timers.push(result) + +proc clearTimer(timer: TimerCallback) {.inline.} = + timer.deleted = true + +proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) + {.deprecated: "Use setTimer/clearTimer instead".} = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() @@ -770,6 +790,7 @@ proc sleepAsync*(duration: Duration): Future[void] = ## ``duration`` time. var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") let moment = Moment.fromNow(duration) + var timer: TimerCallback proc completion(data: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -777,10 +798,10 @@ proc sleepAsync*(duration: Duration): Future[void] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - removeTimer(moment, completion, cast[pointer](retFuture)) + clearTimer(timer) retFuture.cancelCallback = cancel - addTimer(moment, completion, cast[pointer](retFuture)) + timer = setTimer(moment, completion, cast[pointer](retFuture)) return retFuture proc sleepAsync*(ms: int): Future[void] {. @@ -796,7 +817,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = ## future will hold false. var retFuture = newFuture[bool]("chronos.`withTimeout`") var moment: Moment - var timerPresent = false + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -807,14 +828,14 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = retFuture.complete(false) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: - removeTimer(moment, continuation, nil) + if not isNil(timer): + clearTimer(timer) retFuture.complete(true) proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: - removeTimer(moment, continuation, nil) + if not isNil(timer): + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -828,10 +849,9 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) return retFuture @@ -851,7 +871,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## should return, because it can't be cancelled too. var retFuture = newFuture[T]("chronos.wait()") var moment: Moment - var timerPresent = false + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -862,8 +882,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: - removeTimer(moment, continuation, nil) + if not isNil(timer): + clearTimer(timer) if fut.failed(): retFuture.fail(fut.error) @@ -875,8 +895,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: - removeTimer(moment, continuation, nil) + if not isNil(timer): + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -896,10 +916,9 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) return retFuture