Add more documentation.

Add (#7197) test.
This commit is contained in:
cheatfate 2018-05-27 08:49:47 +03:00
parent 9e63caf694
commit e3171a132a
4 changed files with 83 additions and 17 deletions

View File

@ -429,24 +429,23 @@ else:
return disp.selector return disp.selector
proc register*(fd: AsyncFD) = proc register*(fd: AsyncFD) =
## Register file descriptor ``fd`` in selector.
var data: SelectorData var data: SelectorData
data.rdata.fd = fd data.rdata.fd = fd
data.wdata.fd = fd data.wdata.fd = fd
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
loop.selector.registerHandle(int(fd), {}, data) loop.selector.registerHandle(int(fd), {}, data)
proc closeSocket*(sock: AsyncFD) =
let loop = getGlobalDispatcher()
loop.selector.unregister(sock.SocketHandle)
sock.SocketHandle.close()
proc unregister*(fd: AsyncFD) = proc unregister*(fd: AsyncFD) =
## Unregister file descriptor ``fd`` from selector.
getGlobalDispatcher().selector.unregister(int(fd)) getGlobalDispatcher().selector.unregister(int(fd))
proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} = proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} =
result = int(fd) in disp.selector result = int(fd) in disp.selector
proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
let p = getGlobalDispatcher() let p = getGlobalDispatcher()
var newEvents = {Event.Read} var newEvents = {Event.Read}
withData(p.selector, int(fd), adata) do: withData(p.selector, int(fd), adata) do:
@ -461,6 +460,7 @@ else:
p.selector.updateHandle(int(fd), newEvents) p.selector.updateHandle(int(fd), newEvents)
proc removeReader*(fd: AsyncFD) = proc removeReader*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for read availability.
let p = getGlobalDispatcher() let p = getGlobalDispatcher()
var newEvents: set[Event] var newEvents: set[Event]
withData(p.selector, int(fd), adata) do: withData(p.selector, int(fd), adata) do:
@ -470,6 +470,8 @@ else:
p.selector.updateHandle(int(fd), newEvents) p.selector.updateHandle(int(fd), newEvents)
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) = proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
let p = getGlobalDispatcher() let p = getGlobalDispatcher()
var newEvents = {Event.Write} var newEvents = {Event.Write}
withData(p.selector, int(fd), adata) do: withData(p.selector, int(fd), adata) do:
@ -484,6 +486,7 @@ else:
p.selector.updateHandle(int(fd), newEvents) p.selector.updateHandle(int(fd), newEvents)
proc removeWriter*(fd: AsyncFD) = proc removeWriter*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for write availability.
let p = getGlobalDispatcher() let p = getGlobalDispatcher()
var newEvents: set[Event] var newEvents: set[Event]
withData(p.selector, int(fd), adata) do: withData(p.selector, int(fd), adata) do:
@ -496,8 +499,9 @@ else:
proc addSignal*(signal: int, cb: CallbackFunc, proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int = udata: pointer = nil): int =
## Start watching signal ``signal``, and when signal appears, call the ## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb``. Returns signal identifier code, which can be used ## callback ``cb`` with specified argument ``udata``. Returns signal
## to remove signal callback via ``removeSignal``. ## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
let p = getGlobalDispatcher() let p = getGlobalDispatcher()
var data: SelectorData var data: SelectorData
result = p.selector.registerSignal(signal, data) result = p.selector.registerSignal(signal, data)
@ -514,6 +518,7 @@ else:
p.selector.unregister(sigfd) p.selector.unregister(sigfd)
proc poll*() = proc poll*() =
## Perform single asynchronous step.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var curTime = fastEpochTime() var curTime = fastEpochTime()
var curTimeout = 0 var curTimeout = 0
@ -587,12 +592,16 @@ else:
discard getGlobalDispatcher() discard getGlobalDispatcher()
proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) = proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var tcb = TimerCallback(finishAt: at, var tcb = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata)) function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(tcb) loop.timers.push(tcb)
proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) = proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting
## queue.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var list = cast[seq[TimerCallback]](loop.timers) var list = cast[seq[TimerCallback]](loop.timers)
var index = -1 var index = -1
@ -656,5 +665,5 @@ proc waitFor*[T](fut: Future[T]): T =
fut.read fut.read
# Global API and callSoon initialization. # Global API and callSoon() initialization.
initAPI() initAPI()

View File

