From 519ca463dfd060dfec64a221b8e39e06a70860d3 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Thu, 20 Jan 2022 18:38:41 +0200 Subject: [PATCH] Fix transport.write() unable to send data through OS pipes. (#256) * Fix transport.write() unable to send data through pipe. Add test for pipes. * Fix flaky test. * Add workaround for Nim's issue #19425. --- chronos/handles.nim | 16 +++++++++------- chronos/transports/stream.nim | 19 ++++++++++++++----- tests/testasyncstream.nim | 6 +++--- tests/teststream.nim | 29 +++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/chronos/handles.nim b/chronos/handles.nim index a929f76..df58358 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -10,6 +10,7 @@ {.push raises: [Defect].} import std/[net, nativesockets] +import stew/base10 import ./asyncloop when defined(windows) or defined(nimdoc): @@ -24,7 +25,7 @@ when defined(windows) or defined(nimdoc): DEFAULT_PIPE_SIZE = 65536'i32 ERROR_PIPE_CONNECTED = 535 ERROR_PIPE_BUSY = 231 - pipeHeaderName = r"\\.\pipe\chronos\" + pipeHeaderName = r"\\.\pipe\LOCAL\chronos\" proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL {.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".} @@ -152,18 +153,19 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] = ## on error. when defined(windows): var pipeIn, pipeOut: Handle - var pipeName: WideCString + var pipeName: string var uniq = 0'u64 var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint, lpSecurityDescriptor: nil, bInheritHandle: 0) while true: QueryPerformanceCounter(uniq) - pipeName = newWideCString(pipeHeaderName & $uniq) + pipeName = pipeHeaderName & Base10.toString(uniq) + var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or PIPE_ACCESS_INBOUND var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT - pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32, - DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE, + pipeIn = createNamedPipe(newWideCString(pipeName), openMode, pipeMode, + 1'i32, DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE, 0'i32, addr sa) if pipeIn == INVALID_HANDLE_VALUE: let err = osLastError() @@ -176,8 +178,8 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] = break var openMode = (GENERIC_WRITE or FILE_WRITE_DATA or SYNCHRONIZE) - pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, 0) + pipeOut = createFileW(newWideCString(pipeName), openMode, 0, addr(sa), + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0) if pipeOut == INVALID_HANDLE_VALUE: discard closeHandle(pipeIn) return (read: asyncInvalidPipe, write: asyncInvalidPipe) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index bde4f48..a621cdb 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -2048,14 +2048,23 @@ proc getUserData*[T](server: StreamServer): T {.inline.} = ## Obtain user data stored in ``server`` object. result = cast[T](server.udata) -template fastWrite(fd: auto, pbytes: var ptr byte, rbytes: var int, nbytes: int) = +template fastWrite(transp: auto, pbytes: var ptr byte, rbytes: var int, + nbytes: int) = # On windows, the write could be initiated here if there is no other write # ongoing, but the queue is still needed due to the mechanics of iocp when not defined(windows) and not defined(nimdoc): if transp.queue.len == 0: while rbytes > 0: - let res = posix.send(SocketHandle(fd), pbytes, rbytes, MSG_NOSIGNAL) + let res = + case transp.kind + of TransportKind.Socket: + posix.send(SocketHandle(transp.fd), pbytes, rbytes, + MSG_NOSIGNAL) + of TransportKind.Pipe: + posix.write(cint(transp.fd), pbytes, rbytes) + else: + raiseAssert "Unsupported transport kind: " & $transp.kind if res > 0: pbytes = cast[ptr byte](cast[uint](pbytes) + cast[uint](res)) rbytes -= res @@ -2094,7 +2103,7 @@ proc write*(transp: StreamTransport, pbytes: pointer, pbytes = cast[ptr byte](pbytes) rbytes = nbytes # Remaining bytes - fastWrite(transp.fd, pbytes, rbytes, nbytes) + fastWrite(transp, pbytes, rbytes, nbytes) var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: rbytes, size: nbytes) @@ -2115,7 +2124,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = pbytes = cast[ptr byte](unsafeAddr msg[0]) rbytes = nbytes - fastWrite(transp.fd, pbytes, rbytes, nbytes) + fastWrite(transp, pbytes, rbytes, nbytes) let written = nbytes - rbytes # In case fastWrite wrote some @@ -2146,7 +2155,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] = pbytes = cast[ptr byte](unsafeAddr msg[0]) rbytes = nbytes - fastWrite(transp.fd, pbytes, rbytes, nbytes) + fastWrite(transp, pbytes, rbytes, nbytes) let written = nbytes - rbytes # In case fastWrite wrote some diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 61214b6..9dab202 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -943,7 +943,7 @@ suite "BoundedStream test suite": let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay} var server = createStreamServer(address, processClient, flags = flags) server.start() - var conn = await connect(address) + var conn = await connect(server.localAddress()) var rstream = newAsyncStreamReader(conn) case btest of BoundaryRead: @@ -1057,7 +1057,7 @@ suite "BoundedStream test suite": let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay} var server = createStreamServer(address, processClient, flags = flags) server.start() - var conn = await connect(address) + var conn = await connect(server.localAddress()) var rstream = newAsyncStreamReader(conn) var rbstream = newBoundedStreamReader(rstream, uint64(size), comparison = cmp) @@ -1108,7 +1108,7 @@ suite "BoundedStream test suite": await server.join() return (res and clientRes) - let address = initTAddress("127.0.0.1:48030") + let address = initTAddress("127.0.0.1:0") let suffix = case itemComp of BoundCmp.Equal: diff --git a/tests/teststream.nim b/tests/teststream.nim index 2017330..42ac7b1 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -1233,6 +1233,33 @@ suite "Stream Transport test suite": except AsyncTimeoutError: return false + proc testPipe(): Future[bool] {.async.} = + let (rfd, wfd) = createAsyncPipe() + + let + message = createBigMessage(16384 * 1024) + rtransp = fromPipe(rfd) + wtransp = fromPipe(wfd) + var + buffer = newSeq[byte](16384 * 1024) + + proc writer(transp: StreamTransport): Future[int] {.async.} = + let res = + try: + await transp.write(message) + except CatchableError: + -1 + return res + + var fut = wtransp.writer() + try: + await rtransp.readExactly(addr buffer[0], 16384 * 1024) + except CatchableError: + discard + + await allFutures(rtransp.closeWait(), wtransp.closeWait()) + return buffer == message + markFD = getCurrentFD() for i in 0..