Updated README.

Updated documentation.
This commit is contained in:
cheatfate 2018-05-29 02:35:15 +03:00
parent e3171a132a
commit 5529fc4669
4 changed files with 188 additions and 75 deletions

View File

@ -2,3 +2,81 @@
[![Build Status](https://travis-ci.org/status-im/nim-asyncdispatch2.svg?branch=master)](https://travis-ci.org/status-im/nim-asyncdispatch2) [![Build status](https://ci.appveyor.com/api/projects/status/ihrxhooltyrmo0mc?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-asyncdispatch2) [![Build Status](https://travis-ci.org/status-im/nim-asyncdispatch2.svg?branch=master)](https://travis-ci.org/status-im/nim-asyncdispatch2) [![Build status](https://ci.appveyor.com/api/projects/status/ihrxhooltyrmo0mc?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-asyncdispatch2)
Asyncdispatch hard fork. Asyncdispatch hard fork.
## Core differences between asyncdispatch and asyncdispatch2.
1. Unified callback type `CallbackFunc`.
Current version of asyncdispatch uses many types of callbacks.
`proc ()` used in callSoon() callbacks and Future[T] completion callbacks.
`proc (fut: Future[T])` used in Future[T] completion callbacks.
`proc (fd: AsyncFD, bytesTransferred: Dword, errcode: OSErrorCode)` used in Windows IO completion callbacks.
`proc (fd: AsyncFD): bool` used in Unix IO events callbacks.
Such a number of different types creates big problems in the storage, processing and interaction between callbacks. Lack of ability to pass custom user data to
callback also creates difficulties and inefficiency, to pass custom user-defined data you need to use closures (one more allocation).
To resolve this issue introduced unified callback type `CallbackFunc`, which is
looks like `CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.}`. Also one more type is introduced for callback storage is `AsyncCallback`.
```
type
AsyncCallback* = object
function*: CallbackFunc
udata*: pointer
```
2. Future[T] completion callbacks order.
Current version of asyncdispatch processing Future[T] completion callbacks in reverse order, asyncdispatch2 schedule callbacks in forward order.
- https://github.com/nim-lang/Nim/issues/7197
3. Changed behavior of OS decriptor events callbacks.
For some unknown reason current version of asyncdispatch uses seq[T] to hold list of descriptor event listeners. Actually in asynchronous environment there no need to have list of event listeners.
So in asyncdispatch2 there only one place for one READ listener and one place for one WRITE listener.
4. Removed default timeout value for poll() procedure, which allows incorrect
usage asyncdispatch and produces 500ms timeouts in correct usage.
5. Changed behavior of scheduler in poll() procedure.
Fixed issues:
- https://github.com/nim-lang/Nim/issues/7758
- https://github.com/nim-lang/Nim/issues/7197
- https://github.com/nim-lang/Nim/issues/7193
- https://github.com/nim-lang/Nim/issues/7192
- https://github.com/nim-lang/Nim/issues/6846
- https://github.com/nim-lang/Nim/issues/6929
5. Asyncdispatch2 no longer use `epochTime()`, it uses most fastest time primitives for specific OS `fastEpochTime()`. Also because MacOS supports only millisecond resolution in `kqueue` there no need on submillisecond resolution.
https://github.com/nim-lang/Nim/issues/3909
6. Removed all IO primitives recv(), recvFrom(), connect(), accept(), send(), sendTo() from public API, and moved all it functionality into Transports.
7. Introduced addTimer/removeTimer callback interface.
8. Introduced removeReader() for addReader() and removeWriter() for addWriter().
9. Changed behavior of addReader()/addWriter()/addTimer() callbacks, now only explicit removal of callbacks must be supplied via (removeReader(), removeWriter(), removeTimer())
10. Support cross-platform `sendfile` operation.
11. Removed expensive `AsyncEvent`, also removed support of hardware timers and ``addProcess``. (``addProcess`` will be implement as SubprocessTransport, while hardware based `AsyncEvent` will be renamed to ``ThreadAsyncEvent``).
12. Added cheap synchronization primitives AsyncLock, AsyncEvent, AsyncQueue[T].
## Transport concept.
Transports are high level interface for interaction with OS IO system.
The main task that the Transport concept is designed to solve is reduce number of syscalls and number of memory allocations to perform single IO operation. Current version of asyncdispatch uses at least 4 syscalls for every single IO operation:
For Posix compliant systems current version of asyncdispatch performs such operations for every single IO operation:
- register for read/write event in system queue
- wait for event in system queue
- perform IO operation
- unregister read/write event from system queue
For Windows system current version of asyncdispatch performs allocations of OVERLAPPED structure for every single IO operation.
In order to successfully cope with the task Transport also needs to incorporate some `asyncnet.nim` functionality (e.g. buffering) for stream transports. So asyncdispatch2 has buffering IO by default.

View File

@ -47,7 +47,7 @@ type
status*: ServerStatus # Current server status status*: ServerStatus # Current server status
udata*: pointer # User-defined pointer udata*: pointer # User-defined pointer
flags*: set[ServerFlags] # Flags flags*: set[ServerFlags] # Flags
bufferSize*: int # Buffer Size for transports bufferSize*: int # Size of internal transports' buffer
loopFuture*: Future[void] # Server's main Future loopFuture*: Future[void] # Server's main Future
TransportError* = object of Exception TransportError* = object of Exception
@ -87,6 +87,10 @@ proc getDomain*(address: IpAddress): Domain =
of IpAddressFamily.IPv6: of IpAddressFamily.IPv6:
result = Domain.AF_INET6 result = Domain.AF_INET6
proc getDomain*(address: TransportAddress): Domain =
## Returns OS specific Domain from TransportAddress.
result = address.address.getDomain()
proc `$`*(address: TransportAddress): string = proc `$`*(address: TransportAddress): string =
## Returns string representation of ``address``. ## Returns string representation of ``address``.
case address.address.family case address.address.family

View File

@ -95,10 +95,15 @@ when defined(windows):
transp.state.excl(WritePending) transp.state.excl(WritePending)
let err = transp.wovl.data.errCode let err = transp.wovl.data.errCode
if err == OSErrorCode(-1): if err == OSErrorCode(-1):
discard
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl(WritePaused)
transp.finishWriter() transp.finishWriter()
break
else: else:
transp.setWriteError(err) transp.setWriteError(err)
transp.finishWriter() transp.finishWriter()
else: else:
## Initiation ## Initiation
var saddr: Sockaddr_storage var saddr: Sockaddr_storage
@ -117,6 +122,7 @@ when defined(windows):
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
@ -152,6 +158,10 @@ when defined(windows):
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
discard transp.function(transp, addr transp.buffer[0], bytesCount, discard transp.function(transp, addr transp.buffer[0], bytesCount,
raddr, transp.udata) raddr, transp.udata)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl(ReadPaused)
break
else: else:
transp.setReadError(err) transp.setReadError(err)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
@ -174,6 +184,7 @@ when defined(windows):
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int(err) == WSAECONNRESET: elif int(err) == WSAECONNRESET:
@ -478,7 +489,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
## ``sock`` - application-driven socket to use. ## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket. ## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``. ## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer ## ``bufSize`` - size of internal buffer.
result = newDatagramTransportCommon(cbproc, remote, local, sock, result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, bufSize) flags, udata, bufSize)

View File

@ -58,7 +58,10 @@ type
StreamCallback* = proc(server: StreamServer, StreamCallback* = proc(server: StreamServer,
client: StreamTransport, client: StreamTransport,
udata: pointer): Future[void] {.gcsafe.} udata: pointer): Future[void] {.gcsafe.}
## New connection callback ## New remote client connection callback
## ``server`` - StreamServer object.
## ``client`` - accepted client transport.
## ``udata`` - user-defined pointer passed at ``createStreamServer()`` call.
StreamServer* = ref object of SocketServer StreamServer* = ref object of SocketServer
function*: StreamCallback function*: StreamCallback
@ -134,7 +137,7 @@ when defined(windows):
WindowsStreamServer* = ref object of RootRef WindowsStreamServer* = ref object of RootRef
server: SocketServer # Server object server: SocketServer # Server object
domain: Domain # Current server domain (IPv4 or IPv6) domain: Domain # Current server domain (IPv4 or IPv6)
abuffer: array[128, byte] # Windows AcceptEx buffer abuffer: array[128, byte] # Windows AcceptEx() buffer
aovl: CustomOverlapped # AcceptEx OVERLAPPED structure aovl: CustomOverlapped # AcceptEx OVERLAPPED structure
const SO_UPDATE_CONNECT_CONTEXT = 0x7010 const SO_UPDATE_CONNECT_CONTEXT = 0x7010
@ -172,8 +175,6 @@ when defined(windows):
proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} = proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
var bytesCount: int32 var bytesCount: int32
if isNil(udata):
return
var ovl = cast[PtrCustomOverlapped](udata) var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[WindowsStreamTransport](ovl.data.udata) var transp = cast[WindowsStreamTransport](ovl.data.udata)
@ -201,6 +202,9 @@ when defined(windows):
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
vector.writer.complete() vector.writer.complete()
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.finishWriter()
else: else:
transp.setWriteError(err) transp.setWriteError(err)
transp.finishWriter() transp.finishWriter()
@ -219,6 +223,7 @@ when defined(windows):
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
@ -246,6 +251,7 @@ when defined(windows):
if ret == 0: if ret == 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
@ -262,8 +268,6 @@ when defined(windows):
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} = proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
if isNil(udata):
return
var ovl = cast[PtrCustomOverlapped](udata) var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[WindowsStreamTransport](ovl.data.udata) var transp = cast[WindowsStreamTransport](ovl.data.udata)
@ -289,6 +293,9 @@ when defined(windows):
transp.roffset = transp.offset transp.roffset = transp.offset
if transp.offset == len(transp.buffer): if transp.offset == len(transp.buffer):
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
discard
else: else:
transp.setReadError(err) transp.setReadError(err)
if not isNil(transp.reader): if not isNil(transp.reader):
@ -313,6 +320,7 @@ when defined(windows):
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int32(err) != ERROR_IO_PENDING: elif int32(err) != ERROR_IO_PENDING:
@ -355,7 +363,7 @@ when defined(windows):
bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = bufferSize = DefaultStreamBufferSize): Future[StreamTransport] =
## Open new connection to remote peer with address ``address`` and create ## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection. ## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` - size of internal buffer for transport. ## ``bufferSize`` is size of internal buffer for transport.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var var
saddr: Sockaddr_storage saddr: Sockaddr_storage
@ -437,6 +445,7 @@ when defined(windows):
else: else:
retFuture.complete(sock) retFuture.complete(sock)
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
sock.closeAsyncSocket() sock.closeAsyncSocket()
retFuture.complete(asyncInvalidSocket) retFuture.complete(asyncInvalidSocket)
else: else:
@ -500,7 +509,6 @@ when defined(windows):
newStreamSocketTransport(sock, server.bufferSize), newStreamSocketTransport(sock, server.bufferSize),
server.udata) server.udata)
proc resumeRead(transp: StreamTransport) {.inline.} = proc resumeRead(transp: StreamTransport) {.inline.} =
var wtransp = cast[WindowsStreamTransport](transp) var wtransp = cast[WindowsStreamTransport](transp)
wtransp.state.excl(ReadPaused) wtransp.state.excl(ReadPaused)
@ -533,79 +541,77 @@ else:
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
var transp = cast[UnixStreamTransport](cdata.udata) var transp = cast[UnixStreamTransport](cdata.udata)
let fd = SocketHandle(cdata.fd) let fd = SocketHandle(cdata.fd)
if not isNil(transp): if len(transp.queue) > 0:
if len(transp.queue) > 0: var vector = transp.queue.popFirst()
var vector = transp.queue.popFirst() while true:
while true: if transp.kind == TransportKind.Socket:
if transp.kind == TransportKind.Socket: if vector.kind == VectorKind.DataBuffer:
if vector.kind == VectorKind.DataBuffer: let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) if res >= 0:
if res >= 0: if vector.buflen - res == 0:
if vector.buflen - res == 0: vector.writer.complete()
vector.writer.complete()
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else: else:
let err = osLastError() vector.shiftVectorBuffer(res)
if int(err) == EINTR: transp.queue.addFirst(vector)
continue
else:
transp.setWriteError(err)
vector.writer.complete()
else: else:
let res = sendfile(int(fd), cast[int](vector.buflen), let err = osLastError()
int(vector.offset), if int(err) == EINTR:
cast[int](vector.buf)) continue
if res >= 0:
if cast[int](vector.buf) - res == 0:
vector.writer.complete()
else:
vector.shiftVectorFile(res)
transp.queue.addFirst(vector)
else: else:
let err = osLastError() transp.setWriteError(err)
if int(err) == EINTR: vector.writer.complete()
continue else:
else: let res = sendfile(int(fd), cast[int](vector.buflen),
transp.setWriteError(err) int(vector.offset),
vector.writer.complete() cast[int](vector.buf))
break if res >= 0:
else: if cast[int](vector.buf) - res == 0:
transp.state.incl(WritePaused) vector.writer.complete()
transp.fd.removeWriter() else:
vector.shiftVectorFile(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.setWriteError(err)
vector.writer.complete()
break
else:
transp.state.incl(WritePaused)
transp.fd.removeWriter()
proc readStreamLoop(udata: pointer) {.gcsafe.} = proc readStreamLoop(udata: pointer) {.gcsafe.} =
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
var transp = cast[UnixStreamTransport](cdata.udata) var transp = cast[UnixStreamTransport](cdata.udata)
let fd = SocketHandle(cdata.fd) let fd = SocketHandle(cdata.fd)
if not isNil(transp): while true:
while true: var res = posix.recv(fd, addr transp.buffer[transp.offset],
var res = posix.recv(fd, addr transp.buffer[transp.offset], len(transp.buffer) - transp.offset, cint(0))
len(transp.buffer) - transp.offset, cint(0)) if res < 0:
if res < 0: let err = osLastError()
let err = osLastError() if int(err) == EINTR:
if int(err) == EINTR: continue
continue elif int(err) in {ECONNRESET}:
elif int(err) in {ECONNRESET}:
transp.state.incl(ReadEof)
transp.state.incl(ReadPaused)
cdata.fd.removeReader()
else:
transp.setReadError(err)
cdata.fd.removeReader()
elif res == 0:
transp.state.incl(ReadEof) transp.state.incl(ReadEof)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
cdata.fd.removeReader() cdata.fd.removeReader()
else: else:
transp.offset += res transp.setReadError(err)
if transp.offset == len(transp.buffer): cdata.fd.removeReader()
transp.state.incl(ReadPaused) elif res == 0:
cdata.fd.removeReader() transp.state.incl(ReadEof)
if not isNil(transp.reader): transp.state.incl(ReadPaused)
transp.finishReader() cdata.fd.removeReader()
break else:
transp.offset += res
if transp.offset == len(transp.buffer):
transp.state.incl(ReadPaused)
cdata.fd.removeReader()
if not isNil(transp.reader):
transp.finishReader()
break
proc newStreamSocketTransport(sock: AsyncFD, bufsize: int): StreamTransport = proc newStreamSocketTransport(sock: AsyncFD, bufsize: int): StreamTransport =
var transp = UnixStreamTransport(kind: TransportKind.Socket) var transp = UnixStreamTransport(kind: TransportKind.Socket)
@ -753,14 +759,17 @@ proc createStreamServer*(host: TransportAddress,
backlog: int = 100, backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize, bufferSize: int = DefaultStreamBufferSize,
udata: pointer = nil): StreamServer = udata: pointer = nil): StreamServer =
## Create new TCP server. ## Create new TCP stream server.
## ##
## ``host`` - address to which server will be bound. ## ``host`` - address to which server will be bound.
## ``flags`` - flags to apply to server socket. ## ``flags`` - flags to apply to server socket.
## ``cbproc`` - callback function which will be called, when new client ## ``cbproc`` - callback function which will be called, when new client
## connection will be established. ## connection will be established.
## ``sock`` - application-driven socket to use. ## ``sock`` - user-driven socket to use.
## ``backlog`` - number of ## ``backlog`` - number of outstanding connections in the socket's listen
## queue.
## ``bufferSize`` - size of internal buffer for transport.
## ``udata`` - user-defined pointer.
var var
saddr: Sockaddr_storage saddr: Sockaddr_storage
slen: SockLen slen: SockLen
@ -919,6 +928,14 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
## ##
## On success, the data and separator will be removed from the internal ## On success, the data and separator will be removed from the internal
## buffer (consumed). Returned data will NOT include the separator at the end. ## buffer (consumed). Returned data will NOT include the separator at the end.
##
## If EOF is received, and `sep` was not found, procedure will raise
## ``TransportIncompleteError``.
##
## If ``nbytes`` bytes has been received and `sep` was not found, procedure
## will raise ``TransportLimitError``.
##
## Procedure returns actual number of bytes read.
checkClosed(transp) checkClosed(transp)
checkPending(transp) checkPending(transp)
@ -1026,6 +1043,9 @@ proc readLine*(transp: StreamTransport, limit = 0,
transp.reader = nil transp.reader = nil
proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
## Read all bytes (n == -1) or `n` bytes from transport ``transp``.
##
## This procedure allocates buffer seq[byte] and return it as result.
checkClosed(transp) checkClosed(transp)
checkPending(transp) checkPending(transp)