diff --git a/chronos.nim b/chronos.nim index 53e39e61..62db3474 100644 --- a/chronos.nim +++ b/chronos.nim @@ -5,6 +5,5 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import chronos/[asyncloop, asyncfutures2, asyncsync, handles, transport, - timer] -export asyncloop, asyncfutures2, asyncsync, handles, transport, timer +import chronos/[asyncloop, asyncsync, handles, transport, timer] +export asyncloop, asyncsync, handles, transport, timer diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index a9275510..f9dd1761 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -8,7 +8,7 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, tables, strutils, times, heapqueue, options, deques, cstrutils +import os, tables, strutils, heapqueue, options, deques, cstrutils import srcloc export srcloc @@ -17,14 +17,6 @@ const LocCompleteIndex = 1 type - CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} - CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} - - AsyncCallback* = object - function*: CallbackFunc - udata*: pointer - deleted*: bool - # ZAH: This can probably be stored with a cheaper representation # until the moment it needs to be printed to the screen # (e.g. seq[StackTraceEntry]) @@ -69,22 +61,6 @@ type var currentID* {.threadvar.}: int currentID = 0 -# ZAH: This seems unnecessary. Isn't it easy to introduce a seperate -# module for the dispatcher type, so it can be directly referenced here? -var callSoonHolder {.threadvar.}: CallSoonProc - -proc getCallSoonProc*(): CallSoonProc {.gcsafe.} = - ## Get current implementation of ``callSoon``. - return callSoonHolder - -proc setCallSoonProc*(p: CallSoonProc) = - ## Change current implementation of ``callSoon``. - callSoonHolder = p - -proc callSoon*(c: CallbackFunc, u: pointer = nil) = - ## Call ``cbproc`` "soon". - callSoonHolder(c, u) - template setupFutureBase(loc: ptr SrcLoc) = new(result) result.state = FutureState.Pending diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 3c32441d..3d7d5f06 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -10,14 +10,12 @@ include "system/inclrtl" -import os, tables, strutils, heapqueue, lists, options +import os, tables, strutils, heapqueue, lists, options, nativesockets, net, + deques import timer -import asyncfutures2 except callSoon - -import nativesockets, net, deques export Port, SocketFlag -export asyncfutures2, timer +export timer #{.injectStmt: newGcInvariant().} @@ -165,14 +163,28 @@ export asyncfutures2, timer # TODO: Check if yielded future is nil and throw a more meaningful exception +const unixPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(dragonfly) or defined(macos) or + defined(linux) or defined(android) or + defined(solaris) + when defined(windows): import winlean, sets, hashes -else: +elif unixPlatform: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL, SIGPIPE type + CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.} + CallSoonProc* = proc (c: CallbackFunc, u: pointer = nil) {.gcsafe.} + + AsyncCallback* = object + function*: CallbackFunc + udata*: pointer + deleted*: bool + AsyncError* = object of CatchableError ## Generic async exception AsyncTimeoutError* = object of AsyncError @@ -195,11 +207,7 @@ type proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt -proc callSoon(cbproc: CallbackFunc, data: pointer = nil) {.gcsafe.} - -proc initCallSoonProc() = - if asyncfutures2.getCallSoonProc().isNil: - asyncfutures2.setCallSoonProc(callSoon) +proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) {.gcsafe.} func getAsyncTimestamp*(a: Duration): auto {.inline.} = ## Return rounded up value of duration with milliseconds resolution. @@ -304,6 +312,51 @@ when defined(windows) or defined(nimdoc): proc hash(x: AsyncFD): Hash {.borrow.} proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.} + proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool = + var bytesRet: DWORD + fun = nil + result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, + sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, + addr bytesRet, nil, nil) == 0 + + proc initAPI(loop: PDispatcher) = + var + WSAID_TRANSMITFILE = GUID( + D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16, + D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, + 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8]) + + var wsa: WSAData + if wsaStartup(0x0202'i16, addr wsa) != 0: + raiseOSError(osLastError()) + + let sock = winlean.socket(winlean.AF_INET, 1, 6) + if sock == INVALID_SOCKET: + raiseOSError(osLastError()) + + var funcPointer: pointer = nil + if not getFunc(sock, funcPointer, WSAID_CONNECTEX): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer) + if not getFunc(sock, funcPointer, WSAID_ACCEPTEX): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer) + if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer) + if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) + close(sock) + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result @@ -322,6 +375,7 @@ when defined(windows) or defined(nimdoc): result.timers = newHeapQueue[TimerCallback]() result.callbacks = initDeque[AsyncCallback](64) result.trackers = initTable[string, TrackerBase]() + initAPI(result) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher @@ -330,7 +384,6 @@ when defined(windows) or defined(nimdoc): if not gDisp.isNil: doAssert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = ## Returns current thread's dispatcher instance. @@ -394,53 +447,6 @@ when defined(windows) or defined(nimdoc): # poll() call. loop.processCallbacks() - proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool = - var bytesRet: DWORD - fun = nil - result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, - addr bytesRet, nil, nil) == 0 - - proc initAPI() = - var - WSAID_TRANSMITFILE = GUID( - D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16, - D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, - 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8]) - - let loop = getGlobalDispatcher() - - var wsa: WSAData - if wsaStartup(0x0202'i16, addr wsa) != 0: - raiseOSError(osLastError()) - - let sock = winlean.socket(winlean.AF_INET, 1, 6) - if sock == INVALID_SOCKET: - raiseOSError(osLastError()) - - var funcPointer: pointer = nil - if not getFunc(sock, funcPointer, WSAID_CONNECTEX): - let err = osLastError() - close(sock) - raiseOSError(err) - loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer) - if not getFunc(sock, funcPointer, WSAID_ACCEPTEX): - let err = osLastError() - close(sock) - raiseOSError(err) - loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer) - if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS): - let err = osLastError() - close(sock) - raiseOSError(err) - loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer) - if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE): - let err = osLastError() - close(sock) - raiseOSError(err) - loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) - close(sock) - proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() @@ -467,7 +473,10 @@ when defined(windows) or defined(nimdoc): ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. return fd in disp.handles -else: +elif unixPlatform: + const + SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1) + type AsyncFD* = distinct cint @@ -489,6 +498,10 @@ else: proc `==`*(x, y: AsyncFD): bool {.borrow.} + proc initAPI(disp: PDispatcher) = + # We are ignoring SIGPIPE signal, because we are working with EPIPE. + posix.signal(cint(SIGPIPE), SIG_IGN) + proc newDispatcher*(): PDispatcher = ## Create new dispatcher. new result @@ -497,6 +510,7 @@ else: result.callbacks = initDeque[AsyncCallback](64) result.keys = newSeq[ReadyKey](64) result.trackers = initTable[string, TrackerBase]() + initAPI(result) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher @@ -505,7 +519,6 @@ else: if not gDisp.isNil: doAssert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = ## Returns current thread's dispatcher instance. @@ -699,13 +712,8 @@ else: # poll() call. loop.processCallbacks() - const - SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1) - - proc initAPI() = - # We are ignoring SIGPIPE signal, because we are working with EPIPE. - posix.signal(cint(SIGPIPE), SIG_IGN) - discard getGlobalDispatcher() +else: + proc initAPI() = discard proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = ## Arrange for the callback ``cb`` to be called at the given absolute @@ -745,6 +753,8 @@ proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {. inline, deprecated: "Use removeTimer(Duration, cb, udata)".} = removeTimer(Moment.init(int64(at), Millisecond), cb, udata) +include asyncfutures2 + proc sleepAsync*(duration: Duration): Future[void] = ## Suspends the execution of the current async procedure for the next ## ``duration`` time. @@ -895,7 +905,7 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. include asyncmacro2 -proc callSoon(cbproc: CallbackFunc, data: pointer = nil) = +proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. doAssert(not isNil(cbproc)) @@ -924,6 +934,3 @@ proc getTracker*(id: string): TrackerBase = ## Get ``tracker`` from current thread dispatcher using identifier ``id``. let loop = getGlobalDispatcher() result = loop.trackers.getOrDefault(id, nil) - -# Global API and callSoon() initialization. -initAPI() diff --git a/chronos/asyncmacro2.nim b/chronos/asyncmacro2.nim index c038b94a..9ab2a4bc 100644 --- a/chronos/asyncmacro2.nim +++ b/chronos/asyncmacro2.nim @@ -11,7 +11,7 @@ ## ************* ## `asyncdispatch` module depends on the `asyncmacro` module to work properly. -import macros, strutils, asyncfutures2 +import macros, strutils proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} = # Skips a nest of StmtList's.