diff --git a/asyncdispatch2/asyncloop.nim b/asyncdispatch2/asyncloop.nim index 2d7e5884..b3b96201 100644 --- a/asyncdispatch2/asyncloop.nim +++ b/asyncdispatch2/asyncloop.nim @@ -429,24 +429,23 @@ else: return disp.selector proc register*(fd: AsyncFD) = + ## Register file descriptor ``fd`` in selector. var data: SelectorData data.rdata.fd = fd data.wdata.fd = fd let loop = getGlobalDispatcher() loop.selector.registerHandle(int(fd), {}, data) - proc closeSocket*(sock: AsyncFD) = - let loop = getGlobalDispatcher() - loop.selector.unregister(sock.SocketHandle) - sock.SocketHandle.close() - proc unregister*(fd: AsyncFD) = + ## Unregister file descriptor ``fd`` from selector. getGlobalDispatcher().selector.unregister(int(fd)) proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} = result = int(fd) in disp.selector proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = + ## Start watching the file descriptor ``fd`` for read availability and then + ## call the callback ``cb`` with specified argument ``udata``. let p = getGlobalDispatcher() var newEvents = {Event.Read} withData(p.selector, int(fd), adata) do: @@ -461,6 +460,7 @@ else: p.selector.updateHandle(int(fd), newEvents) proc removeReader*(fd: AsyncFD) = + ## Stop watching the file descriptor ``fd`` for read availability. let p = getGlobalDispatcher() var newEvents: set[Event] withData(p.selector, int(fd), adata) do: @@ -470,6 +470,8 @@ else: p.selector.updateHandle(int(fd), newEvents) proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = + ## Start watching the file descriptor ``fd`` for write availability and then + ## call the callback ``cb`` with specified argument ``udata``. let p = getGlobalDispatcher() var newEvents = {Event.Write} withData(p.selector, int(fd), adata) do: @@ -484,6 +486,7 @@ else: p.selector.updateHandle(int(fd), newEvents) proc removeWriter*(fd: AsyncFD) = + ## Stop watching the file descriptor ``fd`` for write availability. let p = getGlobalDispatcher() var newEvents: set[Event] withData(p.selector, int(fd), adata) do: @@ -496,8 +499,9 @@ else: proc addSignal*(signal: int, cb: CallbackFunc, udata: pointer = nil): int = ## Start watching signal ``signal``, and when signal appears, call the - ## callback ``cb``. Returns signal identifier code, which can be used - ## to remove signal callback via ``removeSignal``. + ## callback ``cb`` with specified argument ``udata``. Returns signal + ## identifier code, which can be used to remove signal callback + ## via ``removeSignal``. let p = getGlobalDispatcher() var data: SelectorData result = p.selector.registerSignal(signal, data) @@ -514,6 +518,7 @@ else: p.selector.unregister(sigfd) proc poll*() = + ## Perform single asynchronous step. let loop = getGlobalDispatcher() var curTime = fastEpochTime() var curTimeout = 0 @@ -587,12 +592,16 @@ else: discard getGlobalDispatcher() proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) = + ## Arrange for the callback ``cb`` to be called at the given absolute + ## timestamp ``at``. You can also pass ``udata`` to callback. let loop = getGlobalDispatcher() var tcb = TimerCallback(finishAt: at, function: AsyncCallback(function: cb, udata: udata)) loop.timers.push(tcb) proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) = + ## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting + ## queue. let loop = getGlobalDispatcher() var list = cast[seq[TimerCallback]](loop.timers) var index = -1 @@ -656,5 +665,5 @@ proc waitFor*[T](fut: Future[T]): T = fut.read -# Global API and callSoon initialization. +# Global API and callSoon() initialization. initAPI() diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index f7c6b26c..edc78b1f 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -8,7 +8,7 @@ # MIT license (LICENSE-MIT) import net, nativesockets, os, deques, strutils -import ../asyncloop, ../handles, ../hexdump +import ../asyncloop, ../handles import common type @@ -448,6 +448,16 @@ proc newDatagramTransport*(cbproc: DatagramCallback, udata: pointer = nil, bufSize: int = DefaultDatagramBufferSize ): DatagramTransport = + ## Create new UDP datagram transport (IPv4). + ## + ## ``cbproc`` - callback which will be called, when new datagram received. + ## ``remote`` - bind transport to remote address (optional). + ## ``local`` - bind transport to local address (to serving incoming + ## datagrams, optional) + ## ``sock`` - application-driven socket to use. + ## ``flags`` - flags that will be applied to socket. + ## ``udata`` - custom argument which will be passed to ``cbproc``. + ## ``bufSize`` - size of internal buffer result = newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, bufSize) @@ -459,15 +469,28 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, udata: pointer = nil, bufSize: int = DefaultDatagramBufferSize ): DatagramTransport = + ## Create new UDP datagram transport (IPv6). + ## + ## ``cbproc`` - callback which will be called, when new datagram received. + ## ``remote`` - bind transport to remote address (optional). + ## ``local`` - bind transport to local address (to serving incoming + ## datagrams, optional) + ## ``sock`` - application-driven socket to use. + ## ``flags`` - flags that will be applied to socket. + ## ``udata`` - custom argument which will be passed to ``cbproc``. + ## ``bufSize`` - size of internal buffer result = newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, bufSize) proc join*(transp: DatagramTransport) {.async.} = + ## Wait until the transport ``transp`` will be closed. if not transp.future.finished: await transp.future proc send*(transp: DatagramTransport, pbytes: pointer, nbytes: int) {.async.} = + ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport + ## ``transp`` to remote destination address which was bounded on transport. checkClosed(transp) if transp.remote.port == Port(0): raise newException(TransportError, "Remote peer is not set!") @@ -488,6 +511,8 @@ proc send*(transp: DatagramTransport, pbytes: pointer, proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, remote: TransportAddress) {.async.} = + ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport + ## ``transp`` to remote destination address ``remote``. checkClosed(transp) var saddr: Sockaddr_storage var slen: SockLen diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 7d945ef3..412feeaa 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -353,6 +353,9 @@ when defined(windows): proc connect*(address: TransportAddress, bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = + ## Open new connection to remote peer with address ``address`` and create + ## new transport object ``StreamTransport`` for established connection. + ## ``bufferSize`` - size of internal buffer for transport. let loop = getGlobalDispatcher() var saddr: Sockaddr_storage @@ -616,7 +619,9 @@ else: proc connect*(address: TransportAddress, bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = - ## Connect to ``address`` and create new transport for this connection. + ## Open new connection to remote peer with address ``address`` and create + ## new transport object ``StreamTransport`` for established connection. + ## ``bufferSize`` - size of internal buffer for transport. var saddr: Sockaddr_storage slen: SockLen @@ -748,7 +753,14 @@ proc createStreamServer*(host: TransportAddress, backlog: int = 100, bufferSize: int = DefaultStreamBufferSize, udata: pointer = nil): StreamServer = - ## Create new TCP server + ## Create new TCP server. + ## + ## ``host`` - address to which server will be bound. + ## ``flags`` - flags to apply to server socket. + ## ``cbproc`` - callback function which will be called, when new client + ## connection will be established. + ## ``sock`` - application-driven socket to use. + ## ``backlog`` - number of var saddr: Sockaddr_storage slen: SockLen @@ -799,7 +811,7 @@ proc createStreamServer*(host: TransportAddress, proc write*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {.async.} = - ## Write data from buffer ``pbytes`` with size ``nbytes`` to transport + ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport ## ``transp``. checkClosed(transp) var waitFuture = newFuture[void]("transport.write") @@ -874,6 +886,9 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, proc readOnce*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {.async.} = ## Perform one read operation on transport ``transp``. + ## + ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from + ## internal buffer, otherwise it will wait until some bytes will be received. checkClosed(transp) checkPending(transp) while true: diff --git a/tests/testfut.nim b/tests/testfut.nim index 49028d91..9b8d81c7 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -9,16 +9,13 @@ import unittest import ../asyncdispatch2 -when defined(vcc): - {.passC: "/Zi /FS".} - proc testFuture1(): Future[int] {.async.} = await sleepAsync(100) proc testFuture2(): Future[int] {.async.} = return 1 -proc testFuture3(): Future[int] {.async.} = +proc testFuture3(): Future[int] {.async.} = result = await testFuture2() proc test1(): bool = @@ -31,9 +28,29 @@ proc test2(): bool = var fut = testFuture3() result = fut.finished +proc test3(): string = + var testResult = "" + var fut = testFuture1() + fut.addCallback proc(udata: pointer) = + testResult &= "1" + fut.addCallback proc(udata: pointer) = + testResult &= "2" + fut.addCallback proc(udata: pointer) = + testResult &= "3" + fut.addCallback proc(udata: pointer) = + testResult &= "4" + fut.addCallback proc(udata: pointer) = + testResult &= "5" + discard waitFor(fut) + poll() + if fut.finished: + result = testResult + when isMainModule: suite "Future[T] behavior test suite": - test "`Async undefined behavior (#7758)` test": + test "Async undefined behavior (#7758) test": check test1() == true test "Immediately completed asynchronous procedure test": check test2() == true + test "Future[T] callbacks are invoked in reverse order (#7197) test": + check test3() == "12345"