diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index d553e77f..428a3964 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -206,7 +206,8 @@ type proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt -proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) {.gcsafe.} +proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) {. + gcsafe, raises: [Defect].} func getAsyncTimestamp*(a: Duration): auto {.inline.} = ## Return rounded up value of duration with milliseconds resolution. @@ -382,17 +383,12 @@ when defined(windows) or defined(nimdoc): var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc setGlobalDispatcher*(disp: PDispatcher) = - ## Set current thread's dispatcher instance to ``disp``. - if not gDisp.isNil: - doAssert gDisp.callbacks.len == 0 - gDisp = disp - - proc getGlobalDispatcher*(): PDispatcher = - ## Returns current thread's dispatcher instance. - if gDisp.isNil: - setGlobalDispatcher(newDispatcher()) - result = gDisp + proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} + proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} + proc setGlobalDispatcher*(disp: PDispatcher) {. + gcsafe, deprecated: "Use setThreadDispatcher() instead".} + proc getGlobalDispatcher*(): PDispatcher {. + gcsafe, deprecated: "Use getThreadDispatcher() instead".} proc getIoHandler*(disp: PDispatcher): Handle = ## Returns the underlying IO Completion Port handle (Windows) or selector @@ -401,7 +397,7 @@ when defined(windows) or defined(nimdoc): proc register*(fd: AsyncFD) = ## Register file descriptor ``fd`` in thread's dispatcher. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() if createIoCompletionPort(fd.Handle, loop.ioPort, cast[CompletionKey](fd), 1) == 0: raiseOSError(osLastError()) @@ -409,7 +405,7 @@ when defined(windows) or defined(nimdoc): proc poll*() = ## Perform single asynchronous step. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var curTime = Moment.now() var curTimeout = DWORD(0) @@ -452,7 +448,7 @@ when defined(windows) or defined(nimdoc): proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() loop.handles.excl(fd) close(SocketHandle(fd)) if not isNil(aftercb): @@ -461,7 +457,7 @@ when defined(windows) or defined(nimdoc): proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a (pipe/file) handle and ensures that it is unregistered. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() loop.handles.excl(fd) discard closeHandle(Handle(fd)) if not isNil(aftercb): @@ -470,7 +466,7 @@ when defined(windows) or defined(nimdoc): proc unregister*(fd: AsyncFD) = ## Unregisters ``fd``. - getGlobalDispatcher().handles.excl(fd) + getThreadDispatcher().handles.excl(fd) proc contains*(disp: PDispatcher, fd: AsyncFD): bool = ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. @@ -525,17 +521,12 @@ elif unixPlatform: var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc setGlobalDispatcher*(disp: PDispatcher) = - ## Set current thread's dispatcher instance to ``disp``. - if not gDisp.isNil: - doAssert gDisp.callbacks.len == 0 - gDisp = disp - - proc getGlobalDispatcher*(): PDispatcher = - ## Returns current thread's dispatcher instance. - if gDisp.isNil: - setGlobalDispatcher(newDispatcher()) - result = gDisp + proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} + proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} + proc setGlobalDispatcher*(disp: PDispatcher) {. + gcsafe, deprecated: "Use setThreadDispatcher() instead".} + proc getGlobalDispatcher*(): PDispatcher {. + gcsafe, deprecated: "Use getThreadDispatcher() instead".} proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] = ## Returns system specific OS queue. @@ -543,7 +534,7 @@ elif unixPlatform: proc register*(fd: AsyncFD) = ## Register file descriptor ``fd`` in thread's dispatcher. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var data: SelectorData data.rdata.fd = fd data.wdata.fd = fd @@ -551,7 +542,7 @@ elif unixPlatform: proc unregister*(fd: AsyncFD) = ## Unregister file descriptor ``fd`` from thread's dispatcher. - getGlobalDispatcher().selector.unregister(int(fd)) + getThreadDispatcher().selector.unregister(int(fd)) proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} = ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. @@ -560,7 +551,7 @@ elif unixPlatform: proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = ## Start watching the file descriptor ``fd`` for read availability and then ## call the callback ``cb`` with specified argument ``udata``. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var newEvents = {Event.Read} withData(loop.selector, int(fd), adata) do: let acb = AsyncCallback(function: cb, udata: addr adata.rdata) @@ -575,7 +566,7 @@ elif unixPlatform: proc removeReader*(fd: AsyncFD) = ## Stop watching the file descriptor ``fd`` for read availability. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `reader` data, because `selectors` don't do it @@ -590,7 +581,7 @@ elif unixPlatform: proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = ## Start watching the file descriptor ``fd`` for write availability and then ## call the callback ``cb`` with specified argument ``udata``. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var newEvents = {Event.Write} withData(loop.selector, int(fd), adata) do: let acb = AsyncCallback(function: cb, udata: addr adata.wdata) @@ -605,7 +596,7 @@ elif unixPlatform: proc removeWriter*(fd: AsyncFD) = ## Stop watching the file descriptor ``fd`` for write availability. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `writer` data, because `selectors` don't do it @@ -624,7 +615,7 @@ elif unixPlatform: ## closing socket, while operation pending, socket will be closed as ## soon as all pending operations will be notified. ## You can execute ``aftercb`` before actual socket close operation. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() proc continuation(udata: pointer) = if SocketHandle(fd) in loop.selector: @@ -669,7 +660,7 @@ elif unixPlatform: ## callback ``cb`` with specified argument ``udata``. Returns signal ## identifier code, which can be used to remove signal callback ## via ``removeSignal``. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var data: SelectorData result = loop.selector.registerSignal(signal, data) withData(loop.selector, result, adata) do: @@ -681,12 +672,12 @@ elif unixPlatform: proc removeSignal*(sigfd: int) = ## Remove watching signal ``signal``. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() loop.selector.unregister(sigfd) proc poll*() = ## Perform single asynchronous step. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var curTime = Moment.now() var curTimeout = 0 @@ -732,11 +723,48 @@ else: proc initAPI() = discard proc globalInit() = discard +proc setThreadDispatcher*(disp: PDispatcher) = + ## Set current thread's dispatcher instance to ``disp``. + if not gDisp.isNil: + doAssert gDisp.callbacks.len == 0 + gDisp = disp + +proc getThreadDispatcher*(): PDispatcher = + ## Returns current thread's dispatcher instance. + template getErrorMessage(exc): string = + "Cannot create thread dispatcher: " & exc.msg + + if gDisp.isNil: + when defined(windows): + let disp = + try: + newDispatcher() + except CatchableError as exc: + raise newException(Defect, getErrorMessage(exc)) + else: + let disp = + try: + newDispatcher() + except IOSelectorsException as exc: + raise newException(Defect, getErrorMessage(exc)) + except CatchableError as exc: + raise newException(Defect, getErrorMessage(exc)) + setThreadDispatcher(disp) + return gDisp + +proc setGlobalDispatcher*(disp: PDispatcher) = + ## Set current thread's dispatcher instance to ``disp``. + setThreadDispatcher(disp) + +proc getGlobalDispatcher*(): PDispatcher = + ## Returns current thread's dispatcher instance. + getThreadDispatcher() + proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil): TimerCallback = ## Arrange for the callback ``cb`` to be called at the given absolute ## timestamp ``at``. You can also pass ``udata`` to callback. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() result = TimerCallback(finishAt: at, function: AsyncCallback(function: cb, udata: udata)) loop.timers.push(result) @@ -761,7 +789,7 @@ proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {. proc removeTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = ## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting ## queue. - let loop = getGlobalDispatcher() + let loop = getThreadDispatcher() var list = cast[seq[TimerCallback]](loop.timers) var index = -1 for i in 0..