#
#                     Chronos
#
#           (c) Copyright 2015 Dominik Picheta
#  (c) Copyright 2018-Present Status Research & Development GmbH
#
#                Licensed under either of
#    Apache License, version 2.0, (LICENSE-APACHEv2)
#                MIT license (LICENSE-MIT)

include "system/inclrtl"

import os, tables, strutils, heapqueue, lists, options, nativesockets, net,
       deques
import timer

export Port, SocketFlag
export timer

#{.injectStmt: newGcInvariant().}

## AsyncDispatch
## *************
##
## This module implements asynchronous IO. This includes a dispatcher,
## a ``Future`` type implementation, and an ``async`` macro which allows
## asynchronous code to be written in a synchronous style with the ``await``
## keyword.
##
## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
## (or a function which does so for you such as ``waitFor`` or ``runForever``)
## in order to poll for any outstanding events. The underlying implementation
## is based on epoll on Linux, IO Completion Ports on Windows and select on
## other operating systems.
##
## The ``poll`` function will not, on its own, return any events. Instead
## an appropriate ``Future`` object will be completed. A ``Future`` is a
## type which holds a value which is not yet available, but which *may* be
## available in the future. You can check whether a future is finished
## by using the ``finished`` function. When a future is finished it means that
## either the value that it holds is now available or it holds an error instead.
## The latter situation occurs when the operation to complete a future fails
## with an exception. You can distinguish between the two situations with the
## ``failed`` function.
##
## Future objects can also store a callback procedure which will be called
## automatically once the future completes.
##
## Futures therefore can be thought of as an implementation of the proactor
## pattern. In this pattern you make a request for an action, and once that
## action is fulfilled a future is completed with the result of that action.
## Requests can be made by calling the appropriate functions. For example:
## calling the ``recv`` function will create a request for some data to be
## read from a socket. The future which the ``recv`` function returns will
## then complete once the requested amount of data is read **or** an
## exception occurs.
##
## Code to read some data from a socket may look something like this:
##
## .. code-block::nim
##   var future = socket.recv(100)
##   future.addCallback(
##     proc () =
##       echo(future.read)
##   )
##
## All asynchronous functions returning a ``Future`` will not block. They
## will not however return immediately. An asynchronous function will have
## code which will be executed before an asynchronous request is made, in most
## cases this code sets up the request.
##
## In the above example, the ``recv`` function will return a brand new
## ``Future`` instance once the request for data to be read from the socket
## is made. This ``Future`` instance will complete once the requested amount
## of data is read, in this case it is 100 bytes. The second line sets a
## callback on this future which will be called once the future completes.
## All the callback does is write the data stored in the future to ``stdout``.
## The ``read`` function is used for this and it checks whether the future
## completes with an error for you (if it did it will simply raise the
## error), if there is no error however it returns the value of the future.
##
## Asynchronous procedures
## -----------------------
##
## Asynchronous procedures remove the pain of working with callbacks. They do
## this by allowing you to write asynchronous code the same way as you would
## write synchronous code.
##
## An asynchronous procedure is marked using the ``{.async.}`` pragma.
## When marking a procedure with the ``{.async.}`` pragma it must have a
## ``Future[T]`` return type or no return type at all. If you do not specify
## a return type then ``Future[void]`` is assumed.
##
## Inside asynchronous procedures ``await`` can be used to call any
## procedures which return a ``Future``; this includes asynchronous
## procedures. When a procedure is "awaited", the asynchronous procedure it
## is awaited in will suspend its execution until the awaited procedure's
## Future completes, at which point the asynchronous procedure will resume
## its execution. During the period when an asynchronous procedure is
## suspended other asynchronous procedures will be run by the dispatcher.
##
## The ``await`` call may be used in many contexts. It can be used on the right
## hand side of a variable declaration: ``var data = await socket.recv(100)``,
## in which case the variable will be set to the value of the future
## automatically. It can be used to await a ``Future`` object, and it can
## be used to await a procedure returning a ``Future[void]``:
## ``await socket.send("foobar")``.
##
## If an awaited future completes with an error, then ``await`` will re-raise
## this error. To avoid this, you can use the ``yield`` keyword instead of
## ``await``. The following section shows different ways that you can handle
## exceptions in async procs.
##
## Handling Exceptions
## ~~~~~~~~~~~~~~~~~~~
##
## The most reliable way to handle exceptions is to use ``yield`` on a future
## then check the future's ``failed`` property. For example:
##
## .. code-block:: Nim
##   var future = sock.recv(100)
##   yield future
##   if future.failed:
##     # Handle exception
##
## The ``async`` procedures also offer limited support for the try statement.
##
## .. code-block:: Nim
##   try:
##     let data = await sock.recv(100)
##     echo("Received ", data)
##   except:
##     # Handle exception
##
## Unfortunately the semantics of the try statement may not always be correct,
## and occasionally the compilation may fail altogether.
## As such it is better to use the former style when possible.
##
##
## Discarding futures
## ------------------
##
## Futures should **never** be discarded. This is because they may contain
## errors. If you do not care for the result of a Future then you should
## use the ``asyncCheck`` procedure instead of the ``discard`` keyword.
##
## Examples
## --------
##
## For examples take a look at the documentation for the modules implementing
## asynchronous IO. A good place to start is the ``asyncnet`` module.
##
## Limitations/Bugs
## ----------------
##
## * The effect system (``raises: []``) does not work with async procedures.
## * Can't await in an ``except`` body.
## * Forward declarations for async procs are broken,
##   link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
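##
## As an illustration of the ``{.async.}`` pragma, ``await`` and ``waitFor``
## described above, a minimal sketch might look like the following. It assumes
## the package is imported as ``chronos`` and that ``sleepAsync`` and
## ``milliseconds`` are available from it; ``delayedAnswer`` is a hypothetical
## name used only for this example.
##
## .. code-block:: nim
##   import chronos
##
##   # Hypothetical async proc: suspends without blocking the dispatcher.
##   proc delayedAnswer(): Future[int] {.async.} =
##     await sleepAsync(10.milliseconds)
##     return 42
##
##   # ``waitFor`` polls the dispatcher until the future finishes.
##   let answer = waitFor delayedAnswer()
##   doAssert answer == 42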
# TODO: Check if yielded future is nil and throw a more meaningful exception

const unixPlatform = defined(macosx) or defined(freebsd) or
                     defined(netbsd) or defined(openbsd) or
                     defined(dragonfly) or defined(macos) or
                     defined(linux) or defined(android) or
                     defined(solaris)

when defined(windows):
  import winlean, sets, hashes
elif unixPlatform:
  import selectors
  from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
                    MSG_NOSIGNAL, SIGPIPE

type
  CallbackFunc* = proc (arg: pointer) {.gcsafe.}

  AsyncCallback* = object
    function*: CallbackFunc
    udata*: pointer

  AsyncError* = object of CatchableError
    ## Generic async exception
  AsyncTimeoutError* = object of AsyncError
    ## Timeout exception

  TimerCallback* = ref object
    finishAt*: Moment
    function*: AsyncCallback

  TrackerBase* = ref object of RootRef
    id*: string
    dump*: proc(): string {.gcsafe.}
    isLeaked*: proc(): bool {.gcsafe.}

  PDispatcherBase = ref object of RootRef
    timers*: HeapQueue[TimerCallback]
    callbacks*: Deque[AsyncCallback]
    idlers*: Deque[AsyncCallback]
    trackers*: Table[string, TrackerBase]

proc `<`(a, b: TimerCallback): bool =
  result = a.finishAt < b.finishAt

func getAsyncTimestamp*(a: Duration): auto {.inline.} =
  ## Return the duration rounded up to millisecond resolution.
  ##
  ## This function also guards against int32 overflow, because Linux and
  ## Windows accept a signed 32-bit integer as the timeout.
  let milsec = Millisecond.nanoseconds()
  let nansec = a.nanoseconds()
  var res = nansec div milsec
  let mid = nansec mod milsec
  when defined(windows):
    res = min(cast[int64](high(int32) - 1), res)
    result = cast[DWORD](res)
    result += DWORD(min(1'i32, cast[int32](mid)))
  else:
    res = min(cast[int64](high(int32) - 1), res)
    result = cast[int32](res)
    result += min(1, cast[int32](mid))

template processTimersGetTimeout(loop, timeout: untyped) =
  var lastFinish = curTime
  while loop.timers.len > 0:
    if loop.timers[0].function.function.isNil:
      discard loop.timers.pop()
      continue

    lastFinish = loop.timers[0].finishAt
    if curTime < lastFinish:
      break

    loop.callbacks.addLast(loop.timers.pop().function)

  if loop.timers.len > 0:
    timeout = (lastFinish - curTime).getAsyncTimestamp()

  if timeout == 0:
    if (len(loop.callbacks) == 0) and (len(loop.idlers) == 0):
      when defined(windows):
        timeout = INFINITE
      else:
        timeout = -1
  else:
    if (len(loop.callbacks) != 0) or (len(loop.idlers) != 0):
      timeout = 0

template processTimers(loop: untyped) =
  var curTime = Moment.now()
  while loop.timers.len > 0:
    if loop.timers[0].function.function.isNil:
      discard loop.timers.pop()
      continue

    if curTime < loop.timers[0].finishAt:
      break
    loop.callbacks.addLast(loop.timers.pop().function)

template processIdlers(loop: untyped) =
  if len(loop.idlers) > 0:
    loop.callbacks.addLast(loop.idlers.popFirst())

template processCallbacks(loop: untyped) =
  var count = len(loop.callbacks)
  for i in 0..