@ -8,7 +8,7 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import net, nativesockets, os, deques, strutils import net, nativesockets, os, deques, strutils
import ../asyncloop, ../handles, ../hexdump import ../asyncloop, ../handles
import common import common
type type
@ -448,6 +448,16 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
udata: pointer = nil, udata: pointer = nil,
bufSize: int = DefaultDatagramBufferSize bufSize: int = DefaultDatagramBufferSize
): DatagramTransport = ): DatagramTransport =
## Create new UDP datagram transport (IPv4).
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``remote`` - bind transport to remote address (optional).
## ``local`` - bind transport to local address (to serving incoming
## datagrams, optional)
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer
result = newDatagramTransportCommon(cbproc, remote, local, sock, result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, bufSize) flags, udata, bufSize)
@ -459,15 +469,28 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
udata: pointer = nil, udata: pointer = nil,
bufSize: int = DefaultDatagramBufferSize bufSize: int = DefaultDatagramBufferSize
): DatagramTransport = ): DatagramTransport =
## Create new UDP datagram transport (IPv6).
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``remote`` - bind transport to remote address (optional).
## ``local`` - bind transport to local address (to serving incoming
## datagrams, optional)
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer
result = newDatagramTransportCommon(cbproc, remote, local, sock, result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, bufSize) flags, udata, bufSize)
proc join*(transp: DatagramTransport) {.async.} = proc join*(transp: DatagramTransport) {.async.} =
## Wait until the transport ``transp`` will be closed.
if not transp.future.finished: if not transp.future.finished:
await transp.future await transp.future
proc send*(transp: DatagramTransport, pbytes: pointer, proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int) {.async.} = nbytes: int) {.async.} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address which was bounded on transport.
checkClosed(transp) checkClosed(transp)
if transp.remote.port == Port(0): if transp.remote.port == Port(0):
raise newException(TransportError, "Remote peer is not set!") raise newException(TransportError, "Remote peer is not set!")
@ -488,6 +511,8 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
remote: TransportAddress) {.async.} = remote: TransportAddress) {.async.} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``.
checkClosed(transp) checkClosed(transp)
var saddr: Sockaddr_storage var saddr: Sockaddr_storage
var slen: SockLen var slen: SockLen

View File

@ -353,6 +353,9 @@ when defined(windows):
proc connect*(address: TransportAddress, proc connect*(address: TransportAddress,
bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = bufferSize = DefaultStreamBufferSize): Future[StreamTransport] =
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` - size of internal buffer for transport.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var var
saddr: Sockaddr_storage saddr: Sockaddr_storage
@ -616,7 +619,9 @@ else:
proc connect*(address: TransportAddress, proc connect*(address: TransportAddress,
bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = bufferSize = DefaultStreamBufferSize): Future[StreamTransport] =
## Connect to ``address`` and create new transport for this connection. ## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` - size of internal buffer for transport.
var var
saddr: Sockaddr_storage saddr: Sockaddr_storage
slen: SockLen slen: SockLen
@ -748,7 +753,14 @@ proc createStreamServer*(host: TransportAddress,
backlog: int = 100, backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize, bufferSize: int = DefaultStreamBufferSize,
udata: pointer = nil): StreamServer = udata: pointer = nil): StreamServer =
## Create new TCP server ## Create new TCP server.
##
## ``host`` - address to which server will be bound.
## ``flags`` - flags to apply to server socket.
## ``cbproc`` - callback function which will be called, when new client
## connection will be established.
## ``sock`` - application-driven socket to use.
## ``backlog`` - number of
var var
saddr: Sockaddr_storage saddr: Sockaddr_storage
slen: SockLen slen: SockLen
@ -799,7 +811,7 @@ proc createStreamServer*(host: TransportAddress,
proc write*(transp: StreamTransport, pbytes: pointer, proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.async.} = nbytes: int): Future[int] {.async.} =
## Write data from buffer ``pbytes`` with size ``nbytes`` to transport ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport
## ``transp``. ## ``transp``.
checkClosed(transp) checkClosed(transp)
var waitFuture = newFuture[void]("transport.write") var waitFuture = newFuture[void]("transport.write")
@ -874,6 +886,9 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer,
proc readOnce*(transp: StreamTransport, pbytes: pointer, proc readOnce*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.async.} = nbytes: int): Future[int] {.async.} =
## Perform one read operation on transport ``transp``. ## Perform one read operation on transport ``transp``.
##
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
## internal buffer, otherwise it will wait until some bytes will be received.
checkClosed(transp) checkClosed(transp)
checkPending(transp) checkPending(transp)
while true: while true:

View File

@ -9,9 +9,6 @@
import unittest import unittest
import ../asyncdispatch2 import ../asyncdispatch2
when defined(vcc):
{.passC: "/Zi /FS".}
proc testFuture1(): Future[int] {.async.} = proc testFuture1(): Future[int] {.async.} =
await sleepAsync(100) await sleepAsync(100)
@ -31,9 +28,29 @@ proc test2(): bool =
var fut = testFuture3() var fut = testFuture3()
result = fut.finished result = fut.finished
proc test3(): string =
var testResult = ""
var fut = testFuture1()
fut.addCallback proc(udata: pointer) =
testResult &= "1"
fut.addCallback proc(udata: pointer) =
testResult &= "2"
fut.addCallback proc(udata: pointer) =
testResult &= "3"
fut.addCallback proc(udata: pointer) =
testResult &= "4"
fut.addCallback proc(udata: pointer) =
testResult &= "5"
discard waitFor(fut)
poll()
if fut.finished:
result = testResult
when isMainModule: when isMainModule:
suite "Future[T] behavior test suite": suite "Future[T] behavior test suite":
test "`Async undefined behavior (#7758)` test": test "Async undefined behavior (#7758) test":
check test1() == true check test1() == true
test "Immediately completed asynchronous procedure test": test "Immediately completed asynchronous procedure test":
check test2() == true check test2() == true
test "Future[T] callbacks are invoked in reverse order (#7197) test":
check test3() == "12345"