From 5529fc46692e47a2411bce5d9d0c57979dfcfba1 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 29 May 2018 02:35:15 +0300 Subject: [PATCH] Updated README. Updated documentation. --- README.md | 78 ++++++++++++ asyncdispatch2/transports/common.nim | 6 +- asyncdispatch2/transports/datagram.nim | 15 ++- asyncdispatch2/transports/stream.nim | 164 ++++++++++++++----------- 4 files changed, 188 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index 61ea465f..23678a46 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,81 @@ [![Build Status](https://travis-ci.org/status-im/nim-asyncdispatch2.svg?branch=master)](https://travis-ci.org/status-im/nim-asyncdispatch2) [![Build status](https://ci.appveyor.com/api/projects/status/ihrxhooltyrmo0mc?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-asyncdispatch2) Asyncdispatch hard fork. + +## Core differences between asyncdispatch and asyncdispatch2. + +1. Unified callback type `CallbackFunc`. + Current version of asyncdispatch uses many types of callbacks. + + `proc ()` used in callSoon() callbacks and Future[T] completion callbacks. + `proc (fut: Future[T])` used in Future[T] completion callbacks. + `proc (fd: AsyncFD, bytesTransferred: Dword, errcode: OSErrorCode)` used in Windows IO completion callbacks. + `proc (fd: AsyncFD): bool` used in Unix IO events callbacks. + + Such a number of different types creates big problems in the storage, processing and interaction between callbacks. Lack of ability to pass custom user data to + callback also creates difficulties and inefficiency, to pass custom user-defined data you need to use closures (one more allocation). + + To resolve this issue introduced unified callback type `CallbackFunc`, which is + looks like `CallbackFunc* = proc (arg: pointer = nil) {.gcsafe.}`. Also one more type is introduced for callback storage is `AsyncCallback`. + + ``` + type + AsyncCallback* = object + function*: CallbackFunc + udata*: pointer + ``` + +2. Future[T] completion callbacks order. + Current version of asyncdispatch processing Future[T] completion callbacks in reverse order, asyncdispatch2 schedule callbacks in forward order. + - https://github.com/nim-lang/Nim/issues/7197 + +3. Changed behavior of OS decriptor events callbacks. + For some unknown reason current version of asyncdispatch uses seq[T] to hold list of descriptor event listeners. Actually in asynchronous environment there no need to have list of event listeners. + + So in asyncdispatch2 there only one place for one READ listener and one place for one WRITE listener. + +4. Removed default timeout value for poll() procedure, which allows incorrect + usage asyncdispatch and produces 500ms timeouts in correct usage. + +5. Changed behavior of scheduler in poll() procedure. + Fixed issues: + - https://github.com/nim-lang/Nim/issues/7758 + - https://github.com/nim-lang/Nim/issues/7197 + - https://github.com/nim-lang/Nim/issues/7193 + - https://github.com/nim-lang/Nim/issues/7192 + - https://github.com/nim-lang/Nim/issues/6846 + - https://github.com/nim-lang/Nim/issues/6929 + +5. Asyncdispatch2 no longer use `epochTime()`, it uses most fastest time primitives for specific OS `fastEpochTime()`. Also because MacOS supports only millisecond resolution in `kqueue` there no need on submillisecond resolution. + + https://github.com/nim-lang/Nim/issues/3909 + +6. Removed all IO primitives recv(), recvFrom(), connect(), accept(), send(), sendTo() from public API, and moved all it functionality into Transports. + +7. Introduced addTimer/removeTimer callback interface. + +8. Introduced removeReader() for addReader() and removeWriter() for addWriter(). + +9. Changed behavior of addReader()/addWriter()/addTimer() callbacks, now only explicit removal of callbacks must be supplied via (removeReader(), removeWriter(), removeTimer()) + +10. Support cross-platform `sendfile` operation. + +11. Removed expensive `AsyncEvent`, also removed support of hardware timers and ``addProcess``. (``addProcess`` will be implement as SubprocessTransport, while hardware based `AsyncEvent` will be renamed to ``ThreadAsyncEvent``). + +12. Added cheap synchronization primitives AsyncLock, AsyncEvent, AsyncQueue[T]. + +## Transport concept. + +Transports are high level interface for interaction with OS IO system. +The main task that the Transport concept is designed to solve is reduce number of syscalls and number of memory allocations to perform single IO operation. Current version of asyncdispatch uses at least 4 syscalls for every single IO operation: + +For Posix compliant systems current version of asyncdispatch performs such operations for every single IO operation: + +- register for read/write event in system queue +- wait for event in system queue +- perform IO operation +- unregister read/write event from system queue + +For Windows system current version of asyncdispatch performs allocations of OVERLAPPED structure for every single IO operation. + +In order to successfully cope with the task Transport also needs to incorporate some `asyncnet.nim` functionality (e.g. buffering) for stream transports. So asyncdispatch2 has buffering IO by default. diff --git a/asyncdispatch2/transports/common.nim b/asyncdispatch2/transports/common.nim index b5da3ef1..947320f5 100644 --- a/asyncdispatch2/transports/common.nim +++ b/asyncdispatch2/transports/common.nim @@ -47,7 +47,7 @@ type status*: ServerStatus # Current server status udata*: pointer # User-defined pointer flags*: set[ServerFlags] # Flags - bufferSize*: int # Buffer Size for transports + bufferSize*: int # Size of internal transports' buffer loopFuture*: Future[void] # Server's main Future TransportError* = object of Exception @@ -87,6 +87,10 @@ proc getDomain*(address: IpAddress): Domain = of IpAddressFamily.IPv6: result = Domain.AF_INET6 +proc getDomain*(address: TransportAddress): Domain = + ## Returns OS specific Domain from TransportAddress. + result = address.address.getDomain() + proc `$`*(address: TransportAddress): string = ## Returns string representation of ``address``. case address.address.family diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index edc78b1f..4535b225 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -95,10 +95,15 @@ when defined(windows): transp.state.excl(WritePending) let err = transp.wovl.data.errCode if err == OSErrorCode(-1): + discard + elif int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + transp.state.incl(WritePaused) transp.finishWriter() + break else: transp.setWriteError(err) - transp.finishWriter() + transp.finishWriter() else: ## Initiation var saddr: Sockaddr_storage @@ -117,6 +122,7 @@ when defined(windows): if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.incl(WritePaused) elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) @@ -152,6 +158,10 @@ when defined(windows): fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) discard transp.function(transp, addr transp.buffer[0], bytesCount, raddr, transp.udata) + elif int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + transp.state.incl(ReadPaused) + break else: transp.setReadError(err) transp.state.incl(ReadPaused) @@ -174,6 +184,7 @@ when defined(windows): if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.excl(ReadPending) transp.state.incl(ReadPaused) elif int(err) == WSAECONNRESET: @@ -478,7 +489,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, ## ``sock`` - application-driven socket to use. ## ``flags`` - flags that will be applied to socket. ## ``udata`` - custom argument which will be passed to ``cbproc``. - ## ``bufSize`` - size of internal buffer + ## ``bufSize`` - size of internal buffer. result = newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, bufSize) diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 412feeaa..de0030d9 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -58,7 +58,10 @@ type StreamCallback* = proc(server: StreamServer, client: StreamTransport, udata: pointer): Future[void] {.gcsafe.} - ## New connection callback + ## New remote client connection callback + ## ``server`` - StreamServer object. + ## ``client`` - accepted client transport. + ## ``udata`` - user-defined pointer passed at ``createStreamServer()`` call. StreamServer* = ref object of SocketServer function*: StreamCallback @@ -134,7 +137,7 @@ when defined(windows): WindowsStreamServer* = ref object of RootRef server: SocketServer # Server object domain: Domain # Current server domain (IPv4 or IPv6) - abuffer: array[128, byte] # Windows AcceptEx buffer + abuffer: array[128, byte] # Windows AcceptEx() buffer aovl: CustomOverlapped # AcceptEx OVERLAPPED structure const SO_UPDATE_CONNECT_CONTEXT = 0x7010 @@ -172,8 +175,6 @@ when defined(windows): proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} = var bytesCount: int32 - if isNil(udata): - return var ovl = cast[PtrCustomOverlapped](udata) var transp = cast[WindowsStreamTransport](ovl.data.udata) @@ -201,6 +202,9 @@ when defined(windows): transp.queue.addFirst(vector) else: vector.writer.complete() + elif int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + transp.finishWriter() else: transp.setWriteError(err) transp.finishWriter() @@ -219,6 +223,7 @@ when defined(windows): if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.excl(WritePending) transp.state.incl(WritePaused) elif int(err) == ERROR_IO_PENDING: @@ -246,6 +251,7 @@ when defined(windows): if ret == 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.excl(WritePending) transp.state.incl(WritePaused) elif int(err) == ERROR_IO_PENDING: @@ -262,8 +268,6 @@ when defined(windows): transp.state.incl(WritePaused) proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} = - if isNil(udata): - return var ovl = cast[PtrCustomOverlapped](udata) var transp = cast[WindowsStreamTransport](ovl.data.udata) @@ -289,6 +293,9 @@ when defined(windows): transp.roffset = transp.offset if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) + elif int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + discard else: transp.setReadError(err) if not isNil(transp.reader): @@ -313,6 +320,7 @@ when defined(windows): if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.excl(ReadPending) transp.state.incl(ReadPaused) elif int32(err) != ERROR_IO_PENDING: @@ -355,7 +363,7 @@ when defined(windows): bufferSize = DefaultStreamBufferSize): Future[StreamTransport] = ## Open new connection to remote peer with address ``address`` and create ## new transport object ``StreamTransport`` for established connection. - ## ``bufferSize`` - size of internal buffer for transport. + ## ``bufferSize`` is size of internal buffer for transport. let loop = getGlobalDispatcher() var saddr: Sockaddr_storage @@ -437,6 +445,7 @@ when defined(windows): else: retFuture.complete(sock) elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt sock.closeAsyncSocket() retFuture.complete(asyncInvalidSocket) else: @@ -500,7 +509,6 @@ when defined(windows): newStreamSocketTransport(sock, server.bufferSize), server.udata) - proc resumeRead(transp: StreamTransport) {.inline.} = var wtransp = cast[WindowsStreamTransport](transp) wtransp.state.excl(ReadPaused) @@ -533,79 +541,77 @@ else: var cdata = cast[ptr CompletionData](udata) var transp = cast[UnixStreamTransport](cdata.udata) let fd = SocketHandle(cdata.fd) - if not isNil(transp): - if len(transp.queue) > 0: - var vector = transp.queue.popFirst() - while true: - if transp.kind == TransportKind.Socket: - if vector.kind == VectorKind.DataBuffer: - let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) - if res >= 0: - if vector.buflen - res == 0: - vector.writer.complete() - else: - vector.shiftVectorBuffer(res) - transp.queue.addFirst(vector) + if len(transp.queue) > 0: + var vector = transp.queue.popFirst() + while true: + if transp.kind == TransportKind.Socket: + if vector.kind == VectorKind.DataBuffer: + let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) + if res >= 0: + if vector.buflen - res == 0: + vector.writer.complete() else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.setWriteError(err) - vector.writer.complete() + vector.shiftVectorBuffer(res) + transp.queue.addFirst(vector) else: - let res = sendfile(int(fd), cast[int](vector.buflen), - int(vector.offset), - cast[int](vector.buf)) - if res >= 0: - if cast[int](vector.buf) - res == 0: - vector.writer.complete() - else: - vector.shiftVectorFile(res) - transp.queue.addFirst(vector) + let err = osLastError() + if int(err) == EINTR: + continue else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.setWriteError(err) - vector.writer.complete() - break - else: - transp.state.incl(WritePaused) - transp.fd.removeWriter() + transp.setWriteError(err) + vector.writer.complete() + else: + let res = sendfile(int(fd), cast[int](vector.buflen), + int(vector.offset), + cast[int](vector.buf)) + if res >= 0: + if cast[int](vector.buf) - res == 0: + vector.writer.complete() + else: + vector.shiftVectorFile(res) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.setWriteError(err) + vector.writer.complete() + break + else: + transp.state.incl(WritePaused) + transp.fd.removeWriter() proc readStreamLoop(udata: pointer) {.gcsafe.} = var cdata = cast[ptr CompletionData](udata) var transp = cast[UnixStreamTransport](cdata.udata) let fd = SocketHandle(cdata.fd) - if not isNil(transp): - while true: - var res = posix.recv(fd, addr transp.buffer[transp.offset], - len(transp.buffer) - transp.offset, cint(0)) - if res < 0: - let err = osLastError() - if int(err) == EINTR: - continue - elif int(err) in {ECONNRESET}: - transp.state.incl(ReadEof) - transp.state.incl(ReadPaused) - cdata.fd.removeReader() - else: - transp.setReadError(err) - cdata.fd.removeReader() - elif res == 0: + while true: + var res = posix.recv(fd, addr transp.buffer[transp.offset], + len(transp.buffer) - transp.offset, cint(0)) + if res < 0: + let err = osLastError() + if int(err) == EINTR: + continue + elif int(err) in {ECONNRESET}: transp.state.incl(ReadEof) transp.state.incl(ReadPaused) cdata.fd.removeReader() else: - transp.offset += res - if transp.offset == len(transp.buffer): - transp.state.incl(ReadPaused) - cdata.fd.removeReader() - if not isNil(transp.reader): - transp.finishReader() - break + transp.setReadError(err) + cdata.fd.removeReader() + elif res == 0: + transp.state.incl(ReadEof) + transp.state.incl(ReadPaused) + cdata.fd.removeReader() + else: + transp.offset += res + if transp.offset == len(transp.buffer): + transp.state.incl(ReadPaused) + cdata.fd.removeReader() + if not isNil(transp.reader): + transp.finishReader() + break proc newStreamSocketTransport(sock: AsyncFD, bufsize: int): StreamTransport = var transp = UnixStreamTransport(kind: TransportKind.Socket) @@ -753,14 +759,17 @@ proc createStreamServer*(host: TransportAddress, backlog: int = 100, bufferSize: int = DefaultStreamBufferSize, udata: pointer = nil): StreamServer = - ## Create new TCP server. + ## Create new TCP stream server. ## ## ``host`` - address to which server will be bound. ## ``flags`` - flags to apply to server socket. ## ``cbproc`` - callback function which will be called, when new client ## connection will be established. - ## ``sock`` - application-driven socket to use. - ## ``backlog`` - number of + ## ``sock`` - user-driven socket to use. + ## ``backlog`` - number of outstanding connections in the socket's listen + ## queue. + ## ``bufferSize`` - size of internal buffer for transport. + ## ``udata`` - user-defined pointer. var saddr: Sockaddr_storage slen: SockLen @@ -919,6 +928,14 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, ## ## On success, the data and separator will be removed from the internal ## buffer (consumed). Returned data will NOT include the separator at the end. + ## + ## If EOF is received, and `sep` was not found, procedure will raise + ## ``TransportIncompleteError``. + ## + ## If ``nbytes`` bytes has been received and `sep` was not found, procedure + ## will raise ``TransportLimitError``. + ## + ## Procedure returns actual number of bytes read. checkClosed(transp) checkPending(transp) @@ -1026,6 +1043,9 @@ proc readLine*(transp: StreamTransport, limit = 0, transp.reader = nil proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = + ## Read all bytes (n == -1) or `n` bytes from transport ``transp``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. checkClosed(transp) checkPending(transp)