mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-02-20 23:18:22 +00:00
Merge pull request #63 from status-im/fix-wait-timeout
Optimize timers processing
This commit is contained in:
commit
f02e748f18
@ -190,9 +190,10 @@ type
|
||||
AsyncTimeoutError* = object of AsyncError
|
||||
## Timeout exception
|
||||
|
||||
TimerCallback* = object
|
||||
TimerCallback* = ref object
|
||||
finishAt*: Moment
|
||||
function*: AsyncCallback
|
||||
deleted*: bool
|
||||
|
||||
TrackerBase* = ref object of RootRef
|
||||
id*: string
|
||||
@ -228,17 +229,19 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} =
|
||||
result += min(1, cast[int32](mid))
|
||||
|
||||
template processTimersGetTimeout(loop, timeout: untyped) =
|
||||
var count = len(loop.timers)
|
||||
if count > 0:
|
||||
var lastFinish = curTime
|
||||
while count > 0:
|
||||
lastFinish = loop.timers[0].finishAt
|
||||
if curTime < lastFinish:
|
||||
break
|
||||
loop.callbacks.addLast(loop.timers.pop().function)
|
||||
dec(count)
|
||||
if count > 0:
|
||||
timeout = (lastFinish - curTime).getAsyncTimestamp()
|
||||
var lastFinish = curTime
|
||||
while loop.timers.len > 0:
|
||||
if loop.timers[0].deleted:
|
||||
discard loop.timers.pop()
|
||||
continue
|
||||
|
||||
lastFinish = loop.timers[0].finishAt
|
||||
if curTime < lastFinish:
|
||||
break
|
||||
|
||||
loop.callbacks.addLast(loop.timers.pop().function)
|
||||
if loop.timers.len > 0:
|
||||
timeout = (lastFinish - curTime).getAsyncTimestamp()
|
||||
|
||||
if timeout == 0:
|
||||
if len(loop.callbacks) == 0:
|
||||
@ -252,13 +255,14 @@ template processTimersGetTimeout(loop, timeout: untyped) =
|
||||
|
||||
template processTimers(loop: untyped) =
|
||||
var curTime = Moment.now()
|
||||
var count = len(loop.timers)
|
||||
if count > 0:
|
||||
while count > 0:
|
||||
if curTime < loop.timers[0].finishAt:
|
||||
break
|
||||
loop.callbacks.addLast(loop.timers.pop().function)
|
||||
dec(count)
|
||||
while loop.timers.len > 0:
|
||||
if loop.timers[0].deleted:
|
||||
discard loop.timers.pop()
|
||||
continue
|
||||
|
||||
if curTime < loop.timers[0].finishAt:
|
||||
break
|
||||
loop.callbacks.addLast(loop.timers.pop().function)
|
||||
|
||||
template processCallbacks(loop: untyped) =
|
||||
var count = len(loop.callbacks)
|
||||
@ -565,7 +569,8 @@ elif unixPlatform:
|
||||
adata.reader = acb
|
||||
adata.rdata = CompletionData(fd: fd, udata: udata)
|
||||
newEvents.incl(Event.Read)
|
||||
if not isNil(adata.writer.function): newEvents.incl(Event.Write)
|
||||
if not(isNil(adata.writer.function)):
|
||||
newEvents.incl(Event.Write)
|
||||
do:
|
||||
raise newException(ValueError, "File descriptor not registered.")
|
||||
loop.selector.updateHandle(int(fd), newEvents)
|
||||
@ -578,7 +583,8 @@ elif unixPlatform:
|
||||
# We need to clear `reader` data, because `selectors` don't do it
|
||||
adata.reader.function = nil
|
||||
# adata.rdata = CompletionData()
|
||||
if not isNil(adata.writer.function): newEvents.incl(Event.Write)
|
||||
if not(isNil(adata.writer.function)):
|
||||
newEvents.incl(Event.Write)
|
||||
do:
|
||||
raise newException(ValueError, "File descriptor not registered.")
|
||||
loop.selector.updateHandle(int(fd), newEvents)
|
||||
@ -593,7 +599,8 @@ elif unixPlatform:
|
||||
adata.writer = acb
|
||||
adata.wdata = CompletionData(fd: fd, udata: udata)
|
||||
newEvents.incl(Event.Write)
|
||||
if not isNil(adata.reader.function): newEvents.incl(Event.Read)
|
||||
if not(isNil(adata.reader.function)):
|
||||
newEvents.incl(Event.Read)
|
||||
do:
|
||||
raise newException(ValueError, "File descriptor not registered.")
|
||||
loop.selector.updateHandle(int(fd), newEvents)
|
||||
@ -606,7 +613,8 @@ elif unixPlatform:
|
||||
# We need to clear `writer` data, because `selectors` don't do it
|
||||
adata.writer.function = nil
|
||||
# adata.wdata = CompletionData()
|
||||
if not isNil(adata.reader.function): newEvents.incl(Event.Read)
|
||||
if not(isNil(adata.reader.function)):
|
||||
newEvents.incl(Event.Read)
|
||||
do:
|
||||
raise newException(ValueError, "File descriptor not registered.")
|
||||
loop.selector.updateHandle(int(fd), newEvents)
|
||||
@ -629,10 +637,10 @@ elif unixPlatform:
|
||||
withData(loop.selector, int(fd), adata) do:
|
||||
# We are scheduling reader and writer callbacks to be called
|
||||
# explicitly, so they can get an error and continue work.
|
||||
if not isNil(adata.reader.function):
|
||||
if not(isNil(adata.reader.function)):
|
||||
if not adata.reader.deleted:
|
||||
loop.callbacks.addLast(adata.reader)
|
||||
if not isNil(adata.writer.function):
|
||||
if not(isNil(adata.writer.function)):
|
||||
if not adata.writer.deleted:
|
||||
loop.callbacks.addLast(adata.writer)
|
||||
# Mark callbacks as deleted, we don't need to get REAL notifications
|
||||
@ -725,7 +733,19 @@ else:
|
||||
proc initAPI() = discard
|
||||
proc globalInit() = discard
|
||||
|
||||
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =
|
||||
proc setTimer(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback =
|
||||
## Arrange for the callback ``cb`` to be called at the given absolute
|
||||
## timestamp ``at``. You can also pass ``udata`` to callback.
|
||||
let loop = getGlobalDispatcher()
|
||||
result = TimerCallback(finishAt: at,
|
||||
function: AsyncCallback(function: cb, udata: udata))
|
||||
loop.timers.push(result)
|
||||
|
||||
proc clearTimer(timer: TimerCallback) {.inline.} =
|
||||
timer.deleted = true
|
||||
|
||||
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil)
|
||||
{.deprecated: "Use setTimer/clearTimer instead".} =
|
||||
## Arrange for the callback ``cb`` to be called at the given absolute
|
||||
## timestamp ``at``. You can also pass ``udata`` to callback.
|
||||
let loop = getGlobalDispatcher()
|
||||
@ -770,6 +790,7 @@ proc sleepAsync*(duration: Duration): Future[void] =
|
||||
## ``duration`` time.
|
||||
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
|
||||
let moment = Moment.fromNow(duration)
|
||||
var timer: TimerCallback
|
||||
|
||||
proc completion(data: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
@ -777,10 +798,10 @@ proc sleepAsync*(duration: Duration): Future[void] =
|
||||
|
||||
proc cancel(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
removeTimer(moment, completion, cast[pointer](retFuture))
|
||||
clearTimer(timer)
|
||||
|
||||
retFuture.cancelCallback = cancel
|
||||
addTimer(moment, completion, cast[pointer](retFuture))
|
||||
timer = setTimer(moment, completion, cast[pointer](retFuture))
|
||||
return retFuture
|
||||
|
||||
proc sleepAsync*(ms: int): Future[void] {.
|
||||
@ -796,7 +817,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
|
||||
## future will hold false.
|
||||
var retFuture = newFuture[bool]("chronos.`withTimeout`")
|
||||
var moment: Moment
|
||||
var timerPresent = false
|
||||
var timer: TimerCallback
|
||||
|
||||
proc continuation(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
@ -807,14 +828,14 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
|
||||
retFuture.complete(false)
|
||||
else:
|
||||
# Future `fut` completed/failed/cancelled first.
|
||||
if timerPresent:
|
||||
removeTimer(moment, continuation, nil)
|
||||
if not isNil(timer):
|
||||
clearTimer(timer)
|
||||
retFuture.complete(true)
|
||||
|
||||
proc cancel(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
if timerPresent:
|
||||
removeTimer(moment, continuation, nil)
|
||||
if not isNil(timer):
|
||||
clearTimer(timer)
|
||||
if not(fut.finished()):
|
||||
fut.removeCallback(continuation)
|
||||
fut.cancel()
|
||||
@ -828,10 +849,9 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
|
||||
retFuture.cancelCallback = cancel
|
||||
fut.addCallback(continuation)
|
||||
else:
|
||||
timerPresent = true
|
||||
moment = Moment.fromNow(timeout)
|
||||
retFuture.cancelCallback = cancel
|
||||
addTimer(moment, continuation, nil)
|
||||
timer = setTimer(moment, continuation, nil)
|
||||
fut.addCallback(continuation)
|
||||
|
||||
return retFuture
|
||||
@ -851,7 +871,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
## should return, because it can't be cancelled too.
|
||||
var retFuture = newFuture[T]("chronos.wait()")
|
||||
var moment: Moment
|
||||
var timerPresent = false
|
||||
var timer: TimerCallback
|
||||
|
||||
proc continuation(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
@ -862,8 +882,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
||||
else:
|
||||
# Future `fut` completed/failed/cancelled first.
|
||||
if timerPresent:
|
||||
removeTimer(moment, continuation, nil)
|
||||
if not isNil(timer):
|
||||
clearTimer(timer)
|
||||
|
||||
if fut.failed():
|
||||
retFuture.fail(fut.error)
|
||||
@ -875,8 +895,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
|
||||
proc cancel(udata: pointer) {.gcsafe.} =
|
||||
if not(retFuture.finished()):
|
||||
if timerPresent:
|
||||
removeTimer(moment, continuation, nil)
|
||||
if not isNil(timer):
|
||||
clearTimer(timer)
|
||||
if not(fut.finished()):
|
||||
fut.removeCallback(continuation)
|
||||
fut.cancel()
|
||||
@ -896,10 +916,9 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
||||
retFuture.cancelCallback = cancel
|
||||
fut.addCallback(continuation)
|
||||
else:
|
||||
timerPresent = true
|
||||
moment = Moment.fromNow(timeout)
|
||||
retFuture.cancelCallback = cancel
|
||||
addTimer(moment, continuation, nil)
|
||||
timer = setTimer(moment, continuation, nil)
|
||||
fut.addCallback(continuation)
|
||||
|
||||
return retFuture
|
||||
|
Loading…
x
Reference in New Issue
Block a user