Fix transport.write() unable to send data through OS pipes. (#256)

* Fix transport.write() unable to send data through pipe.
Add test for pipes.

* Fix flaky test.

* Add workaround for Nim's issue #19425.
This commit is contained in:
Eugene Kabanov 2022-01-20 18:38:41 +02:00 committed by GitHub
parent 17fed89c99
commit 519ca463df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 15 deletions

View File

@ -10,6 +10,7 @@
{.push raises: [Defect].}
import std/[net, nativesockets]
import stew/base10
import ./asyncloop
when defined(windows) or defined(nimdoc):
@ -24,7 +25,7 @@ when defined(windows) or defined(nimdoc):
DEFAULT_PIPE_SIZE = 65536'i32
ERROR_PIPE_CONNECTED = 535
ERROR_PIPE_BUSY = 231
pipeHeaderName = r"\\.\pipe\chronos\"
pipeHeaderName = r"\\.\pipe\LOCAL\chronos\"
proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL
{.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".}
@ -152,18 +153,19 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
## on error.
when defined(windows):
var pipeIn, pipeOut: Handle
var pipeName: WideCString
var pipeName: string
var uniq = 0'u64
var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint,
lpSecurityDescriptor: nil, bInheritHandle: 0)
while true:
QueryPerformanceCounter(uniq)
pipeName = newWideCString(pipeHeaderName & $uniq)
pipeName = pipeHeaderName & Base10.toString(uniq)
var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or
PIPE_ACCESS_INBOUND
var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT
pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32,
DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE,
pipeIn = createNamedPipe(newWideCString(pipeName), openMode, pipeMode,
1'i32, DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE,
0'i32, addr sa)
if pipeIn == INVALID_HANDLE_VALUE:
let err = osLastError()
@ -176,8 +178,8 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
break
var openMode = (GENERIC_WRITE or FILE_WRITE_DATA or SYNCHRONIZE)
pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, 0)
pipeOut = createFileW(newWideCString(pipeName), openMode, 0, addr(sa),
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0)
if pipeOut == INVALID_HANDLE_VALUE:
discard closeHandle(pipeIn)
return (read: asyncInvalidPipe, write: asyncInvalidPipe)

View File

@ -2048,14 +2048,23 @@ proc getUserData*[T](server: StreamServer): T {.inline.} =
## Obtain user data stored in ``server`` object.
result = cast[T](server.udata)
template fastWrite(fd: auto, pbytes: var ptr byte, rbytes: var int, nbytes: int) =
template fastWrite(transp: auto, pbytes: var ptr byte, rbytes: var int,
nbytes: int) =
# On windows, the write could be initiated here if there is no other write
# ongoing, but the queue is still needed due to the mechanics of iocp
when not defined(windows) and not defined(nimdoc):
if transp.queue.len == 0:
while rbytes > 0:
let res = posix.send(SocketHandle(fd), pbytes, rbytes, MSG_NOSIGNAL)
let res =
case transp.kind
of TransportKind.Socket:
posix.send(SocketHandle(transp.fd), pbytes, rbytes,
MSG_NOSIGNAL)
of TransportKind.Pipe:
posix.write(cint(transp.fd), pbytes, rbytes)
else:
raiseAssert "Unsupported transport kind: " & $transp.kind
if res > 0:
pbytes = cast[ptr byte](cast[uint](pbytes) + cast[uint](res))
rbytes -= res
@ -2094,7 +2103,7 @@ proc write*(transp: StreamTransport, pbytes: pointer,
pbytes = cast[ptr byte](pbytes)
rbytes = nbytes # Remaining bytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
fastWrite(transp, pbytes, rbytes, nbytes)
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: rbytes, size: nbytes)
@ -2115,7 +2124,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
pbytes = cast[ptr byte](unsafeAddr msg[0])
rbytes = nbytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
fastWrite(transp, pbytes, rbytes, nbytes)
let
written = nbytes - rbytes # In case fastWrite wrote some
@ -2146,7 +2155,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
pbytes = cast[ptr byte](unsafeAddr msg[0])
rbytes = nbytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
fastWrite(transp, pbytes, rbytes, nbytes)
let
written = nbytes - rbytes # In case fastWrite wrote some

View File

@ -943,7 +943,7 @@ suite "BoundedStream test suite":
let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
var server = createStreamServer(address, processClient, flags = flags)
server.start()
var conn = await connect(address)
var conn = await connect(server.localAddress())
var rstream = newAsyncStreamReader(conn)
case btest
of BoundaryRead:
@ -1057,7 +1057,7 @@ suite "BoundedStream test suite":
let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
var server = createStreamServer(address, processClient, flags = flags)
server.start()
var conn = await connect(address)
var conn = await connect(server.localAddress())
var rstream = newAsyncStreamReader(conn)
var rbstream = newBoundedStreamReader(rstream, uint64(size),
comparison = cmp)
@ -1108,7 +1108,7 @@ suite "BoundedStream test suite":
await server.join()
return (res and clientRes)
let address = initTAddress("127.0.0.1:48030")
let address = initTAddress("127.0.0.1:0")
let suffix =
case itemComp
of BoundCmp.Equal:

View File

@ -1233,6 +1233,33 @@ suite "Stream Transport test suite":
except AsyncTimeoutError:
return false
proc testPipe(): Future[bool] {.async.} =
let (rfd, wfd) = createAsyncPipe()
let
message = createBigMessage(16384 * 1024)
rtransp = fromPipe(rfd)
wtransp = fromPipe(wfd)
var
buffer = newSeq[byte](16384 * 1024)
proc writer(transp: StreamTransport): Future[int] {.async.} =
let res =
try:
await transp.write(message)
except CatchableError:
-1
return res
var fut = wtransp.writer()
try:
await rtransp.readExactly(addr buffer[0], 16384 * 1024)
except CatchableError:
discard
await allFutures(rtransp.closeWait(), wtransp.closeWait())
return buffer == message
markFD = getCurrentFD()
for i in 0..<len(addresses):
@ -1318,6 +1345,8 @@ suite "Stream Transport test suite":
check waitFor(testWriteOnClose(addresses[i])) == true
test prefixes[i] & "read() notification on close() test":
check waitFor(testReadOnClose(addresses[i])) == true
test "[PIPE] readExactly()/write() test":
check waitFor(testPipe()) == true
test "Servers leak test":
check getTracker("stream.server").isLeaked() == false
test "Transports leak test":