# # Chronos # # (c) Copyright 2015 Dominik Picheta # (c) Copyright 2018-Present Status Research & Development GmbH # # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: {.push raises: [].} import std/[os, tables, strutils, heapqueue, lists, nativesockets, net, deques] import ./timer when defined(chronosDurationThreshold): import std/monotimes import pkg/metrics declareGauge(chronosCallbackDuration, "chronos callback duration") export Port, SocketFlag export timer #{.injectStmt: newGcInvariant().} ## AsyncDispatch ## ************* ## ## This module implements asynchronous IO. This includes a dispatcher, ## a ``Future`` type implementation, and an ``async`` macro which allows ## asynchronous code to be written in a synchronous style with the ``await`` ## keyword. ## ## The dispatcher acts as a kind of event loop. You must call ``poll`` on it ## (or a function which does so for you such as ``waitFor`` or ``runForever``) ## in order to poll for any outstanding events. The underlying implementation ## is based on epoll on Linux, IO Completion Ports on Windows and select on ## other operating systems. ## ## The ``poll`` function will not, on its own, return any events. Instead ## an appropriate ``Future`` object will be completed. A ``Future`` is a ## type which holds a value which is not yet available, but which *may* be ## available in the future. You can check whether a future is finished ## by using the ``finished`` function. When a future is finished it means that ## either the value that it holds is now available or it holds an error instead. ## The latter situation occurs when the operation to complete a future fails ## with an exception. You can distinguish between the two situations with the ## ``failed`` function. ## ## Future objects can also store a callback procedure which will be called ## automatically once the future completes. ## ## Futures therefore can be thought of as an implementation of the proactor ## pattern. In this ## pattern you make a request for an action, and once that action is fulfilled ## a future is completed with the result of that action. Requests can be ## made by calling the appropriate functions. For example: calling the ``recv`` ## function will create a request for some data to be read from a socket. The ## future which the ``recv`` function returns will then complete once the ## requested amount of data is read **or** an exception occurs. ## ## Code to read some data from a socket may look something like this: ## ## .. code-block::nim ## var future = socket.recv(100) ## future.addCallback( ## proc () = ## echo(future.read) ## ) ## ## All asynchronous functions returning a ``Future`` will not block. They ## will not however return immediately. An asynchronous function will have ## code which will be executed before an asynchronous request is made, in most ## cases this code sets up the request. ## ## In the above example, the ``recv`` function will return a brand new ## ``Future`` instance once the request for data to be read from the socket ## is made. This ``Future`` instance will complete once the requested amount ## of data is read, in this case it is 100 bytes. The second line sets a ## callback on this future which will be called once the future completes. ## All the callback does is write the data stored in the future to ``stdout``. ## The ``read`` function is used for this and it checks whether the future ## completes with an error for you (if it did it will simply raise the ## error), if there is no error however it returns the value of the future. ## ## Asynchronous procedures ## ----------------------- ## ## Asynchronous procedures remove the pain of working with callbacks. They do ## this by allowing you to write asynchronous code the same way as you would ## write synchronous code. ## ## An asynchronous procedure is marked using the ``{.async.}`` pragma. ## When marking a procedure with the ``{.async.}`` pragma it must have a ## ``Future[T]`` return type or no return type at all. If you do not specify ## a return type then ``Future[void]`` is assumed. ## ## Inside asynchronous procedures ``await`` can be used to call any ## procedures which return a ## ``Future``; this includes asynchronous procedures. When a procedure is ## "awaited", the asynchronous procedure it is awaited in will ## suspend its execution ## until the awaited procedure's Future completes. At which point the ## asynchronous procedure will resume its execution. During the period ## when an asynchronous procedure is suspended other asynchronous procedures ## will be run by the dispatcher. ## ## The ``await`` call may be used in many contexts. It can be used on the right ## hand side of a variable declaration: ``var data = await socket.recv(100)``, ## in which case the variable will be set to the value of the future ## automatically. It can be used to await a ``Future`` object, and it can ## be used to await a procedure returning a ``Future[void]``: ## ``await socket.send("foobar")``. ## ## If an awaited future completes with an error, then ``await`` will re-raise ## this error. To avoid this, you can use the ``yield`` keyword instead of ## ``await``. The following section shows different ways that you can handle ## exceptions in async procs. ## ## Handling Exceptions ## ~~~~~~~~~~~~~~~~~~~ ## ## The most reliable way to handle exceptions is to use ``yield`` on a future ## then check the future's ``failed`` property. For example: ## ## .. code-block:: Nim ## var future = sock.recv(100) ## yield future ## if future.failed: ## # Handle exception ## ## The ``async`` procedures also offer limited support for the try statement. ## ## .. code-block:: Nim ## try: ## let data = await sock.recv(100) ## echo("Received ", data) ## except: ## # Handle exception ## ## Unfortunately the semantics of the try statement may not always be correct, ## and occasionally the compilation may fail altogether. ## As such it is better to use the former style when possible. ## ## ## Discarding futures ## ------------------ ## ## Futures should **never** be discarded. This is because they may contain ## errors. If you do not care for the result of a Future then you should ## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. ## ## Examples ## -------- ## ## For examples take a look at the documentation for the modules implementing ## asynchronous IO. A good place to start is the ## `asyncnet module `_. ## ## Limitations/Bugs ## ---------------- ## ## * The effect system (``raises: []``) does not work with async procedures. ## * Can't await in a ``except`` body ## * Forward declarations for async procs are broken, ## link includes workaround: https://github.com/nim-lang/Nim/issues/3182. # TODO: Check if yielded future is nil and throw a more meaningful exception const unixPlatform = defined(macosx) or defined(freebsd) or defined(netbsd) or defined(openbsd) or defined(dragonfly) or defined(macos) or defined(linux) or defined(android) or defined(solaris) MaxEventsCount* = 64 when defined(windows): import winlean, sets, hashes elif unixPlatform: import ./selectors2 from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL from posix import SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE export SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE type CallbackFunc* = proc (arg: pointer) {.gcsafe, raises: [Defect].} AsyncCallback* = object function*: CallbackFunc udata*: pointer AsyncError* = object of CatchableError ## Generic async exception AsyncTimeoutError* = object of AsyncError ## Timeout exception TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback TrackerBase* = ref object of RootRef id*: string dump*: proc(): string {.gcsafe, raises: [Defect].} isLeaked*: proc(): bool {.gcsafe, raises: [Defect].} PDispatcherBase = ref object of RootRef timers*: HeapQueue[TimerCallback] callbacks*: Deque[AsyncCallback] idlers*: Deque[AsyncCallback] trackers*: Table[string, TrackerBase] const chronosDurationThreshold {.intdefine.} = 0 template newAsyncCallback*(cbFunc: CallbackFunc, udataPtr: pointer = nil): AsyncCallback = AsyncCallback(function: cbFunc, udata: udataPtr) when defined(chronosDurationThreshold): type ChronosDurationThreadholdBreachedCallback* = proc(durationUs: int64) {.gcsafe, raises: [].} proc defaultThresholdBreachedHandler(durationUs: int64) {.gcsafe, raises: [].} = raise newException(Defect, "ChronosDurationThreshold breached. Time taken: " & $durationUs & " usecs") var chronosDurationThresholdBreachedHandler {.threadvar.} : ChronosDurationThreadholdBreachedCallback proc setChronosDurationThresholdBreachedHandler*(handler: ChronosDurationThreadholdBreachedCallback) = chronosDurationThresholdBreachedHandler = handler proc invokeChronosDurationThresholdBreachedHandler(durationUs: int64) = if chronosDurationThresholdBreachedHandler == nil: defaultThresholdBreachedHandler(durationUs) return chronosDurationThresholdBreachedHandler(durationUs) proc sentinelCallbackImpl(arg: pointer) {.gcsafe, raises: [Defect].} = raiseAssert "Sentinel callback MUST not be scheduled" const SentinelCallback = AsyncCallback(function: sentinelCallbackImpl) proc isSentinel(acb: AsyncCallback): bool {.raises: [Defect].} = acb == SentinelCallback proc `<`(a, b: TimerCallback): bool = result = a.finishAt < b.finishAt func getAsyncTimestamp*(a: Duration): auto {.inline.} = ## Return rounded up value of duration with milliseconds resolution. ## ## This function also take care on int32 overflow, because Linux and Windows ## accepts signed 32bit integer as timeout. let milsec = Millisecond.nanoseconds() let nansec = a.nanoseconds() var res = nansec div milsec let mid = nansec mod milsec when defined(windows): res = min(int64(high(int32) - 1), res) result = cast[DWORD](res) result += DWORD(min(1'i32, cast[int32](mid))) else: res = min(int64(high(int32) - 1), res) result = cast[int32](res) result += min(1, cast[int32](mid)) template processTimersGetTimeout(loop, timeout: untyped) = var lastFinish = curTime while loop.timers.len > 0: if loop.timers[0].function.function.isNil: discard loop.timers.pop() continue lastFinish = loop.timers[0].finishAt if curTime < lastFinish: break loop.callbacks.addLast(loop.timers.pop().function) if loop.timers.len > 0: timeout = (lastFinish - curTime).getAsyncTimestamp() if timeout == 0: if (len(loop.callbacks) == 0) and (len(loop.idlers) == 0): when defined(windows): timeout = INFINITE else: timeout = -1 else: if (len(loop.callbacks) != 0) or (len(loop.idlers) != 0): timeout = 0 template processTimers(loop: untyped) = var curTime = Moment.now() while loop.timers.len > 0: if loop.timers[0].function.function.isNil: discard loop.timers.pop() continue if curTime < loop.timers[0].finishAt: break loop.callbacks.addLast(loop.timers.pop().function) template processIdlers(loop: untyped) = if len(loop.idlers) > 0: loop.callbacks.addLast(loop.idlers.popFirst()) template processCallbacks(loop: untyped) = while true: let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel if isSentinel(callable): break if not(isNil(callable.function)): when defined(chronosDurationThreshold): let startTime = getMonoTime().ticks callable.function(callable.udata) when defined(chronosDurationThreshold): let durationNs = getMonoTime().ticks - startTime durationUs = durationNs div 1000 chronosCallbackDuration.set(durationNs) if durationUs > chronosDurationThreshold: invokeChronosDurationThresholdBreachedHandler(durationUs) proc raiseAsDefect*(exc: ref Exception, msg: string) {. raises: [Defect], noreturn, noinline.} = # Reraise an exception as a Defect, where it's unexpected and can't be handled # We include the stack trace in the message because otherwise, it's easily # lost - Nim doesn't print it for `parent` exceptions for example (!) raise (ref Defect)( msg: msg & "\n" & exc.msg & "\n" & exc.getStackTrace(), parent: exc) when defined(windows): type WSAPROC_TRANSMITFILE = proc(hSocket: SocketHandle, hFile: Handle, nNumberOfBytesToWrite: DWORD, nNumberOfBytesPerSend: DWORD, lpOverlapped: POVERLAPPED, lpTransmitBuffers: pointer, dwReserved: DWORD): cint {. gcsafe, stdcall, raises: [].} LPFN_GETQUEUEDCOMPLETIONSTATUSEX = proc(completionPort: Handle, lpPortEntries: ptr OVERLAPPED_ENTRY, ulCount: DWORD, ulEntriesRemoved: var ULONG, dwMilliseconds: DWORD, fAlertable: WINBOOL): WINBOOL {. gcsafe, stdcall, raises: [].} CompletionKey = ULONG_PTR CompletionData* = object cb*: CallbackFunc errCode*: OSErrorCode bytesCount*: int32 udata*: pointer CustomOverlapped* = object of OVERLAPPED data*: CompletionData OVERLAPPED_ENTRY* = object lpCompletionKey*: ULONG_PTR lpOverlapped*: ptr CustomOverlapped internal: ULONG_PTR dwNumberOfBytesTransferred: DWORD PDispatcher* = ref object of PDispatcherBase ioPort: Handle handles: HashSet[AsyncFD] connectEx*: WSAPROC_CONNECTEX acceptEx*: WSAPROC_ACCEPTEX getAcceptExSockAddrs*: WSAPROC_GETACCEPTEXSOCKADDRS transmitFile*: WSAPROC_TRANSMITFILE getQueuedCompletionStatusEx*: LPFN_GETQUEUEDCOMPLETIONSTATUSEX PtrCustomOverlapped* = ptr CustomOverlapped RefCustomOverlapped* = ref CustomOverlapped AsyncFD* = distinct int proc getModuleHandle(lpModuleName: WideCString): Handle {. stdcall, dynlib: "kernel32", importc: "GetModuleHandleW", sideEffect.} proc getProcAddress(hModule: Handle, lpProcName: cstring): pointer {. stdcall, dynlib: "kernel32", importc: "GetProcAddress", sideEffect.} proc rtlNtStatusToDosError(code: uint64): ULONG {. stdcall, dynlib: "ntdll", importc: "RtlNtStatusToDosError", sideEffect.} proc hash(x: AsyncFD): Hash {.borrow.} proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow, gcsafe.} proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool = var bytesRet: DWORD fun = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, addr bytesRet, nil, nil) == 0 proc globalInit() {.raises: [Defect, OSError].} = var wsa: WSAData if wsaStartup(0x0202'i16, addr wsa) != 0: raiseOSError(osLastError()) proc initAPI(loop: PDispatcher) {.raises: [Defect, CatchableError].} = var WSAID_TRANSMITFILE = GUID( D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16, D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8]) let kernel32 = getModuleHandle(newWideCString("kernel32.dll")) loop.getQueuedCompletionStatusEx = cast[LPFN_GETQUEUEDCOMPLETIONSTATUSEX]( getProcAddress(kernel32, "GetQueuedCompletionStatusEx")) let sock = winlean.socket(winlean.AF_INET, 1, 6) if sock == INVALID_SOCKET: raiseOSError(osLastError()) var funcPointer: pointer = nil if not getFunc(sock, funcPointer, WSAID_CONNECTEX): let err = osLastError() close(sock) raiseOSError(err) loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer) if not getFunc(sock, funcPointer, WSAID_ACCEPTEX): let err = osLastError() close(sock) raiseOSError(err) loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer) if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS): let err = osLastError() close(sock) raiseOSError(err) loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer) if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE): let err = osLastError() close(sock) raiseOSError(err) loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) close(sock) proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} = ## Creates a new Dispatcher instance. var res = PDispatcher() res.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) when declared(initHashSet): # After 0.20.0 Nim's stdlib version res.handles = initHashSet[AsyncFD]() else: # Pre 0.20.0 Nim's stdlib version res.handles = initSet[AsyncFD]() when declared(initHeapQueue): # After 0.20.0 Nim's stdlib version res.timers = initHeapQueue[TimerCallback]() else: # Pre 0.20.0 Nim's stdlib version res.timers = newHeapQueue[TimerCallback]() res.callbacks = initDeque[AsyncCallback](64) res.callbacks.addLast(SentinelCallback) res.idlers = initDeque[AsyncCallback]() res.trackers = initTable[string, TrackerBase]() initAPI(res) res var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} proc getIoHandler*(disp: PDispatcher): Handle = ## Returns the underlying IO Completion Port handle (Windows) or selector ## (Unix) for the specified dispatcher. return disp.ioPort proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = ## Register file descriptor ``fd`` in thread's dispatcher. let loop = getThreadDispatcher() if createIoCompletionPort(fd.Handle, loop.ioPort, cast[CompletionKey](fd), 1) == 0: raiseOSError(osLastError()) loop.handles.incl(fd) proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = ## Unregisters ``fd``. getThreadDispatcher().handles.excl(fd) proc poll*() {.raises: [Defect, CatchableError].} = ## Perform single asynchronous step, processing timers and completing ## tasks. Blocks until at least one event has completed. ## ## Exceptions raised here indicate that waiting for tasks to be unblocked ## failed - exceptions from within tasks are instead propagated through ## their respective futures and not allowed to interrrupt the poll call. let loop = getThreadDispatcher() var curTime = Moment.now() curTimeout = DWORD(0) events: array[MaxEventsCount, OVERLAPPED_ENTRY] # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, # complete pending work of the outer `processCallbacks` call. # On non-reentrant `poll` calls, this only removes sentinel element. processCallbacks(loop) # Moving expired timers to `loop.callbacks` and calculate timeout loop.processTimersGetTimeout(curTimeout) let networkEventsCount = if isNil(loop.getQueuedCompletionStatusEx): let res = getQueuedCompletionStatus( loop.ioPort, addr events[0].dwNumberOfBytesTransferred, addr events[0].lpCompletionKey, cast[ptr POVERLAPPED](addr events[0].lpOverlapped), curTimeout ) if res == WINBOOL(0): let errCode = osLastError() if not(isNil(events[0].lpOverlapped)): 1 else: if int32(errCode) != WAIT_TIMEOUT: raiseOSError(errCode) 0 else: 1 else: var eventsReceived = ULONG(0) let res = loop.getQueuedCompletionStatusEx( loop.ioPort, addr events[0], ULONG(len(events)), eventsReceived, curTimeout, WINBOOL(0) ) if res == WINBOOL(0): let errCode = osLastError() if int32(errCode) != WAIT_TIMEOUT: raiseOSError(errCode) 0 else: eventsReceived for i in 0 ..< networkEventsCount: var customOverlapped = events[i].lpOverlapped customOverlapped.data.errCode = block: let res = cast[uint64](customOverlapped.internal) if res == 0'u64: OSErrorCode(-1) else: OSErrorCode(rtlNtStatusToDosError(res)) customOverlapped.data.bytesCount = events[i].dwNumberOfBytesTransferred let acb = newAsyncCallback(customOverlapped.data.cb, cast[pointer](customOverlapped)) loop.callbacks.addLast(acb) # Moving expired timers to `loop.callbacks`. loop.processTimers() # We move idle callbacks to `loop.callbacks` only if there no pending # network events. if networkEventsCount == 0: loop.processIdlers() # All callbacks which will be added during `processCallbacks` will be # scheduled after the sentinel and are processed on next `poll()` call. loop.callbacks.addLast(SentinelCallback) processCallbacks(loop) # All callbacks done, skip `processCallbacks` at start. loop.callbacks.addFirst(SentinelCallback) proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getThreadDispatcher() loop.handles.excl(fd) close(SocketHandle(fd)) if not isNil(aftercb): var acb = newAsyncCallback(aftercb) loop.callbacks.addLast(acb) proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a (pipe/file) handle and ensures that it is unregistered. let loop = getThreadDispatcher() loop.handles.excl(fd) discard closeHandle(Handle(fd)) if not isNil(aftercb): var acb = newAsyncCallback(aftercb) loop.callbacks.addLast(acb) proc contains*(disp: PDispatcher, fd: AsyncFD): bool = ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. return fd in disp.handles elif unixPlatform: const SIG_IGN = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](1) type AsyncFD* = distinct cint SelectorData* = object reader*: AsyncCallback writer*: AsyncCallback PDispatcher* = ref object of PDispatcherBase selector: Selector[SelectorData] keys: seq[ReadyKey] proc `==`*(x, y: AsyncFD): bool {.borrow, gcsafe.} proc globalInit() = # We are ignoring SIGPIPE signal, because we are working with EPIPE. posix.signal(cint(SIGPIPE), SIG_IGN) proc initAPI(disp: PDispatcher) {.raises: [Defect, CatchableError].} = discard proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} = ## Create new dispatcher. var res = PDispatcher() res.selector = newSelector[SelectorData]() when declared(initHeapQueue): # After 0.20.0 Nim's stdlib version res.timers = initHeapQueue[TimerCallback]() else: # Before 0.20.0 Nim's stdlib version res.timers.newHeapQueue() res.callbacks = initDeque[AsyncCallback](64) res.callbacks.addLast(SentinelCallback) res.idlers = initDeque[AsyncCallback]() res.keys = newSeq[ReadyKey](64) res.trackers = initTable[string, TrackerBase]() initAPI(res) res var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] = ## Returns system specific OS queue. return disp.selector proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = ## Register file descriptor ``fd`` in thread's dispatcher. let loop = getThreadDispatcher() var data: SelectorData loop.selector.registerHandle(int(fd), {}, data) proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = ## Unregister file descriptor ``fd`` from thread's dispatcher. getThreadDispatcher().selector.unregister(int(fd)) proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} = ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. result = int(fd) in disp.selector proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {. raises: [Defect, IOSelectorsException, ValueError].} = ## Start watching the file descriptor ``fd`` for read availability and then ## call the callback ``cb`` with specified argument ``udata``. let loop = getThreadDispatcher() var newEvents = {Event.Read} withData(loop.selector, int(fd), adata) do: let acb = newAsyncCallback(cb, udata) adata.reader = acb newEvents.incl(Event.Read) if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) proc removeReader*(fd: AsyncFD) {. raises: [Defect, IOSelectorsException, ValueError].} = ## Stop watching the file descriptor ``fd`` for read availability. let loop = getThreadDispatcher() var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `reader` data, because `selectors` don't do it adata.reader = default(AsyncCallback) if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {. raises: [Defect, IOSelectorsException, ValueError].} = ## Start watching the file descriptor ``fd`` for write availability and then ## call the callback ``cb`` with specified argument ``udata``. let loop = getThreadDispatcher() var newEvents = {Event.Write} withData(loop.selector, int(fd), adata) do: let acb = newAsyncCallback(cb, udata) adata.writer = acb newEvents.incl(Event.Write) if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) proc removeWriter*(fd: AsyncFD) {. raises: [Defect, IOSelectorsException, ValueError].} = ## Stop watching the file descriptor ``fd`` for write availability. let loop = getThreadDispatcher() var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `writer` data, because `selectors` don't do it adata.writer = default(AsyncCallback) if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") loop.selector.updateHandle(int(fd), newEvents) proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Close asynchronous socket. ## ## Please note, that socket is not closed immediately. To avoid bugs with ## closing socket, while operation pending, socket will be closed as ## soon as all pending operations will be notified. ## You can execute ``aftercb`` before actual socket close operation. let loop = getThreadDispatcher() proc continuation(udata: pointer) = if SocketHandle(fd) in loop.selector: try: unregister(fd) except CatchableError as exc: raiseAsDefect(exc, "unregister failed") close(SocketHandle(fd)) if not isNil(aftercb): aftercb(nil) withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. # Callbacks marked as deleted so we don't need to get REAL notifications # from system queue for this reader and writer. if not(isNil(adata.reader.function)): loop.callbacks.addLast(adata.reader) adata.reader = default(AsyncCallback) if not(isNil(adata.writer.function)): loop.callbacks.addLast(adata.writer) adata.writer = default(AsyncCallback) # We can't unregister file descriptor from system queue here, because # in such case processing queue will stuck on poll() call, because there # can be no file descriptors registered in system queue. var acb = newAsyncCallback(continuation) loop.callbacks.addLast(acb) proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Close asynchronous file/pipe handle. ## ## Please note, that socket is not closed immediately. To avoid bugs with ## closing socket, while operation pending, socket will be closed as ## soon as all pending operations will be notified. ## You can execute ``aftercb`` before actual socket close operation. closeSocket(fd, aftercb) when ioselSupportedPlatform: proc addSignal*(signal: int, cb: CallbackFunc, udata: pointer = nil): int {. raises: [Defect, IOSelectorsException, ValueError, OSError].} = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb`` with specified argument ``udata``. Returns signal ## identifier code, which can be used to remove signal callback ## via ``removeSignal``. let loop = getThreadDispatcher() var data: SelectorData result = loop.selector.registerSignal(signal, data) withData(loop.selector, result, adata) do: adata.reader = newAsyncCallback(cb, udata) do: raise newException(ValueError, "File descriptor not registered.") proc removeSignal*(sigfd: int) {. raises: [Defect, IOSelectorsException].} = ## Remove watching signal ``signal``. let loop = getThreadDispatcher() loop.selector.unregister(sigfd) proc poll*() {.raises: [Defect, CatchableError].} = ## Perform single asynchronous step. let loop = getThreadDispatcher() var curTime = Moment.now() var curTimeout = 0 when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, Event.Vnode} # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, # complete pending work of the outer `processCallbacks` call. # On non-reentrant `poll` calls, this only removes sentinel element. processCallbacks(loop) # Moving expired timers to `loop.callbacks` and calculate timeout. loop.processTimersGetTimeout(curTimeout) # Processing IO descriptors and all hardware events. let count = loop.selector.selectInto(curTimeout, loop.keys) for i in 0..