diff --git a/task_runner/asyncloop.nim b/task_runner/asyncloop.nim index 411ffd0..c9c1abd 100644 --- a/task_runner/asyncloop.nim +++ b/task_runner/asyncloop.nim @@ -9,9 +9,9 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, selectors +import os -import chronos/asyncloop +import chronos/[asyncloop, selectors2] export asyncloop when defined(windows): @@ -141,22 +141,28 @@ else: var moment: Moment proc handleContinuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - loop.selector.unregister(event) - if isNil(udata): - retFuture.complete(false) - else: - retFuture.complete(true) + try: + if not(retFuture.finished()): + loop.selector.unregister(event) + if isNil(udata): + retFuture.complete(false) + else: + retFuture.complete(true) + except IOSelectorsException as e: + raise newException(Defect, e.msg) - proc cancel(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - loop.selector.unregister(event) - if timeout != InfiniteDuration: - removeTimer(moment, handleContinuation, nil) + proc cancellation(udata: pointer) {.gcsafe.} = + try: + if not(retFuture.finished()): + loop.selector.unregister(event) + if timeout != InfiniteDuration: + removeTimer(moment, handleContinuation, nil) + except IOSelectorsException as e: + raise newException(Defect, e.msg) if timeout != InfiniteDuration: moment = Moment.fromNow(timeout) - addTimer(moment, handleContinuation, nil) + discard setTimer(moment, handleContinuation, nil) let fd = event.getFd() loop.selector.registerEvent(event, data) @@ -170,5 +176,5 @@ else: retFuture.fail(newException(ValueError, "Event descriptor not registered.")) - retFuture.cancelCallback = cancel + retFuture.cancelCallback = cancellation return retFuture diff --git a/task_runner/asyncsync.nim b/task_runner/asyncsync.nim index cface59..10b4c08 100644 --- a/task_runner/asyncsync.nim +++ b/task_runner/asyncsync.nim @@ -11,7 +11,7 @@ # MIT license (LICENSE-MIT) import os -import chronos/[asyncloop, asyncsync, handles, timer] +import chronos/[asyncloop, asyncsync, handles, selectors2, timer] export asyncsync import ./osapi @@ -216,47 +216,53 @@ else: else: let fd = AsyncFD(event.rfd) - proc contiunuation(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - var data: uint64 = 0 - if isNil(udata): - removeReader(fd) - retFuture.complete(WaitTimeout) - else: - while true: - if posix.read(cint(fd), addr data, - sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if cint(err) == posix.EINTR: - # This error happens when interrupt signal was received by - # process so we need to repeat `read` syscall. - continue - elif cint(err) == posix.EAGAIN or - cint(err) == posix.EWOULDBLOCK: - # This error happens when there already pending `read` syscall - # in different thread for this descriptor. This is race - # condition, so to avoid it we will wait for another `read` - # event from system queue. - break + proc contiunuation(udata: pointer) {.gcsafe, raises: [Defect].} = + try: + if not(retFuture.finished()): + var data: uint64 = 0 + if isNil(udata): + removeReader(fd) + retFuture.complete(WaitTimeout) + else: + while true: + if posix.read(cint(fd), addr data, + sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if cint(err) == posix.EINTR: + # This error happens when interrupt signal was received by + # process so we need to repeat `read` syscall. + continue + elif cint(err) == posix.EAGAIN or + cint(err) == posix.EWOULDBLOCK: + # This error happens when there already pending `read` syscall + # in different thread for this descriptor. This is race + # condition, so to avoid it we will wait for another `read` + # event from system queue. + break + else: + # All other errors + removeReader(fd) + retFuture.complete(WaitFailed) else: - # All other errors removeReader(fd) - retFuture.complete(WaitFailed) - else: - removeReader(fd) - when not(defined(linux)): - when hasThreadSupport: - acquire(event.lock) - event.flag = false - when hasThreadSupport: - release(event.lock) - retFuture.complete(WaitSuccess) - break + when not(defined(linux)): + when hasThreadSupport: + acquire(event.lock) + event.flag = false + when hasThreadSupport: + release(event.lock) + retFuture.complete(WaitSuccess) + break + except IOSelectorsException, ValueError: + raise newException(Defect, getCurrentExceptionMsg()) - proc cancel(udata: pointer) {.gcsafe.} = - if not(retFuture.finished()): - removeTimer(moment, contiunuation, nil) - removeReader(fd) + proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} = + try: + if not(retFuture.finished()): + removeTimer(moment, contiunuation, nil) + removeReader(fd) + except IOSelectorsException, ValueError: + raise newException(Defect, getCurrentExceptionMsg()) if fd notin loop: register(fd) @@ -265,7 +271,7 @@ else: moment = Moment.fromNow(timeout) addTimer(moment, contiunuation, nil) - retFuture.cancelCallback = cancel + retFuture.cancelCallback = cancellation return retFuture proc waitReady(fd: int, timeout: var Duration): WaitResult {.inline.} = diff --git a/vendor/nim-chronos b/vendor/nim-chronos index c066bfc..aa36b64 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit c066bfcb16482d82ef5b6fdbac85ec8f7565d56c +Subproject commit aa36b645182fe2e25d442bb5e93b0b06cff897bc