diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 92bc2c78..f39c62a6 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, - deques, sequtils + deques import timer export Port, SocketFlag @@ -180,7 +180,7 @@ type CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} - AsyncCallback* = object + AsyncCallback* = ref object function*: CallbackFunc udata*: pointer deleted*: bool @@ -190,12 +190,10 @@ type AsyncTimeoutError* = object of AsyncError ## Timeout exception - TimerHandle* = distinct Natural - TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback - removed: bool + deleted*: bool TrackerBase* = ref object of RootRef id*: string @@ -206,7 +204,6 @@ type timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] - timerHandles: seq[TimerCallback] proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt @@ -231,20 +228,15 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result = cast[int32](res) result += min(1, cast[int32](mid)) -template removeTimers(loop) = - var timers = cast[seq[TimerCallback]](loop.timers) - timers.keepItIf( not(it.removed) ) - loop.timers = cast[HeapQueue[TimerCallback]](timers) - template processTimersGetTimeout(loop, timeout: untyped) = var count = len(loop.timers) if count > 0: var lastFinish = curTime while count > 0: - if loop.timers.len < 1: + if not(loop.timers.len > 0): break - if loop.timers[0].removed: + if loop.timers[0].deleted: discard loop.timers.pop() continue @@ -272,10 +264,10 @@ template processTimers(loop: untyped) = var count = len(loop.timers) if count > 0: while count > 0: - if loop.timers.len < 1: + if not(loop.timers.len > 0): break - if loop.timers[0].removed: + if loop.timers[0].deleted: discard loop.timers.pop() continue @@ -472,9 +464,6 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() - # Cleanup canceled timers - loop.removeTimers() - proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -592,7 +581,8 @@ elif unixPlatform: adata.reader = acb adata.rdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Read) - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -605,7 +595,8 @@ elif unixPlatform: # We need to clear `reader` data, because `selectors` don't do it adata.reader.function = nil # adata.rdata = CompletionData() - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -620,7 +611,8 @@ elif unixPlatform: adata.writer = acb adata.wdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Write) - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -633,7 +625,8 @@ elif unixPlatform: # We need to clear `writer` data, because `selectors` don't do it adata.writer.function = nil # adata.wdata = CompletionData() - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -656,16 +649,16 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. - if not isNil(adata.reader.function): + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): if not adata.reader.deleted: loop.callbacks.addLast(adata.reader) - if not isNil(adata.writer.function): + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): if not adata.writer.deleted: loop.callbacks.addLast(adata.writer) # Mark callbacks as deleted, we don't need to get REAL notifications # from system queue for this reader and writer. - adata.reader.deleted = true - adata.writer.deleted = true + if not isNil(adata.reader): adata.reader.deleted = true + if not isNil(adata.writer): adata.writer.deleted = true # We can't unregister file descriptor from system queue here, because # in such case processing queue will stuck on poll() call, because there @@ -748,47 +741,23 @@ elif unixPlatform: # poll() call. loop.processCallbacks() - # Cleanup canceled timers - loop.removeTimers() - else: proc initAPI() = discard proc globalInit() = discard -proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = +proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. - ## Returns an opaque handle to be passed to ``clearTimer()`` to cancel - ## a pending timer. let loop = getGlobalDispatcher() + result = TimerCallback(finishAt: at, + function: AsyncCallback(function: cb, udata: udata)) + loop.timers.push(result) - var timer = TimerCallback(finishAt: at, - function: AsyncCallback(function: cb, udata: udata)) - loop.timerHandles.add(timer) - result = TimerHandle(loop.timerHandles.len - 1) +proc clearTimer(timer: TimerCallback) = + timer.deleted = true - # try to optimize for the case where the top - # element is already removed and just replace it - if loop.timers.len > 0 and loop.timers[0].removed: - discard loop.timers.replace(timer) - else: - loop.timers.push(timer) - -proc clearTimer*(handle: TimerHandle) = - ## Clear the timer associated with the passed handle - var loop = getGlobalDispatcher() - var timer = loop.timerHandles[handle.Natural] - loop.timerHandles[handle.Natural] = nil - - # optimize for handle being at the top - if timer == loop.timers[0]: - discard loop.timers.pop() - else: - timer.removed = true - -proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {. - deprecated: "Use setTimer/clearTimer pairs" - .} = +proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) + {.deprecated: "Use setTimer/clearTimer instead".} = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() @@ -833,7 +802,7 @@ proc sleepAsync*(duration: Duration): Future[void] = ## ``duration`` time. var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") let moment = Moment.fromNow(duration) - var timer: TimerHandle + var timer: TimerCallback proc completion(data: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -860,25 +829,24 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = ## future will hold false. var retFuture = newFuture[bool]("chronos.`withTimeout`") var moment: Moment - var timerPresent = false - var timer: TimerHandle + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if isNil(udata): # Timer exceeded first. - fut.removeCallback(continuation) + clearTimer(timer) fut.cancel() retFuture.complete(false) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: + if not isNil(timer): clearTimer(timer) retFuture.complete(true) proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: + if not isNil(timer): clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) @@ -893,7 +861,6 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel timer = setTimer(moment, continuation, nil) @@ -916,21 +883,18 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## should return, because it can't be cancelled too. var retFuture = newFuture[T]("chronos.wait()") var moment: Moment - var timerPresent: bool - var timer: TimerHandle + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if not(fut.finished()): + if isNil(udata): # Timer exceeded first. fut.removeCallback(continuation) fut.cancel() - if timerPresent: - clearTimer(timer) retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: + if not isNil(timer): clearTimer(timer) if fut.failed(): @@ -943,7 +907,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: + if not isNil(timer): clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) @@ -967,7 +931,6 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel timer = setTimer(moment, continuation, nil) - timerPresent = true fut.addCallback(continuation) return retFuture