From 546cc36d79d0ed104b95304922cf331fd0ca2f4b Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 24 Dec 2019 13:23:45 -0600 Subject: [PATCH 1/8] fix: avoid completing future twise --- chronos/asyncloop.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 68cd541..6fee704 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -855,7 +855,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if isNil(udata): + if not(fut.finished()): # Timer exceeded first. fut.removeCallback(continuation) fut.cancel() From b9c534724c05b4891f6cd7d331d2cc160016e8c1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 6 Jan 2020 23:26:18 -0600 Subject: [PATCH 2/8] don't scan timers heap on each remove --- chronos/asyncloop.nim | 88 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 6fee704..7f10a9a 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, - deques + deques, sequtils import timer export Port, SocketFlag @@ -190,9 +190,12 @@ type AsyncTimeoutError* = object of AsyncError ## Timeout exception - TimerCallback* = object + TimerHandle* = distinct Natural + + TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback + removed: bool TrackerBase* = ref object of RootRef id*: string @@ -203,6 +206,7 @@ type timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] + timerHandles: seq[TimerCallback] proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt @@ -227,14 +231,27 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result = cast[int32](res) result += min(1, cast[int32](mid)) +template removeTimers(loop) = + var timers = cast[seq[TimerCallback]](loop.timers) + timers.keepItIf( not(it.removed) ) + loop.timers = cast[HeapQueue[TimerCallback]](timers) + template processTimersGetTimeout(loop, timeout: untyped) = var count = len(loop.timers) if count > 0: var lastFinish = curTime while count > 0: + if loop.timers.len < 1: + break + + if loop.timers[0].removed: + discard loop.timers.pop() + continue + lastFinish = loop.timers[0].finishAt if curTime < lastFinish: break + loop.callbacks.addLast(loop.timers.pop().function) dec(count) if count > 0: @@ -255,6 +272,13 @@ template processTimers(loop: untyped) = var count = len(loop.timers) if count > 0: while count > 0: + if loop.timers.len < 1: + break + + if loop.timers[0].removed: + discard loop.timers.pop() + continue + if curTime < loop.timers[0].finishAt: break loop.callbacks.addLast(loop.timers.pop().function) @@ -721,11 +745,44 @@ elif unixPlatform: # poll() call. loop.processCallbacks() + # Cleanup canceled timers + loop.removeTimers() + else: proc initAPI() = discard proc globalInit() = discard -proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = +proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = + ## Arrange for the callback ``cb`` to be called at the given absolute + ## timestamp ``at``. You can also pass ``udata`` to callback. + let loop = getGlobalDispatcher() + + var timer = TimerCallback(finishAt: at, + function: AsyncCallback(function: cb, udata: udata)) + loop.timerHandles.add(timer) + result = TimerHandle(loop.timerHandles.len - 1) + + # try to optimize for the case where the top + # element is already removed and just replace it + if loop.timers.len > 0 and loop.timers[0].removed: + discard loop.timers.replace(timer) + else: + loop.timers.push(timer) + +proc clearTimer*(handle: TimerHandle) = + var loop = getGlobalDispatcher() + var timer = loop.timerHandles[handle.Natural] + loop.timerHandles[handle.Natural] = nil + + # optimize for handle being at the top + if timer == loop.timers[0]: + discard loop.timers.pop() + else: + timer.removed = true + +proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {. + deprecated: "Use setTimer/clearTimer pairs" + .} = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() @@ -770,6 +827,7 @@ proc sleepAsync*(duration: Duration): Future[void] = ## ``duration`` time. var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") let moment = Moment.fromNow(duration) + var timer: TimerHandle proc completion(data: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -777,10 +835,10 @@ proc sleepAsync*(duration: Duration): Future[void] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - removeTimer(moment, completion, cast[pointer](retFuture)) + clearTimer(timer) retFuture.cancelCallback = cancel - addTimer(moment, completion, cast[pointer](retFuture)) + timer = setTimer(moment, completion, cast[pointer](retFuture)) return retFuture proc sleepAsync*(ms: int): Future[void] {. @@ -797,6 +855,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = var retFuture = newFuture[bool]("chronos.`withTimeout`") var moment: Moment var timerPresent = false + var timer: TimerHandle proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -808,13 +867,13 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = else: # Future `fut` completed/failed/cancelled first. if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) retFuture.complete(true) proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -831,7 +890,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) fut.addCallback(continuation) return retFuture @@ -851,7 +910,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## should return, because it can't be cancelled too. var retFuture = newFuture[T]("chronos.wait()") var moment: Moment - var timerPresent = false + var timerPresent: bool + var timer: TimerHandle proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -859,11 +919,13 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = # Timer exceeded first. fut.removeCallback(continuation) fut.cancel() + if timerPresent: + clearTimer(timer) retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: # Future `fut` completed/failed/cancelled first. if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if fut.failed(): retFuture.fail(fut.error) @@ -876,7 +938,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if timerPresent: - removeTimer(moment, continuation, nil) + clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) fut.cancel() @@ -896,10 +958,10 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel - addTimer(moment, continuation, nil) + timer = setTimer(moment, continuation, nil) + timerPresent = true fut.addCallback(continuation) return retFuture From d979770263e96980cfa0dbcecca13a6caf453c0c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 7 Jan 2020 10:22:38 -0600 Subject: [PATCH 3/8] clear timers under windown as well --- chronos/asyncloop.nim | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 7f10a9a..92bc2c7 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -472,6 +472,9 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() + # Cleanup canceled timers + loop.removeTimers() + proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -755,6 +758,8 @@ else: proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. + ## Returns an opaque handle to be passed to ``clearTimer()`` to cancel + ## a pending timer. let loop = getGlobalDispatcher() var timer = TimerCallback(finishAt: at, @@ -770,6 +775,7 @@ proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle loop.timers.push(timer) proc clearTimer*(handle: TimerHandle) = + ## Clear the timer associated with the passed handle var loop = getGlobalDispatcher() var timer = loop.timerHandles[handle.Natural] loop.timerHandles[handle.Natural] = nil From d8dd09a35839dd97f2ea988cf629974ae2c97d3d Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 7 Jan 2020 19:06:27 -0600 Subject: [PATCH 4/8] don't loop over timers heap --- chronos/asyncloop.nim | 109 ++++++++++++++---------------------------- 1 file changed, 36 insertions(+), 73 deletions(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 92bc2c7..f39c62a 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, tables, strutils, heapqueue, lists, options, nativesockets, net, - deques, sequtils + deques import timer export Port, SocketFlag @@ -180,7 +180,7 @@ type CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} - AsyncCallback* = object + AsyncCallback* = ref object function*: CallbackFunc udata*: pointer deleted*: bool @@ -190,12 +190,10 @@ type AsyncTimeoutError* = object of AsyncError ## Timeout exception - TimerHandle* = distinct Natural - TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback - removed: bool + deleted*: bool TrackerBase* = ref object of RootRef id*: string @@ -206,7 +204,6 @@ type timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] - timerHandles: seq[TimerCallback] proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt @@ -231,20 +228,15 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result = cast[int32](res) result += min(1, cast[int32](mid)) -template removeTimers(loop) = - var timers = cast[seq[TimerCallback]](loop.timers) - timers.keepItIf( not(it.removed) ) - loop.timers = cast[HeapQueue[TimerCallback]](timers) - template processTimersGetTimeout(loop, timeout: untyped) = var count = len(loop.timers) if count > 0: var lastFinish = curTime while count > 0: - if loop.timers.len < 1: + if not(loop.timers.len > 0): break - if loop.timers[0].removed: + if loop.timers[0].deleted: discard loop.timers.pop() continue @@ -272,10 +264,10 @@ template processTimers(loop: untyped) = var count = len(loop.timers) if count > 0: while count > 0: - if loop.timers.len < 1: + if not(loop.timers.len > 0): break - if loop.timers[0].removed: + if loop.timers[0].deleted: discard loop.timers.pop() continue @@ -472,9 +464,6 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() - # Cleanup canceled timers - loop.removeTimers() - proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -592,7 +581,8 @@ elif unixPlatform: adata.reader = acb adata.rdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Read) - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -605,7 +595,8 @@ elif unixPlatform: # We need to clear `reader` data, because `selectors` don't do it adata.reader.function = nil # adata.rdata = CompletionData() - if not isNil(adata.writer.function): newEvents.incl(Event.Write) + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -620,7 +611,8 @@ elif unixPlatform: adata.writer = acb adata.wdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Write) - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -633,7 +625,8 @@ elif unixPlatform: # We need to clear `writer` data, because `selectors` don't do it adata.writer.function = nil # adata.wdata = CompletionData() - if not isNil(adata.reader.function): newEvents.incl(Event.Read) + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) @@ -656,16 +649,16 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. - if not isNil(adata.reader.function): + if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): if not adata.reader.deleted: loop.callbacks.addLast(adata.reader) - if not isNil(adata.writer.function): + if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): if not adata.writer.deleted: loop.callbacks.addLast(adata.writer) # Mark callbacks as deleted, we don't need to get REAL notifications # from system queue for this reader and writer. - adata.reader.deleted = true - adata.writer.deleted = true + if not isNil(adata.reader): adata.reader.deleted = true + if not isNil(adata.writer): adata.writer.deleted = true # We can't unregister file descriptor from system queue here, because # in such case processing queue will stuck on poll() call, because there @@ -748,47 +741,23 @@ elif unixPlatform: # poll() call. loop.processCallbacks() - # Cleanup canceled timers - loop.removeTimers() - else: proc initAPI() = discard proc globalInit() = discard -proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerHandle = +proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. - ## Returns an opaque handle to be passed to ``clearTimer()`` to cancel - ## a pending timer. let loop = getGlobalDispatcher() + result = TimerCallback(finishAt: at, + function: AsyncCallback(function: cb, udata: udata)) + loop.timers.push(result) - var timer = TimerCallback(finishAt: at, - function: AsyncCallback(function: cb, udata: udata)) - loop.timerHandles.add(timer) - result = TimerHandle(loop.timerHandles.len - 1) +proc clearTimer(timer: TimerCallback) = + timer.deleted = true - # try to optimize for the case where the top - # element is already removed and just replace it - if loop.timers.len > 0 and loop.timers[0].removed: - discard loop.timers.replace(timer) - else: - loop.timers.push(timer) - -proc clearTimer*(handle: TimerHandle) = - ## Clear the timer associated with the passed handle - var loop = getGlobalDispatcher() - var timer = loop.timerHandles[handle.Natural] - loop.timerHandles[handle.Natural] = nil - - # optimize for handle being at the top - if timer == loop.timers[0]: - discard loop.timers.pop() - else: - timer.removed = true - -proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {. - deprecated: "Use setTimer/clearTimer pairs" - .} = +proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) + {.deprecated: "Use setTimer/clearTimer instead".} = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() @@ -833,7 +802,7 @@ proc sleepAsync*(duration: Duration): Future[void] = ## ``duration`` time. var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") let moment = Moment.fromNow(duration) - var timer: TimerHandle + var timer: TimerCallback proc completion(data: pointer) {.gcsafe.} = if not(retFuture.finished()): @@ -860,25 +829,24 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = ## future will hold false. var retFuture = newFuture[bool]("chronos.`withTimeout`") var moment: Moment - var timerPresent = false - var timer: TimerHandle + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): if isNil(udata): # Timer exceeded first. - fut.removeCallback(continuation) + clearTimer(timer) fut.cancel() retFuture.complete(false) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: + if not isNil(timer): clearTimer(timer) retFuture.complete(true) proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: + if not isNil(timer): clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) @@ -893,7 +861,6 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = retFuture.cancelCallback = cancel fut.addCallback(continuation) else: - timerPresent = true moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel timer = setTimer(moment, continuation, nil) @@ -916,21 +883,18 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = ## should return, because it can't be cancelled too. var retFuture = newFuture[T]("chronos.wait()") var moment: Moment - var timerPresent: bool - var timer: TimerHandle + var timer: TimerCallback proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if not(fut.finished()): + if isNil(udata): # Timer exceeded first. fut.removeCallback(continuation) fut.cancel() - if timerPresent: - clearTimer(timer) retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: # Future `fut` completed/failed/cancelled first. - if timerPresent: + if not isNil(timer): clearTimer(timer) if fut.failed(): @@ -943,7 +907,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = proc cancel(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if timerPresent: + if not isNil(timer): clearTimer(timer) if not(fut.finished()): fut.removeCallback(continuation) @@ -967,7 +931,6 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = moment = Moment.fromNow(timeout) retFuture.cancelCallback = cancel timer = setTimer(moment, continuation, nil) - timerPresent = true fut.addCallback(continuation) return retFuture From 78953f8fc307f9fdf5dda275c32bd818f322c9ae Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 8 Jan 2020 10:03:34 -0600 Subject: [PATCH 5/8] consolidating timers loop --- chronos/asyncloop.nim | 48 ++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index f39c62a..ed90f60 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -229,25 +229,19 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = result += min(1, cast[int32](mid)) template processTimersGetTimeout(loop, timeout: untyped) = - var count = len(loop.timers) - if count > 0: - var lastFinish = curTime - while count > 0: - if not(loop.timers.len > 0): - break + var lastFinish = curTime + while loop.timers.len > 0: + if loop.timers[0].deleted: + discard loop.timers.pop() + continue - if loop.timers[0].deleted: - discard loop.timers.pop() - continue + lastFinish = loop.timers[0].finishAt + if curTime < lastFinish: + break - lastFinish = loop.timers[0].finishAt - if curTime < lastFinish: - break - - loop.callbacks.addLast(loop.timers.pop().function) - dec(count) - if count > 0: - timeout = (lastFinish - curTime).getAsyncTimestamp() + loop.callbacks.addLast(loop.timers.pop().function) + if loop.timers.len > 0: + timeout = (lastFinish - curTime).getAsyncTimestamp() if timeout == 0: if len(loop.callbacks) == 0: @@ -261,20 +255,14 @@ template processTimersGetTimeout(loop, timeout: untyped) = template processTimers(loop: untyped) = var curTime = Moment.now() - var count = len(loop.timers) - if count > 0: - while count > 0: - if not(loop.timers.len > 0): - break + while loop.timers.len > 0: + if loop.timers[0].deleted: + discard loop.timers.pop() + continue - if loop.timers[0].deleted: - discard loop.timers.pop() - continue - - if curTime < loop.timers[0].finishAt: - break - loop.callbacks.addLast(loop.timers.pop().function) - dec(count) + if curTime < loop.timers[0].finishAt: + break + loop.callbacks.addLast(loop.timers.pop().function) template processCallbacks(loop: untyped) = var count = len(loop.callbacks) From 96e0206c27c1357e2e46fab12c1fd11eee34ebcf Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 8 Jan 2020 11:06:56 -0600 Subject: [PATCH 6/8] reverting AsyncCallback to object --- chronos/asyncloop.nim | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index ed90f60..8c2b099 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -180,7 +180,7 @@ type CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} - AsyncCallback* = ref object + AsyncCallback* = object function*: CallbackFunc udata*: pointer deleted*: bool @@ -569,7 +569,7 @@ elif unixPlatform: adata.reader = acb adata.rdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Read) - if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") @@ -583,7 +583,7 @@ elif unixPlatform: # We need to clear `reader` data, because `selectors` don't do it adata.reader.function = nil # adata.rdata = CompletionData() - if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") @@ -599,7 +599,7 @@ elif unixPlatform: adata.writer = acb adata.wdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Write) - if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") @@ -613,7 +613,7 @@ elif unixPlatform: # We need to clear `writer` data, because `selectors` don't do it adata.writer.function = nil # adata.wdata = CompletionData() - if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") @@ -637,16 +637,16 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. - if not(isNil(adata.reader)) and not(isNil(adata.reader.function)): + if not(isNil(adata.reader.function)): if not adata.reader.deleted: loop.callbacks.addLast(adata.reader) - if not(isNil(adata.writer)) and not(isNil(adata.writer.function)): + if not(isNil(adata.writer.function)): if not adata.writer.deleted: loop.callbacks.addLast(adata.writer) # Mark callbacks as deleted, we don't need to get REAL notifications # from system queue for this reader and writer. - if not isNil(adata.reader): adata.reader.deleted = true - if not isNil(adata.writer): adata.writer.deleted = true + adata.reader.deleted = true + adata.writer.deleted = true # We can't unregister file descriptor from system queue here, because # in such case processing queue will stuck on poll() call, because there From 0d84f273c9c74974412c50aa5a8ba6d94b97593b Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 8 Jan 2020 11:23:01 -0600 Subject: [PATCH 7/8] make `clearTimer` inline --- chronos/asyncloop.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 8c2b099..d48f9dc 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -741,7 +741,7 @@ proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback function: AsyncCallback(function: cb, udata: udata)) loop.timers.push(result) -proc clearTimer(timer: TimerCallback) = +proc clearTimer(timer: TimerCallback) {.inline.} = timer.deleted = true proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) From 1c176c173d5dce30ce7fe24f72572077b5dd6ade Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 8 Jan 2020 19:32:53 -0600 Subject: [PATCH 8/8] fix: restore `removeCallback` --- chronos/asyncloop.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index d48f9dc..b72553e 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -823,7 +823,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = if not(retFuture.finished()): if isNil(udata): # Timer exceeded first. - clearTimer(timer) + fut.removeCallback(continuation) fut.cancel() retFuture.complete(false) else: