1276 lines
46 KiB
Nim
1276 lines
46 KiB
Nim
#
|
|
# Chronos
|
|
#
|
|
# (c) Copyright 2015 Dominik Picheta
|
|
# (c) Copyright 2018-Present Status Research & Development GmbH
|
|
#
|
|
# Licensed under either of
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
# MIT license (LICENSE-MIT)
|
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
{.push raises: [Defect].}
|
|
else:
|
|
{.push raises: [].}
|
|
|
|
import std/[os, tables, strutils, heapqueue, lists, nativesockets, net,
|
|
deques]
|
|
import ./timer
|
|
|
|
when defined(chronosDurationThreshold):
|
|
import std/monotimes
|
|
import pkg/metrics
|
|
|
|
declareGauge(chronosCallbackDuration, "chronos callback duration")
|
|
|
|
export Port, SocketFlag
|
|
export timer
|
|
|
|
#{.injectStmt: newGcInvariant().}
|
|
|
|
## AsyncDispatch
|
|
## *************
|
|
##
|
|
## This module implements asynchronous IO. This includes a dispatcher,
|
|
## a ``Future`` type implementation, and an ``async`` macro which allows
|
|
## asynchronous code to be written in a synchronous style with the ``await``
|
|
## keyword.
|
|
##
|
|
## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
|
|
## (or a function which does so for you such as ``waitFor`` or ``runForever``)
|
|
## in order to poll for any outstanding events. The underlying implementation
|
|
## is based on epoll on Linux, IO Completion Ports on Windows and select on
|
|
## other operating systems.
|
|
##
|
|
## The ``poll`` function will not, on its own, return any events. Instead
|
|
## an appropriate ``Future`` object will be completed. A ``Future`` is a
|
|
## type which holds a value which is not yet available, but which *may* be
|
|
## available in the future. You can check whether a future is finished
|
|
## by using the ``finished`` function. When a future is finished it means that
|
|
## either the value that it holds is now available or it holds an error instead.
|
|
## The latter situation occurs when the operation to complete a future fails
|
|
## with an exception. You can distinguish between the two situations with the
|
|
## ``failed`` function.
|
|
##
|
|
## Future objects can also store a callback procedure which will be called
|
|
## automatically once the future completes.
|
|
##
|
|
## Futures therefore can be thought of as an implementation of the proactor
|
|
## pattern. In this
|
|
## pattern you make a request for an action, and once that action is fulfilled
|
|
## a future is completed with the result of that action. Requests can be
|
|
## made by calling the appropriate functions. For example: calling the ``recv``
|
|
## function will create a request for some data to be read from a socket. The
|
|
## future which the ``recv`` function returns will then complete once the
|
|
## requested amount of data is read **or** an exception occurs.
|
|
##
|
|
## Code to read some data from a socket may look something like this:
|
|
##
|
|
## .. code-block::nim
|
|
## var future = socket.recv(100)
|
|
## future.addCallback(
|
|
## proc () =
|
|
## echo(future.read)
|
|
## )
|
|
##
|
|
## All asynchronous functions returning a ``Future`` will not block. They
|
|
## will not however return immediately. An asynchronous function will have
|
|
## code which will be executed before an asynchronous request is made, in most
|
|
## cases this code sets up the request.
|
|
##
|
|
## In the above example, the ``recv`` function will return a brand new
|
|
## ``Future`` instance once the request for data to be read from the socket
|
|
## is made. This ``Future`` instance will complete once the requested amount
|
|
## of data is read, in this case it is 100 bytes. The second line sets a
|
|
## callback on this future which will be called once the future completes.
|
|
## All the callback does is write the data stored in the future to ``stdout``.
|
|
## The ``read`` function is used for this and it checks whether the future
|
|
## completes with an error for you (if it did it will simply raise the
|
|
## error), if there is no error however it returns the value of the future.
|
|
##
|
|
## Asynchronous procedures
|
|
## -----------------------
|
|
##
|
|
## Asynchronous procedures remove the pain of working with callbacks. They do
|
|
## this by allowing you to write asynchronous code the same way as you would
|
|
## write synchronous code.
|
|
##
|
|
## An asynchronous procedure is marked using the ``{.async.}`` pragma.
|
|
## When marking a procedure with the ``{.async.}`` pragma it must have a
|
|
## ``Future[T]`` return type or no return type at all. If you do not specify
|
|
## a return type then ``Future[void]`` is assumed.
|
|
##
|
|
## Inside asynchronous procedures ``await`` can be used to call any
|
|
## procedures which return a
|
|
## ``Future``; this includes asynchronous procedures. When a procedure is
|
|
## "awaited", the asynchronous procedure it is awaited in will
|
|
## suspend its execution
|
|
## until the awaited procedure's Future completes. At which point the
|
|
## asynchronous procedure will resume its execution. During the period
|
|
## when an asynchronous procedure is suspended other asynchronous procedures
|
|
## will be run by the dispatcher.
|
|
##
|
|
## The ``await`` call may be used in many contexts. It can be used on the right
|
|
## hand side of a variable declaration: ``var data = await socket.recv(100)``,
|
|
## in which case the variable will be set to the value of the future
|
|
## automatically. It can be used to await a ``Future`` object, and it can
|
|
## be used to await a procedure returning a ``Future[void]``:
|
|
## ``await socket.send("foobar")``.
|
|
##
|
|
## If an awaited future completes with an error, then ``await`` will re-raise
|
|
## this error. To avoid this, you can use the ``yield`` keyword instead of
|
|
## ``await``. The following section shows different ways that you can handle
|
|
## exceptions in async procs.
|
|
##
|
|
## Handling Exceptions
|
|
## ~~~~~~~~~~~~~~~~~~~
|
|
##
|
|
## The most reliable way to handle exceptions is to use ``yield`` on a future
|
|
## then check the future's ``failed`` property. For example:
|
|
##
|
|
## .. code-block:: Nim
|
|
## var future = sock.recv(100)
|
|
## yield future
|
|
## if future.failed:
|
|
## # Handle exception
|
|
##
|
|
## The ``async`` procedures also offer limited support for the try statement.
|
|
##
|
|
## .. code-block:: Nim
|
|
## try:
|
|
## let data = await sock.recv(100)
|
|
## echo("Received ", data)
|
|
## except:
|
|
## # Handle exception
|
|
##
|
|
## Unfortunately the semantics of the try statement may not always be correct,
|
|
## and occasionally the compilation may fail altogether.
|
|
## As such it is better to use the former style when possible.
|
|
##
|
|
##
|
|
## Discarding futures
|
|
## ------------------
|
|
##
|
|
## Futures should **never** be discarded. This is because they may contain
|
|
## errors. If you do not care for the result of a Future then you should
|
|
## use the ``asyncCheck`` procedure instead of the ``discard`` keyword.
|
|
##
|
|
## Examples
|
|
## --------
|
|
##
|
|
## For examples take a look at the documentation for the modules implementing
|
|
## asynchronous IO. A good place to start is the
|
|
## `asyncnet module <asyncnet.html>`_.
|
|
##
|
|
## Limitations/Bugs
|
|
## ----------------
|
|
##
|
|
## * The effect system (``raises: []``) does not work with async procedures.
|
|
## * Can't await in a ``except`` body
|
|
## * Forward declarations for async procs are broken,
|
|
## link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
|
|
|
|
# TODO: Check if yielded future is nil and throw a more meaningful exception
|
|
|
|
const
|
|
unixPlatform = defined(macosx) or defined(freebsd) or
|
|
defined(netbsd) or defined(openbsd) or
|
|
defined(dragonfly) or defined(macos) or
|
|
defined(linux) or defined(android) or
|
|
defined(solaris)
|
|
MaxEventsCount* = 64
|
|
|
|
when defined(windows):
|
|
import winlean, sets, hashes
|
|
elif unixPlatform:
|
|
import ./selectors2
|
|
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
|
|
MSG_NOSIGNAL
|
|
from posix import SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
|
|
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
|
|
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE
|
|
export SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
|
|
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
|
|
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE
|
|
|
|
type
|
|
CallbackFunc* = proc (arg: pointer) {.gcsafe, raises: [Defect].}
|
|
|
|
AsyncCallback* = object
|
|
function*: CallbackFunc
|
|
udata*: pointer
|
|
|
|
AsyncError* = object of CatchableError
|
|
## Generic async exception
|
|
AsyncTimeoutError* = object of AsyncError
|
|
## Timeout exception
|
|
|
|
TimerCallback* = ref object
|
|
finishAt*: Moment
|
|
function*: AsyncCallback
|
|
|
|
TrackerBase* = ref object of RootRef
|
|
id*: string
|
|
dump*: proc(): string {.gcsafe, raises: [Defect].}
|
|
isLeaked*: proc(): bool {.gcsafe, raises: [Defect].}
|
|
|
|
PDispatcherBase = ref object of RootRef
|
|
timers*: HeapQueue[TimerCallback]
|
|
callbacks*: Deque[AsyncCallback]
|
|
idlers*: Deque[AsyncCallback]
|
|
trackers*: Table[string, TrackerBase]
|
|
|
|
const chronosDurationThreshold {.intdefine.} = 0
|
|
|
|
template newAsyncCallback*(cbFunc: CallbackFunc, udataPtr: pointer = nil): AsyncCallback =
|
|
AsyncCallback(function: cbFunc, udata: udataPtr)
|
|
|
|
when defined(chronosDurationThreshold):
|
|
type
|
|
ChronosDurationThreadholdBreachedCallback* = proc(durationUs: int64) {.gcsafe, raises: [].}
|
|
|
|
proc defaultThresholdBreachedHandler(durationUs: int64) {.gcsafe, raises: [].} =
|
|
raise newException(Defect, "ChronosDurationThreshold breached. Time taken: " & $durationUs & " usecs")
|
|
|
|
var chronosDurationThresholdBreachedHandler {.threadvar.} : ChronosDurationThreadholdBreachedCallback
|
|
|
|
proc setChronosDurationThresholdBreachedHandler*(handler: ChronosDurationThreadholdBreachedCallback) =
|
|
chronosDurationThresholdBreachedHandler = handler
|
|
|
|
proc invokeChronosDurationThresholdBreachedHandler(durationUs: int64) =
|
|
if chronosDurationThresholdBreachedHandler == nil:
|
|
defaultThresholdBreachedHandler(durationUs)
|
|
return
|
|
|
|
chronosDurationThresholdBreachedHandler(durationUs)
|
|
|
|
proc sentinelCallbackImpl(arg: pointer) {.gcsafe, raises: [Defect].} =
|
|
raiseAssert "Sentinel callback MUST not be scheduled"
|
|
|
|
const
|
|
SentinelCallback = AsyncCallback(function: sentinelCallbackImpl)
|
|
|
|
proc isSentinel(acb: AsyncCallback): bool {.raises: [Defect].} =
|
|
acb == SentinelCallback
|
|
|
|
proc `<`(a, b: TimerCallback): bool =
|
|
result = a.finishAt < b.finishAt
|
|
|
|
func getAsyncTimestamp*(a: Duration): auto {.inline.} =
|
|
## Return rounded up value of duration with milliseconds resolution.
|
|
##
|
|
## This function also take care on int32 overflow, because Linux and Windows
|
|
## accepts signed 32bit integer as timeout.
|
|
let milsec = Millisecond.nanoseconds()
|
|
let nansec = a.nanoseconds()
|
|
var res = nansec div milsec
|
|
let mid = nansec mod milsec
|
|
when defined(windows):
|
|
res = min(int64(high(int32) - 1), res)
|
|
result = cast[DWORD](res)
|
|
result += DWORD(min(1'i32, cast[int32](mid)))
|
|
else:
|
|
res = min(int64(high(int32) - 1), res)
|
|
result = cast[int32](res)
|
|
result += min(1, cast[int32](mid))
|
|
|
|
template processTimersGetTimeout(loop, timeout: untyped) =
|
|
var lastFinish = curTime
|
|
while loop.timers.len > 0:
|
|
if loop.timers[0].function.function.isNil:
|
|
discard loop.timers.pop()
|
|
continue
|
|
|
|
lastFinish = loop.timers[0].finishAt
|
|
if curTime < lastFinish:
|
|
break
|
|
|
|
loop.callbacks.addLast(loop.timers.pop().function)
|
|
|
|
if loop.timers.len > 0:
|
|
timeout = (lastFinish - curTime).getAsyncTimestamp()
|
|
|
|
if timeout == 0:
|
|
if (len(loop.callbacks) == 0) and (len(loop.idlers) == 0):
|
|
when defined(windows):
|
|
timeout = INFINITE
|
|
else:
|
|
timeout = -1
|
|
else:
|
|
if (len(loop.callbacks) != 0) or (len(loop.idlers) != 0):
|
|
timeout = 0
|
|
|
|
template processTimers(loop: untyped) =
|
|
var curTime = Moment.now()
|
|
while loop.timers.len > 0:
|
|
if loop.timers[0].function.function.isNil:
|
|
discard loop.timers.pop()
|
|
continue
|
|
|
|
if curTime < loop.timers[0].finishAt:
|
|
break
|
|
loop.callbacks.addLast(loop.timers.pop().function)
|
|
|
|
template processIdlers(loop: untyped) =
|
|
if len(loop.idlers) > 0:
|
|
loop.callbacks.addLast(loop.idlers.popFirst())
|
|
|
|
template processCallbacks(loop: untyped) =
|
|
while true:
|
|
let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel
|
|
if isSentinel(callable):
|
|
break
|
|
if not(isNil(callable.function)):
|
|
when defined(chronosDurationThreshold):
|
|
let startTime = getMonoTime().ticks
|
|
|
|
callable.function(callable.udata)
|
|
|
|
when defined(chronosDurationThreshold):
|
|
let
|
|
durationNs = getMonoTime().ticks - startTime
|
|
durationUs = durationNs div 1000
|
|
|
|
chronosCallbackDuration.set(durationNs)
|
|
|
|
if durationUs > chronosDurationThreshold:
|
|
invokeChronosDurationThresholdBreachedHandler(durationUs)
|
|
|
|
proc raiseAsDefect*(exc: ref Exception, msg: string) {.
|
|
raises: [Defect], noreturn, noinline.} =
|
|
# Reraise an exception as a Defect, where it's unexpected and can't be handled
|
|
# We include the stack trace in the message because otherwise, it's easily
|
|
# lost - Nim doesn't print it for `parent` exceptions for example (!)
|
|
raise (ref Defect)(
|
|
msg: msg & "\n" & exc.msg & "\n" & exc.getStackTrace(), parent: exc)
|
|
|
|
when defined(windows):
|
|
type
|
|
WSAPROC_TRANSMITFILE = proc(hSocket: SocketHandle, hFile: Handle,
|
|
nNumberOfBytesToWrite: DWORD,
|
|
nNumberOfBytesPerSend: DWORD,
|
|
lpOverlapped: POVERLAPPED,
|
|
lpTransmitBuffers: pointer,
|
|
dwReserved: DWORD): cint {.
|
|
gcsafe, stdcall, raises: [].}
|
|
|
|
LPFN_GETQUEUEDCOMPLETIONSTATUSEX = proc(completionPort: Handle,
|
|
lpPortEntries: ptr OVERLAPPED_ENTRY,
|
|
ulCount: DWORD,
|
|
ulEntriesRemoved: var ULONG,
|
|
dwMilliseconds: DWORD,
|
|
fAlertable: WINBOOL): WINBOOL {.
|
|
gcsafe, stdcall, raises: [].}
|
|
|
|
CompletionKey = ULONG_PTR
|
|
|
|
CompletionData* = object
|
|
cb*: CallbackFunc
|
|
errCode*: OSErrorCode
|
|
bytesCount*: int32
|
|
udata*: pointer
|
|
|
|
CustomOverlapped* = object of OVERLAPPED
|
|
data*: CompletionData
|
|
|
|
OVERLAPPED_ENTRY* = object
|
|
lpCompletionKey*: ULONG_PTR
|
|
lpOverlapped*: ptr CustomOverlapped
|
|
internal: ULONG_PTR
|
|
dwNumberOfBytesTransferred: DWORD
|
|
|
|
PDispatcher* = ref object of PDispatcherBase
|
|
ioPort: Handle
|
|
handles: HashSet[AsyncFD]
|
|
connectEx*: WSAPROC_CONNECTEX
|
|
acceptEx*: WSAPROC_ACCEPTEX
|
|
getAcceptExSockAddrs*: WSAPROC_GETACCEPTEXSOCKADDRS
|
|
transmitFile*: WSAPROC_TRANSMITFILE
|
|
getQueuedCompletionStatusEx*: LPFN_GETQUEUEDCOMPLETIONSTATUSEX
|
|
|
|
PtrCustomOverlapped* = ptr CustomOverlapped
|
|
|
|
RefCustomOverlapped* = ref CustomOverlapped
|
|
|
|
AsyncFD* = distinct int
|
|
|
|
proc getModuleHandle(lpModuleName: WideCString): Handle {.
|
|
stdcall, dynlib: "kernel32", importc: "GetModuleHandleW", sideEffect.}
|
|
proc getProcAddress(hModule: Handle, lpProcName: cstring): pointer {.
|
|
stdcall, dynlib: "kernel32", importc: "GetProcAddress", sideEffect.}
|
|
proc rtlNtStatusToDosError(code: uint64): ULONG {.
|
|
stdcall, dynlib: "ntdll", importc: "RtlNtStatusToDosError", sideEffect.}
|
|
|
|
proc hash(x: AsyncFD): Hash {.borrow.}
|
|
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow, gcsafe.}
|
|
|
|
proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
|
|
var bytesRet: DWORD
|
|
fun = nil
|
|
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
|
|
sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
|
|
addr bytesRet, nil, nil) == 0
|
|
|
|
proc globalInit() {.raises: [Defect, OSError].} =
|
|
var wsa: WSAData
|
|
if wsaStartup(0x0202'i16, addr wsa) != 0:
|
|
raiseOSError(osLastError())
|
|
|
|
proc initAPI(loop: PDispatcher) {.raises: [Defect, CatchableError].} =
|
|
var
|
|
WSAID_TRANSMITFILE = GUID(
|
|
D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16,
|
|
D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8,
|
|
0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8])
|
|
|
|
let kernel32 = getModuleHandle(newWideCString("kernel32.dll"))
|
|
loop.getQueuedCompletionStatusEx = cast[LPFN_GETQUEUEDCOMPLETIONSTATUSEX](
|
|
getProcAddress(kernel32, "GetQueuedCompletionStatusEx"))
|
|
|
|
let sock = winlean.socket(winlean.AF_INET, 1, 6)
|
|
if sock == INVALID_SOCKET:
|
|
raiseOSError(osLastError())
|
|
|
|
var funcPointer: pointer = nil
|
|
if not getFunc(sock, funcPointer, WSAID_CONNECTEX):
|
|
let err = osLastError()
|
|
close(sock)
|
|
raiseOSError(err)
|
|
loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer)
|
|
if not getFunc(sock, funcPointer, WSAID_ACCEPTEX):
|
|
let err = osLastError()
|
|
close(sock)
|
|
raiseOSError(err)
|
|
loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer)
|
|
if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS):
|
|
let err = osLastError()
|
|
close(sock)
|
|
raiseOSError(err)
|
|
loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer)
|
|
if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE):
|
|
let err = osLastError()
|
|
close(sock)
|
|
raiseOSError(err)
|
|
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
|
|
close(sock)
|
|
|
|
proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} =
|
|
## Creates a new Dispatcher instance.
|
|
var res = PDispatcher()
|
|
res.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
|
|
when declared(initHashSet):
|
|
# After 0.20.0 Nim's stdlib version
|
|
res.handles = initHashSet[AsyncFD]()
|
|
else:
|
|
# Pre 0.20.0 Nim's stdlib version
|
|
res.handles = initSet[AsyncFD]()
|
|
when declared(initHeapQueue):
|
|
# After 0.20.0 Nim's stdlib version
|
|
res.timers = initHeapQueue[TimerCallback]()
|
|
else:
|
|
# Pre 0.20.0 Nim's stdlib version
|
|
res.timers = newHeapQueue[TimerCallback]()
|
|
res.callbacks = initDeque[AsyncCallback](64)
|
|
res.callbacks.addLast(SentinelCallback)
|
|
res.idlers = initDeque[AsyncCallback]()
|
|
res.trackers = initTable[string, TrackerBase]()
|
|
initAPI(res)
|
|
res
|
|
|
|
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
|
|
|
|
proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
|
|
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
|
|
|
|
proc getIoHandler*(disp: PDispatcher): Handle =
|
|
## Returns the underlying IO Completion Port handle (Windows) or selector
|
|
## (Unix) for the specified dispatcher.
|
|
return disp.ioPort
|
|
|
|
proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
|
|
## Register file descriptor ``fd`` in thread's dispatcher.
|
|
let loop = getThreadDispatcher()
|
|
if createIoCompletionPort(fd.Handle, loop.ioPort,
|
|
cast[CompletionKey](fd), 1) == 0:
|
|
raiseOSError(osLastError())
|
|
loop.handles.incl(fd)
|
|
|
|
proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
|
|
## Unregisters ``fd``.
|
|
getThreadDispatcher().handles.excl(fd)
|
|
|
|
proc poll*() {.raises: [Defect, CatchableError].} =
|
|
## Perform single asynchronous step, processing timers and completing
|
|
## tasks. Blocks until at least one event has completed.
|
|
##
|
|
## Exceptions raised here indicate that waiting for tasks to be unblocked
|
|
## failed - exceptions from within tasks are instead propagated through
|
|
## their respective futures and not allowed to interrrupt the poll call.
|
|
let loop = getThreadDispatcher()
|
|
var
|
|
curTime = Moment.now()
|
|
curTimeout = DWORD(0)
|
|
events: array[MaxEventsCount, OVERLAPPED_ENTRY]
|
|
|
|
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
|
|
# complete pending work of the outer `processCallbacks` call.
|
|
# On non-reentrant `poll` calls, this only removes sentinel element.
|
|
processCallbacks(loop)
|
|
|
|
# Moving expired timers to `loop.callbacks` and calculate timeout
|
|
loop.processTimersGetTimeout(curTimeout)
|
|
|
|
let networkEventsCount =
|
|
if isNil(loop.getQueuedCompletionStatusEx):
|
|
let res = getQueuedCompletionStatus(
|
|
loop.ioPort,
|
|
addr events[0].dwNumberOfBytesTransferred,
|
|
addr events[0].lpCompletionKey,
|
|
cast[ptr POVERLAPPED](addr events[0].lpOverlapped),
|
|
curTimeout
|
|
)
|
|
if res == WINBOOL(0):
|
|
let errCode = osLastError()
|
|
if not(isNil(events[0].lpOverlapped)):
|
|
1
|
|
else:
|
|
if int32(errCode) != WAIT_TIMEOUT:
|
|
raiseOSError(errCode)
|
|
0
|
|
else:
|
|
1
|
|
else:
|
|
var eventsReceived = ULONG(0)
|
|
let res = loop.getQueuedCompletionStatusEx(
|
|
loop.ioPort,
|
|
addr events[0],
|
|
ULONG(len(events)),
|
|
eventsReceived,
|
|
curTimeout,
|
|
WINBOOL(0)
|
|
)
|
|
if res == WINBOOL(0):
|
|
let errCode = osLastError()
|
|
if int32(errCode) != WAIT_TIMEOUT:
|
|
raiseOSError(errCode)
|
|
0
|
|
else:
|
|
eventsReceived
|
|
|
|
for i in 0 ..< networkEventsCount:
|
|
var customOverlapped = events[i].lpOverlapped
|
|
customOverlapped.data.errCode =
|
|
block:
|
|
let res = cast[uint64](customOverlapped.internal)
|
|
if res == 0'u64:
|
|
OSErrorCode(-1)
|
|
else:
|
|
OSErrorCode(rtlNtStatusToDosError(res))
|
|
customOverlapped.data.bytesCount = events[i].dwNumberOfBytesTransferred
|
|
let acb = newAsyncCallback(customOverlapped.data.cb,
|
|
cast[pointer](customOverlapped))
|
|
loop.callbacks.addLast(acb)
|
|
|
|
# Moving expired timers to `loop.callbacks`.
|
|
loop.processTimers()
|
|
|
|
# We move idle callbacks to `loop.callbacks` only if there no pending
|
|
# network events.
|
|
if networkEventsCount == 0:
|
|
loop.processIdlers()
|
|
|
|
# All callbacks which will be added during `processCallbacks` will be
|
|
# scheduled after the sentinel and are processed on next `poll()` call.
|
|
loop.callbacks.addLast(SentinelCallback)
|
|
processCallbacks(loop)
|
|
|
|
# All callbacks done, skip `processCallbacks` at start.
|
|
loop.callbacks.addFirst(SentinelCallback)
|
|
|
|
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
|
|
## Closes a socket and ensures that it is unregistered.
|
|
let loop = getThreadDispatcher()
|
|
loop.handles.excl(fd)
|
|
close(SocketHandle(fd))
|
|
if not isNil(aftercb):
|
|
var acb = newAsyncCallback(aftercb)
|
|
loop.callbacks.addLast(acb)
|
|
|
|
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
|
|
## Closes a (pipe/file) handle and ensures that it is unregistered.
|
|
let loop = getThreadDispatcher()
|
|
loop.handles.excl(fd)
|
|
discard closeHandle(Handle(fd))
|
|
if not isNil(aftercb):
|
|
var acb = newAsyncCallback(aftercb)
|
|
loop.callbacks.addLast(acb)
|
|
|
|
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
|
|
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
|
|
return fd in disp.handles
|
|
|
|
elif unixPlatform:
|
|
const
|
|
SIG_IGN = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](1)
|
|
|
|
type
|
|
AsyncFD* = distinct cint
|
|
|
|
SelectorData* = object
|
|
reader*: AsyncCallback
|
|
writer*: AsyncCallback
|
|
|
|
PDispatcher* = ref object of PDispatcherBase
|
|
selector: Selector[SelectorData]
|
|
keys: seq[ReadyKey]
|
|
|
|
proc `==`*(x, y: AsyncFD): bool {.borrow, gcsafe.}
|
|
|
|
proc globalInit() =
|
|
# We are ignoring SIGPIPE signal, because we are working with EPIPE.
|
|
posix.signal(cint(SIGPIPE), SIG_IGN)
|
|
|
|
proc initAPI(disp: PDispatcher) {.raises: [Defect, CatchableError].} =
|
|
discard
|
|
|
|
proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} =
|
|
## Create new dispatcher.
|
|
var res = PDispatcher()
|
|
res.selector = newSelector[SelectorData]()
|
|
when declared(initHeapQueue):
|
|
# After 0.20.0 Nim's stdlib version
|
|
res.timers = initHeapQueue[TimerCallback]()
|
|
else:
|
|
# Before 0.20.0 Nim's stdlib version
|
|
res.timers.newHeapQueue()
|
|
res.callbacks = initDeque[AsyncCallback](64)
|
|
res.callbacks.addLast(SentinelCallback)
|
|
res.idlers = initDeque[AsyncCallback]()
|
|
res.keys = newSeq[ReadyKey](64)
|
|
res.trackers = initTable[string, TrackerBase]()
|
|
initAPI(res)
|
|
res
|
|
|
|
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
|
|
|
|
proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
|
|
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
|
|
|
|
proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] =
|
|
## Returns system specific OS queue.
|
|
return disp.selector
|
|
|
|
proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
|
|
## Register file descriptor ``fd`` in thread's dispatcher.
|
|
let loop = getThreadDispatcher()
|
|
var data: SelectorData
|
|
loop.selector.registerHandle(int(fd), {}, data)
|
|
|
|
proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
|
|
## Unregister file descriptor ``fd`` from thread's dispatcher.
|
|
getThreadDispatcher().selector.unregister(int(fd))
|
|
|
|
proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} =
|
|
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
|
|
result = int(fd) in disp.selector
|
|
|
|
proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
|
|
raises: [Defect, IOSelectorsException, ValueError].} =
|
|
## Start watching the file descriptor ``fd`` for read availability and then
|
|
## call the callback ``cb`` with specified argument ``udata``.
|
|
let loop = getThreadDispatcher()
|
|
var newEvents = {Event.Read}
|
|
withData(loop.selector, int(fd), adata) do:
|
|
let acb = newAsyncCallback(cb, udata)
|
|
adata.reader = acb
|
|
newEvents.incl(Event.Read)
|
|
if not(isNil(adata.writer.function)):
|
|
newEvents.incl(Event.Write)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
loop.selector.updateHandle(int(fd), newEvents)
|
|
|
|
proc removeReader*(fd: AsyncFD) {.
|
|
raises: [Defect, IOSelectorsException, ValueError].} =
|
|
## Stop watching the file descriptor ``fd`` for read availability.
|
|
let loop = getThreadDispatcher()
|
|
var newEvents: set[Event]
|
|
withData(loop.selector, int(fd), adata) do:
|
|
# We need to clear `reader` data, because `selectors` don't do it
|
|
adata.reader = default(AsyncCallback)
|
|
if not(isNil(adata.writer.function)):
|
|
newEvents.incl(Event.Write)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
loop.selector.updateHandle(int(fd), newEvents)
|
|
|
|
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
|
|
raises: [Defect, IOSelectorsException, ValueError].} =
|
|
## Start watching the file descriptor ``fd`` for write availability and then
|
|
## call the callback ``cb`` with specified argument ``udata``.
|
|
let loop = getThreadDispatcher()
|
|
var newEvents = {Event.Write}
|
|
withData(loop.selector, int(fd), adata) do:
|
|
let acb = newAsyncCallback(cb, udata)
|
|
adata.writer = acb
|
|
newEvents.incl(Event.Write)
|
|
if not(isNil(adata.reader.function)):
|
|
newEvents.incl(Event.Read)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
loop.selector.updateHandle(int(fd), newEvents)
|
|
|
|
proc removeWriter*(fd: AsyncFD) {.
|
|
raises: [Defect, IOSelectorsException, ValueError].} =
|
|
## Stop watching the file descriptor ``fd`` for write availability.
|
|
let loop = getThreadDispatcher()
|
|
var newEvents: set[Event]
|
|
withData(loop.selector, int(fd), adata) do:
|
|
# We need to clear `writer` data, because `selectors` don't do it
|
|
adata.writer = default(AsyncCallback)
|
|
if not(isNil(adata.reader.function)):
|
|
newEvents.incl(Event.Read)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
loop.selector.updateHandle(int(fd), newEvents)
|
|
|
|
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
|
|
## Close asynchronous socket.
|
|
##
|
|
## Please note, that socket is not closed immediately. To avoid bugs with
|
|
## closing socket, while operation pending, socket will be closed as
|
|
## soon as all pending operations will be notified.
|
|
## You can execute ``aftercb`` before actual socket close operation.
|
|
let loop = getThreadDispatcher()
|
|
|
|
proc continuation(udata: pointer) =
|
|
if SocketHandle(fd) in loop.selector:
|
|
try:
|
|
unregister(fd)
|
|
except CatchableError as exc:
|
|
raiseAsDefect(exc, "unregister failed")
|
|
|
|
close(SocketHandle(fd))
|
|
if not isNil(aftercb):
|
|
aftercb(nil)
|
|
|
|
withData(loop.selector, int(fd), adata) do:
|
|
# We are scheduling reader and writer callbacks to be called
|
|
# explicitly, so they can get an error and continue work.
|
|
# Callbacks marked as deleted so we don't need to get REAL notifications
|
|
# from system queue for this reader and writer.
|
|
|
|
if not(isNil(adata.reader.function)):
|
|
loop.callbacks.addLast(adata.reader)
|
|
adata.reader = default(AsyncCallback)
|
|
|
|
if not(isNil(adata.writer.function)):
|
|
loop.callbacks.addLast(adata.writer)
|
|
adata.writer = default(AsyncCallback)
|
|
|
|
# We can't unregister file descriptor from system queue here, because
|
|
# in such case processing queue will stuck on poll() call, because there
|
|
# can be no file descriptors registered in system queue.
|
|
var acb = newAsyncCallback(continuation)
|
|
loop.callbacks.addLast(acb)
|
|
|
|
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
|
|
## Close asynchronous file/pipe handle.
|
|
##
|
|
## Please note, that socket is not closed immediately. To avoid bugs with
|
|
## closing socket, while operation pending, socket will be closed as
|
|
## soon as all pending operations will be notified.
|
|
## You can execute ``aftercb`` before actual socket close operation.
|
|
closeSocket(fd, aftercb)
|
|
|
|
when ioselSupportedPlatform:
|
|
proc addSignal*(signal: int, cb: CallbackFunc,
|
|
udata: pointer = nil): int {.
|
|
raises: [Defect, IOSelectorsException, ValueError, OSError].} =
|
|
## Start watching signal ``signal``, and when signal appears, call the
|
|
## callback ``cb`` with specified argument ``udata``. Returns signal
|
|
## identifier code, which can be used to remove signal callback
|
|
## via ``removeSignal``.
|
|
let loop = getThreadDispatcher()
|
|
var data: SelectorData
|
|
result = loop.selector.registerSignal(signal, data)
|
|
withData(loop.selector, result, adata) do:
|
|
adata.reader = newAsyncCallback(cb, udata)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
|
|
proc removeSignal*(sigfd: int) {.
|
|
raises: [Defect, IOSelectorsException].} =
|
|
## Remove watching signal ``signal``.
|
|
let loop = getThreadDispatcher()
|
|
loop.selector.unregister(sigfd)
|
|
|
|
proc poll*() {.raises: [Defect, CatchableError].} =
|
|
## Perform single asynchronous step.
|
|
let loop = getThreadDispatcher()
|
|
var curTime = Moment.now()
|
|
var curTimeout = 0
|
|
|
|
when ioselSupportedPlatform:
|
|
let customSet = {Event.Timer, Event.Signal, Event.Process,
|
|
Event.Vnode}
|
|
|
|
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
|
|
# complete pending work of the outer `processCallbacks` call.
|
|
# On non-reentrant `poll` calls, this only removes sentinel element.
|
|
processCallbacks(loop)
|
|
|
|
# Moving expired timers to `loop.callbacks` and calculate timeout.
|
|
loop.processTimersGetTimeout(curTimeout)
|
|
|
|
# Processing IO descriptors and all hardware events.
|
|
let count = loop.selector.selectInto(curTimeout, loop.keys)
|
|
for i in 0..<count:
|
|
let fd = loop.keys[i].fd
|
|
let events = loop.keys[i].events
|
|
|
|
withData(loop.selector, fd, adata) do:
|
|
if Event.Read in events or events == {Event.Error}:
|
|
if not isNil(adata.reader.function):
|
|
loop.callbacks.addLast(adata.reader)
|
|
|
|
if Event.Write in events or events == {Event.Error}:
|
|
if not isNil(adata.writer.function):
|
|
loop.callbacks.addLast(adata.writer)
|
|
|
|
if Event.User in events:
|
|
if not isNil(adata.reader.function):
|
|
loop.callbacks.addLast(adata.reader)
|
|
|
|
when ioselSupportedPlatform:
|
|
if customSet * events != {}:
|
|
if not isNil(adata.reader.function):
|
|
loop.callbacks.addLast(adata.reader)
|
|
|
|
# Moving expired timers to `loop.callbacks`.
|
|
loop.processTimers()
|
|
|
|
# We move idle callbacks to `loop.callbacks` only if there no pending
|
|
# network events.
|
|
if count == 0:
|
|
loop.processIdlers()
|
|
|
|
# All callbacks which will be added during `processCallbacks` will be
|
|
# scheduled after the sentinel and are processed on next `poll()` call.
|
|
loop.callbacks.addLast(SentinelCallback)
|
|
processCallbacks(loop)
|
|
|
|
# All callbacks done, skip `processCallbacks` at start.
|
|
loop.callbacks.addFirst(SentinelCallback)
|
|
|
|
else:
|
|
proc initAPI() = discard
|
|
proc globalInit() = discard
|
|
|
|
proc setThreadDispatcher*(disp: PDispatcher) =
|
|
## Set current thread's dispatcher instance to ``disp``.
|
|
if not gDisp.isNil:
|
|
doAssert gDisp.callbacks.len == 0
|
|
gDisp = disp
|
|
|
|
proc getThreadDispatcher*(): PDispatcher =
|
|
## Returns current thread's dispatcher instance.
|
|
if gDisp.isNil:
|
|
try:
|
|
setThreadDispatcher(newDispatcher())
|
|
except CatchableError as exc:
|
|
raiseAsDefect exc, "Cannot create dispatcher"
|
|
gDisp
|
|
|
|
proc setGlobalDispatcher*(disp: PDispatcher) {.
|
|
gcsafe, deprecated: "Use setThreadDispatcher() instead".} =
|
|
setThreadDispatcher(disp)
|
|
|
|
proc getGlobalDispatcher*(): PDispatcher {.
|
|
gcsafe, deprecated: "Use getThreadDispatcher() instead".} =
|
|
getThreadDispatcher()
|
|
|
|
proc setTimer*(at: Moment, cb: CallbackFunc,
|
|
udata: pointer = nil): TimerCallback =
|
|
## Arrange for the callback ``cb`` to be called at the given absolute
|
|
## timestamp ``at``. You can also pass ``udata`` to callback.
|
|
let loop = getThreadDispatcher()
|
|
result = TimerCallback(finishAt: at,
|
|
function: newAsyncCallback(cb, udata))
|
|
loop.timers.push(result)
|
|
|
|
proc clearTimer*(timer: TimerCallback) {.inline.} =
|
|
timer.function = default(AsyncCallback)
|
|
|
|
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {.
|
|
inline, deprecated: "Use setTimer/clearTimer instead".} =
|
|
## Arrange for the callback ``cb`` to be called at the given absolute
|
|
## timestamp ``at``. You can also pass ``udata`` to callback.
|
|
discard setTimer(at, cb, udata)
|
|
|
|
proc addTimer*(at: int64, cb: CallbackFunc, udata: pointer = nil) {.
|
|
inline, deprecated: "Use addTimer(Duration, cb, udata)".} =
|
|
discard setTimer(Moment.init(at, Millisecond), cb, udata)
|
|
|
|
proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {.
|
|
inline, deprecated: "Use addTimer(Duration, cb, udata)".} =
|
|
discard setTimer(Moment.init(int64(at), Millisecond), cb, udata)
|
|
|
|
proc removeTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =
|
|
## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting
|
|
## queue.
|
|
let loop = getThreadDispatcher()
|
|
var list = cast[seq[TimerCallback]](loop.timers)
|
|
var index = -1
|
|
for i in 0..<len(list):
|
|
if list[i].finishAt == at and list[i].function.function == cb and
|
|
list[i].function.udata == udata:
|
|
index = i
|
|
break
|
|
if index != -1:
|
|
loop.timers.del(index)
|
|
|
|
proc removeTimer*(at: int64, cb: CallbackFunc, udata: pointer = nil) {.
|
|
inline, deprecated: "Use removeTimer(Duration, cb, udata)".} =
|
|
removeTimer(Moment.init(at, Millisecond), cb, udata)
|
|
|
|
proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {.
|
|
inline, deprecated: "Use removeTimer(Duration, cb, udata)".} =
|
|
removeTimer(Moment.init(int64(at), Millisecond), cb, udata)
|
|
|
|
proc callSoon*(acb: AsyncCallback) {.gcsafe, raises: [Defect].} =
|
|
## Schedule `cbproc` to be called as soon as possible.
|
|
## The callback is called when control returns to the event loop.
|
|
getThreadDispatcher().callbacks.addLast(acb)
|
|
|
|
proc callSoon*(cbproc: CallbackFunc, data: pointer) {.
|
|
gcsafe, raises: [Defect].} =
|
|
## Schedule `cbproc` to be called as soon as possible.
|
|
## The callback is called when control returns to the event loop.
|
|
doAssert(not isNil(cbproc))
|
|
callSoon(newAsyncCallback(cbproc, data))
|
|
|
|
proc callSoon*(cbproc: CallbackFunc) {.gcsafe, raises: [Defect].} =
|
|
callSoon(cbproc, nil)
|
|
|
|
proc callIdle*(acb: AsyncCallback) {.gcsafe, raises: [Defect].} =
|
|
## Schedule ``cbproc`` to be called when there no pending network events
|
|
## available.
|
|
##
|
|
## **WARNING!** Despite the name, "idle" callbacks called on every loop
|
|
## iteration if there no network events available, not when the loop is
|
|
## actually "idle".
|
|
getThreadDispatcher().idlers.addLast(acb)
|
|
|
|
proc callIdle*(cbproc: CallbackFunc, data: pointer) {.
|
|
gcsafe, raises: [Defect].} =
|
|
## Schedule ``cbproc`` to be called when there no pending network events
|
|
## available.
|
|
##
|
|
## **WARNING!** Despite the name, "idle" callbacks called on every loop
|
|
## iteration if there no network events available, not when the loop is
|
|
## actually "idle".
|
|
doAssert(not isNil(cbproc))
|
|
callIdle(newAsyncCallback(cbproc, data))
|
|
|
|
proc callIdle*(cbproc: CallbackFunc) {.gcsafe, raises: [Defect].} =
|
|
callIdle(cbproc, nil)
|
|
|
|
include asyncfutures2
|
|
|
|
when not(defined(windows)):
|
|
when ioselSupportedPlatform:
|
|
proc waitSignal*(signal: int): Future[void] {.
|
|
raises: [Defect].} =
|
|
var retFuture = newFuture[void]("chronos.waitSignal()")
|
|
var sigfd: int = -1
|
|
|
|
template getSignalException(e: untyped): untyped =
|
|
newException(AsyncError, "Could not manipulate signal handler, " &
|
|
"reason [" & $e.name & "]: " & $e.msg)
|
|
|
|
proc continuation(udata: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
if sigfd != -1:
|
|
try:
|
|
removeSignal(sigfd)
|
|
retFuture.complete()
|
|
except IOSelectorsException as exc:
|
|
retFuture.fail(getSignalException(exc))
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
if sigfd != -1:
|
|
try:
|
|
removeSignal(sigfd)
|
|
except IOSelectorsException as exc:
|
|
retFuture.fail(getSignalException(exc))
|
|
|
|
sigfd =
|
|
try:
|
|
addSignal(signal, continuation)
|
|
except IOSelectorsException as exc:
|
|
retFuture.fail(getSignalException(exc))
|
|
return retFuture
|
|
except ValueError as exc:
|
|
retFuture.fail(getSignalException(exc))
|
|
return retFuture
|
|
except OSError as exc:
|
|
retFuture.fail(getSignalException(exc))
|
|
return retFuture
|
|
|
|
retFuture.cancelCallback = cancellation
|
|
retFuture
|
|
|
|
proc sleepAsync*(duration: Duration): Future[void] =
|
|
## Suspends the execution of the current async procedure for the next
|
|
## ``duration`` time.
|
|
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")
|
|
let moment = Moment.fromNow(duration)
|
|
var timer: TimerCallback
|
|
|
|
proc completion(data: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
retFuture.complete()
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
clearTimer(timer)
|
|
|
|
retFuture.cancelCallback = cancellation
|
|
timer = setTimer(moment, completion, cast[pointer](retFuture))
|
|
return retFuture
|
|
|
|
proc sleepAsync*(ms: int): Future[void] {.
|
|
inline, deprecated: "Use sleepAsync(Duration)".} =
|
|
result = sleepAsync(ms.milliseconds())
|
|
|
|
proc stepsAsync*(number: int): Future[void] =
|
|
## Suspends the execution of the current async procedure for the next
|
|
## ``number`` of asynchronous steps (``poll()`` calls).
|
|
##
|
|
## This primitive can be useful when you need to create more deterministic
|
|
## tests and cases.
|
|
##
|
|
## WARNING! Do not use this primitive to perform switch between tasks, because
|
|
## this can lead to 100% CPU load in the moments when there are no I/O
|
|
## events. Usually when there no I/O events CPU consumption should be near 0%.
|
|
var retFuture = newFuture[void]("chronos.stepsAsync(int)")
|
|
var counter = 0
|
|
|
|
var continuation: proc(data: pointer) {.gcsafe, raises: [Defect].}
|
|
continuation = proc(data: pointer) {.gcsafe, raises: [Defect].} =
|
|
if not(retFuture.finished()):
|
|
inc(counter)
|
|
if counter < number:
|
|
callSoon(continuation, nil)
|
|
else:
|
|
retFuture.complete()
|
|
|
|
proc cancellation(udata: pointer) =
|
|
discard
|
|
|
|
if number <= 0:
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.cancelCallback = cancellation
|
|
callSoon(continuation, nil)
|
|
|
|
retFuture
|
|
|
|
proc idleAsync*(): Future[void] =
|
|
## Suspends the execution of the current asynchronous task until "idle" time.
|
|
##
|
|
## "idle" time its moment of time, when no network events were processed by
|
|
## ``poll()`` call.
|
|
var retFuture = newFuture[void]("chronos.idleAsync()")
|
|
|
|
proc continuation(data: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
retFuture.complete()
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
|
discard
|
|
|
|
retFuture.cancelCallback = cancellation
|
|
callIdle(continuation, nil)
|
|
retFuture
|
|
|
|
proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
|
|
## Returns a future which will complete once ``fut`` completes or after
|
|
## ``timeout`` milliseconds has elapsed.
|
|
##
|
|
## If ``fut`` completes first the returned future will hold true,
|
|
## otherwise, if ``timeout`` milliseconds has elapsed first, the returned
|
|
## future will hold false.
|
|
var retFuture = newFuture[bool]("chronos.`withTimeout`")
|
|
var moment: Moment
|
|
var timer: TimerCallback
|
|
var cancelling = false
|
|
|
|
# TODO: raises annotation shouldn't be needed, but likely similar issue as
|
|
# https://github.com/nim-lang/Nim/issues/17369
|
|
proc continuation(udata: pointer) {.gcsafe, raises: [Defect].} =
|
|
if not(retFuture.finished()):
|
|
if not(cancelling):
|
|
if not(fut.finished()):
|
|
# Timer exceeded first, we going to cancel `fut` and wait until it
|
|
# not completes.
|
|
cancelling = true
|
|
fut.cancel()
|
|
else:
|
|
# Future `fut` completed/failed/cancelled first.
|
|
if not(isNil(timer)):
|
|
clearTimer(timer)
|
|
retFuture.complete(true)
|
|
else:
|
|
retFuture.complete(false)
|
|
|
|
# TODO: raises annotation shouldn't be needed, but likely similar issue as
|
|
# https://github.com/nim-lang/Nim/issues/17369
|
|
proc cancellation(udata: pointer) {.gcsafe, raises: [Defect].} =
|
|
if not isNil(timer):
|
|
clearTimer(timer)
|
|
if not(fut.finished()):
|
|
fut.removeCallback(continuation)
|
|
fut.cancel()
|
|
|
|
if fut.finished():
|
|
retFuture.complete(true)
|
|
else:
|
|
if timeout.isZero():
|
|
retFuture.complete(false)
|
|
elif timeout.isInfinite():
|
|
retFuture.cancelCallback = cancellation
|
|
fut.addCallback(continuation)
|
|
else:
|
|
moment = Moment.fromNow(timeout)
|
|
retFuture.cancelCallback = cancellation
|
|
timer = setTimer(moment, continuation, nil)
|
|
fut.addCallback(continuation)
|
|
|
|
return retFuture
|
|
|
|
proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
|
|
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
|
|
result = withTimeout(fut, timeout.milliseconds())
|
|
|
|
proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
|
|
## Returns a future which will complete once future ``fut`` completes
|
|
## or if timeout of ``timeout`` milliseconds has been expired.
|
|
##
|
|
## If ``timeout`` is ``-1``, then statement ``await wait(fut)`` is
|
|
## equal to ``await fut``.
|
|
##
|
|
## TODO: In case when ``fut`` got cancelled, what result Future[T]
|
|
## should return, because it can't be cancelled too.
|
|
var retFuture = newFuture[T]("chronos.wait()")
|
|
var moment: Moment
|
|
var timer: TimerCallback
|
|
var cancelling = false
|
|
|
|
proc continuation(udata: pointer) {.raises: [Defect].} =
|
|
if not(retFuture.finished()):
|
|
if not(cancelling):
|
|
if not(fut.finished()):
|
|
# Timer exceeded first.
|
|
cancelling = true
|
|
fut.cancel()
|
|
else:
|
|
# Future `fut` completed/failed/cancelled first.
|
|
if not isNil(timer):
|
|
clearTimer(timer)
|
|
|
|
if fut.failed():
|
|
retFuture.fail(fut.error)
|
|
else:
|
|
when T is void:
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.complete(fut.value)
|
|
else:
|
|
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
|
|
|
var cancellation: proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
|
cancellation = proc(udata: pointer) {.gcsafe, raises: [Defect].} =
|
|
if not isNil(timer):
|
|
clearTimer(timer)
|
|
if not(fut.finished()):
|
|
fut.removeCallback(continuation)
|
|
fut.cancel()
|
|
|
|
if fut.finished():
|
|
if fut.failed():
|
|
retFuture.fail(fut.error)
|
|
else:
|
|
when T is void:
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.complete(fut.value)
|
|
else:
|
|
if timeout.isZero():
|
|
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
|
|
elif timeout.isInfinite():
|
|
retFuture.cancelCallback = cancellation
|
|
fut.addCallback(continuation)
|
|
else:
|
|
moment = Moment.fromNow(timeout)
|
|
retFuture.cancelCallback = cancellation
|
|
timer = setTimer(moment, continuation, nil)
|
|
fut.addCallback(continuation)
|
|
|
|
return retFuture
|
|
|
|
proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
|
|
inline, deprecated: "Use wait(Future[T], Duration)".} =
|
|
if timeout == -1:
|
|
wait(fut, InfiniteDuration)
|
|
elif timeout == 0:
|
|
wait(fut, ZeroDuration)
|
|
else:
|
|
wait(fut, timeout.milliseconds())
|
|
|
|
include asyncmacro2
|
|
|
|
proc runForever*() {.raises: [Defect, CatchableError].} =
|
|
## Begins a never ending global dispatcher poll loop.
|
|
## Raises different exceptions depending on the platform.
|
|
while true:
|
|
poll()
|
|
|
|
proc waitFor*[T](fut: Future[T]): T {.raises: [Defect, CatchableError].} =
|
|
## **Blocks** the current thread until the specified future completes.
|
|
## There's no way to tell if poll or read raised the exception
|
|
while not(fut.finished()):
|
|
poll()
|
|
|
|
fut.read()
|
|
|
|
proc addTracker*[T](id: string, tracker: T) =
|
|
## Add new ``tracker`` object to current thread dispatcher with identifier
|
|
## ``id``.
|
|
let loop = getThreadDispatcher()
|
|
loop.trackers[id] = tracker
|
|
|
|
proc getTracker*(id: string): TrackerBase =
|
|
## Get ``tracker`` from current thread dispatcher using identifier ``id``.
|
|
let loop = getThreadDispatcher()
|
|
result = loop.trackers.getOrDefault(id, nil)
|
|
|
|
when defined(chronosFutureTracking):
|
|
iterator pendingFutures*(): FutureBase =
|
|
## Iterates over the list of pending Futures (Future[T] objects which not
|
|
## yet completed, cancelled or failed).
|
|
var slider = futureList.head
|
|
while not(isNil(slider)):
|
|
yield slider
|
|
slider = slider.next
|
|
|
|
proc pendingFuturesCount*(): uint =
|
|
## Returns number of pending Futures (Future[T] objects which not yet
|
|
## completed, cancelled or failed).
|
|
futureList.count
|
|
|
|
# Perform global per-module initialization.
|
|
globalInit()
|