diff --git a/chronos.nimble b/chronos.nimble index af25a26..02840cd 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.2.7" +version = "2.2.8" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 5e64762..3c32441 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -170,7 +170,7 @@ when defined(windows): else: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, - MSG_NOSIGNAL + MSG_NOSIGNAL, SIGPIPE type AsyncError* = object of CatchableError @@ -624,6 +624,15 @@ else: var acb = AsyncCallback(function: continuation) loop.callbacks.addLast(acb) + proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) {.inline.} = + ## Close asynchronous file/pipe handle. + ## + ## Please note, that socket is not closed immediately. To avoid bugs with + ## closing socket, while operation pending, socket will be closed as + ## soon as all pending operations will be notified. + ## You can execute ``aftercb`` before actual socket close operation. + closeSocket(fd, aftercb) + when ioselSupportedPlatform: proc addSignal*(signal: int, cb: CallbackFunc, udata: pointer = nil): int = @@ -690,7 +699,12 @@ else: # poll() call. loop.processCallbacks() + const + SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1) + proc initAPI() = + # We are ignoring SIGPIPE signal, because we are working with EPIPE. + posix.signal(cint(SIGPIPE), SIG_IGN) discard getGlobalDispatcher() proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = diff --git a/chronos/handles.nim b/chronos/handles.nim index d75e3ef..19716f2 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -7,7 +7,7 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import net, nativesockets, asyncloop +import net, nativesockets, os, asyncloop when defined(windows): import winlean @@ -15,6 +15,16 @@ when defined(windows): asyncInvalidSocket* = AsyncFD(-1) TCP_NODELAY* = 1 IPPROTO_TCP* = 6 + PIPE_TYPE_BYTE = 0x00000000'i32 + PIPE_READMODE_BYTE = 0x00000000'i32 + PIPE_WAIT = 0x00000000'i32 + DEFAULT_PIPE_SIZE = 65536'i32 + ERROR_PIPE_CONNECTED = 535 + ERROR_PIPE_BUSY = 231 + pipeHeaderName = r"\\.\pipe\chronos\" + + proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL + {.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".} else: import posix const @@ -22,6 +32,9 @@ else: TCP_NODELAY* = 1 IPPROTO_TCP* = 6 +const + asyncInvalidPipe* = asyncInvalidSocket + proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool = ## Sets blocking mode on socket. when defined(windows): @@ -103,3 +116,74 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD = return asyncInvalidSocket result = AsyncFD(sock) register(result) + +proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] = + ## Create new asynchronouse pipe. + ## Returns tuple of read pipe handle and write pipe handle``asyncInvalidPipe`` on error. + when defined(windows): + var pipeIn, pipeOut: Handle + var pipeName: WideCString + var uniq = 0'u64 + var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint, + lpSecurityDescriptor: nil, bInheritHandle: 0) + while true: + QueryPerformanceCounter(uniq) + pipeName = newWideCString(pipeHeaderName & $uniq) + var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or + PIPE_ACCESS_INBOUND + var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT + pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32, + DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE, + 0'i32, addr sa) + if pipeIn == INVALID_HANDLE_VALUE: + let err = osLastError() + # If error in {ERROR_ACCESS_DENIED, ERROR_PIPE_BUSY}, then named pipe + # with such name already exists. + if int32(err) != ERROR_ACCESS_DENIED and int32(err) != ERROR_PIPE_BUSY: + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + continue + else: + break + + var openMode = (GENERIC_WRITE or FILE_WRITE_DATA or SYNCHRONIZE) + pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, 0) + if pipeOut == INVALID_HANDLE_VALUE: + discard closeHandle(pipeIn) + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + + var ovl = OVERLAPPED() + let res = connectNamedPipe(pipeIn, cast[pointer](addr ovl)) + if res == 0: + let err = osLastError() + if int32(err) == ERROR_PIPE_CONNECTED: + discard + elif int32(err) == ERROR_IO_PENDING: + var bytesRead = 0.Dword + if getOverlappedResult(pipeIn, addr ovl, bytesRead, 1) == 0: + discard closeHandle(pipeIn) + discard closeHandle(pipeOut) + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + else: + discard closeHandle(pipeIn) + discard closeHandle(pipeOut) + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + + result = (read: AsyncFD(pipeIn), write: AsyncFD(pipeOut)) + else: + var fds: array[2, cint] + + if posix.pipe(fds) == -1: + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + + if not(setSocketBlocking(SocketHandle(fds[0]), false)) or + not(setSocketBlocking(SocketHandle(fds[1]), false)): + result = (read: asyncInvalidPipe, write: asyncInvalidPipe) + return + + result = (read: AsyncFD(fds[0]), write: AsyncFD(fds[1])) diff --git a/chronos/timer.nim b/chronos/timer.nim index 72208a5..850e435 100644 --- a/chronos/timer.nim +++ b/chronos/timer.nim @@ -46,7 +46,7 @@ when defined(windows): cast[uint64](t.dwLowDateTime)) * 100 else: - proc QueryPerformanceCounter(res: var uint64) {. + proc QueryPerformanceCounter*(res: var uint64) {. importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".} proc QueryPerformanceFrequency(res: var uint64) {. importc: "QueryPerformanceFrequency", stdcall, dynlib: "kernel32".} diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 6915504..489a27f 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -699,23 +699,22 @@ when defined(windows): proc cancel(udata: pointer) {.gcsafe.} = sock.closeSocket() - retFuture.cancelCallback = cancel - povl = RefCustomOverlapped() GC_ref(povl) povl.data = CompletionData(fd: sock, cb: socketContinuation) - if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: - var res = loop.connectEx(SocketHandle(sock), - cast[ptr SockAddr](addr saddr), - DWORD(slen), nil, 0, nil, - cast[POVERLAPPED](povl)) - # We will not process immediate completion, to avoid undefined behavior. - if not res: - let err = osLastError() - if int32(err) != ERROR_IO_PENDING: - GC_unref(povl) - sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + var res = loop.connectEx(SocketHandle(sock), + cast[ptr SockAddr](addr saddr), + DWORD(slen), nil, 0, nil, + cast[POVERLAPPED](povl)) + # We will not process immediate completion, to avoid undefined behavior. + if not res: + let err = osLastError() + if int32(err) != ERROR_IO_PENDING: + GC_unref(povl) + sock.closeSocket() + retFuture.fail(getTransportOsError(err)) + + retFuture.cancelCallback = cancel elif address.family == AddressFamily.Unix: ## Unix domain socket emulation with Windows Named Pipes. @@ -1016,7 +1015,65 @@ else: else: if not(vector.writer.finished()): vector.writer.fail(getTransportOsError(err)) - break + break + + elif transp.kind == TransportKind.Pipe: + if vector.kind == VectorKind.DataBuffer: + let res = posix.write(cint(fd), vector.buf, vector.buflen) + if res >= 0: + if vector.buflen - res == 0: + if not(vector.writer.finished()): + vector.writer.complete(vector.buflen) + else: + vector.shiftVectorBuffer(res) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + transp.fd.removeWriter() + else: + if not(vector.writer.finished()): + vector.writer.fail(getTransportOsError(err)) + else: + var nbytes = cast[int](vector.buf) + let res = sendfile(int(fd), cast[int](vector.buflen), + int(vector.offset), + nbytes) + if res >= 0: + if cast[int](vector.buf) - nbytes == 0: + vector.size += nbytes + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.size += nbytes + vector.shiftVectorFile(nbytes) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + transp.fd.removeWriter() + else: + if not(vector.writer.finished()): + vector.writer.fail(getTransportOsError(err)) + break else: transp.state.incl(WritePaused) transp.fd.removeWriter() @@ -1036,32 +1093,57 @@ else: transp.reader.complete() transp.reader = nil else: - while true: - var res = posix.recv(fd, addr transp.buffer[transp.offset], - len(transp.buffer) - transp.offset, cint(0)) - if res < 0: - let err = osLastError() - if int(err) == EINTR: - continue - elif int(err) in {ECONNRESET}: + if transp.kind == TransportKind.Socket: + while true: + var res = posix.recv(fd, addr transp.buffer[transp.offset], + len(transp.buffer) - transp.offset, cint(0)) + if res < 0: + let err = osLastError() + if int(err) == EINTR: + continue + elif int(err) in {ECONNRESET}: + transp.state.incl({ReadEof, ReadPaused}) + cdata.fd.removeReader() + else: + transp.state.incl(ReadPaused) + transp.setReadError(err) + cdata.fd.removeReader() + elif res == 0: transp.state.incl({ReadEof, ReadPaused}) cdata.fd.removeReader() else: - transp.state.incl(ReadPaused) - transp.setReadError(err) + transp.offset += res + if transp.offset == len(transp.buffer): + transp.state.incl(ReadPaused) + cdata.fd.removeReader() + if not(isNil(transp.reader)) and not(transp.reader.finished()): + transp.reader.complete() + transp.reader = nil + break + elif transp.kind == TransportKind.Pipe: + while true: + var res = posix.read(cint(fd), addr transp.buffer[transp.offset], + len(transp.buffer) - transp.offset) + if res < 0: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.state.incl(ReadPaused) + transp.setReadError(err) + cdata.fd.removeReader() + elif res == 0: + transp.state.incl({ReadEof, ReadPaused}) cdata.fd.removeReader() - elif res == 0: - transp.state.incl({ReadEof, ReadPaused}) - cdata.fd.removeReader() - else: - transp.offset += res - if transp.offset == len(transp.buffer): - transp.state.incl(ReadPaused) - cdata.fd.removeReader() - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil - break + else: + transp.offset += res + if transp.offset == len(transp.buffer): + transp.state.incl(ReadPaused) + cdata.fd.removeReader() + if not(isNil(transp.reader)) and not(transp.reader.finished()): + transp.reader.complete() + transp.reader = nil + break proc newStreamSocketTransport(sock: AsyncFD, bufsize: int, child: StreamTransport): StreamTransport = @@ -1079,6 +1161,22 @@ else: GC_ref(transp) result = transp + proc newStreamPipeTransport(fd: AsyncFD, bufsize: int, + child: StreamTransport): StreamTransport = + var transp: StreamTransport + if not isNil(child): + transp = child + else: + transp = StreamTransport(kind: TransportKind.Pipe) + + transp.fd = fd + transp.buffer = newSeq[byte](bufsize) + transp.state = {ReadPaused, WritePaused} + transp.queue = initDeque[StreamVector]() + transp.future = newFuture[void]("pipe.stream.transport") + GC_ref(transp) + result = transp + proc connect*(address: TransportAddress, bufferSize = DefaultStreamBufferSize, child: StreamTransport = nil): Future[StreamTransport] = @@ -1827,7 +1925,10 @@ proc close*(transp: StreamTransport) = # of readStreamLoop(). closeSocket(transp.fd) else: - closeSocket(transp.fd, continuation) + if transp.kind == TransportKind.Pipe: + closeHandle(transp.fd, continuation) + elif transp.kind == TransportKind.Socket: + closeSocket(transp.fd, continuation) proc closeWait*(transp: StreamTransport): Future[void] = ## Close and frees resources of transport ``transp``. @@ -1837,3 +1938,13 @@ proc closeWait*(transp: StreamTransport): Future[void] = proc closed*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if transport in closed state. result = ({ReadClosed, WriteClosed} * transp.state != {}) + +proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil, + bufferSize = DefaultStreamBufferSize): StreamTransport = + ## Create new transport object using pipe's file descriptor. + ## + ## ``bufferSize`` is size of internal buffer for transport. + register(fd) + result = newStreamPipeTransport(fd, bufferSize, child) + # Start tracking transport + trackStream(result)