Add pipe support for StreamTransport.

Bump version to 2.2.8.
This commit is contained in:
cheatfate 2019-07-15 12:59:42 +03:00
parent ec7f2a14a8
commit 03eb8a0157
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
5 changed files with 251 additions and 42 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos" packageName = "chronos"
version = "2.2.7" version = "2.2.8"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Chronos" description = "Chronos"
license = "Apache License 2.0 or MIT" license = "Apache License 2.0 or MIT"

View File

@ -170,7 +170,7 @@ when defined(windows):
else: else:
import selectors import selectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL MSG_NOSIGNAL, SIGPIPE
type type
AsyncError* = object of CatchableError AsyncError* = object of CatchableError
@ -624,6 +624,15 @@ else:
var acb = AsyncCallback(function: continuation) var acb = AsyncCallback(function: continuation)
loop.callbacks.addLast(acb) loop.callbacks.addLast(acb)
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) {.inline.} =
## Close asynchronous file/pipe handle.
##
## Please note, that socket is not closed immediately. To avoid bugs with
## closing socket, while operation pending, socket will be closed as
## soon as all pending operations will be notified.
## You can execute ``aftercb`` before actual socket close operation.
closeSocket(fd, aftercb)
when ioselSupportedPlatform: when ioselSupportedPlatform:
proc addSignal*(signal: int, cb: CallbackFunc, proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int = udata: pointer = nil): int =
@ -690,7 +699,12 @@ else:
# poll() call. # poll() call.
loop.processCallbacks() loop.processCallbacks()
const
SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)
proc initAPI() = proc initAPI() =
# We are ignoring SIGPIPE signal, because we are working with EPIPE.
posix.signal(cint(SIGPIPE), SIG_IGN)
discard getGlobalDispatcher() discard getGlobalDispatcher()
proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) = proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =

View File

@ -7,7 +7,7 @@
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import net, nativesockets, asyncloop import net, nativesockets, os, asyncloop
when defined(windows): when defined(windows):
import winlean import winlean
@ -15,6 +15,16 @@ when defined(windows):
asyncInvalidSocket* = AsyncFD(-1) asyncInvalidSocket* = AsyncFD(-1)
TCP_NODELAY* = 1 TCP_NODELAY* = 1
IPPROTO_TCP* = 6 IPPROTO_TCP* = 6
PIPE_TYPE_BYTE = 0x00000000'i32
PIPE_READMODE_BYTE = 0x00000000'i32
PIPE_WAIT = 0x00000000'i32
DEFAULT_PIPE_SIZE = 65536'i32
ERROR_PIPE_CONNECTED = 535
ERROR_PIPE_BUSY = 231
pipeHeaderName = r"\\.\pipe\chronos\"
proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL
{.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".}
else: else:
import posix import posix
const const
@ -22,6 +32,9 @@ else:
TCP_NODELAY* = 1 TCP_NODELAY* = 1
IPPROTO_TCP* = 6 IPPROTO_TCP* = 6
const
asyncInvalidPipe* = asyncInvalidSocket
proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool = proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool =
## Sets blocking mode on socket. ## Sets blocking mode on socket.
when defined(windows): when defined(windows):
@ -103,3 +116,74 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD =
return asyncInvalidSocket return asyncInvalidSocket
result = AsyncFD(sock) result = AsyncFD(sock)
register(result) register(result)
proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
## Create new asynchronouse pipe.
## Returns tuple of read pipe handle and write pipe handle``asyncInvalidPipe`` on error.
when defined(windows):
var pipeIn, pipeOut: Handle
var pipeName: WideCString
var uniq = 0'u64
var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint,
lpSecurityDescriptor: nil, bInheritHandle: 0)
while true:
QueryPerformanceCounter(uniq)
pipeName = newWideCString(pipeHeaderName & $uniq)
var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or
PIPE_ACCESS_INBOUND
var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT
pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32,
DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE,
0'i32, addr sa)
if pipeIn == INVALID_HANDLE_VALUE:
let err = osLastError()
# If error in {ERROR_ACCESS_DENIED, ERROR_PIPE_BUSY}, then named pipe
# with such name already exists.
if int32(err) != ERROR_ACCESS_DENIED and int32(err) != ERROR_PIPE_BUSY:
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
continue
else:
break
var openMode = (GENERIC_WRITE or FILE_WRITE_DATA or SYNCHRONIZE)
pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, 0)
if pipeOut == INVALID_HANDLE_VALUE:
discard closeHandle(pipeIn)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
var ovl = OVERLAPPED()
let res = connectNamedPipe(pipeIn, cast[pointer](addr ovl))
if res == 0:
let err = osLastError()
if int32(err) == ERROR_PIPE_CONNECTED:
discard
elif int32(err) == ERROR_IO_PENDING:
var bytesRead = 0.Dword
if getOverlappedResult(pipeIn, addr ovl, bytesRead, 1) == 0:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
else:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
result = (read: AsyncFD(pipeIn), write: AsyncFD(pipeOut))
else:
var fds: array[2, cint]
if posix.pipe(fds) == -1:
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
if not(setSocketBlocking(SocketHandle(fds[0]), false)) or
not(setSocketBlocking(SocketHandle(fds[1]), false)):
result = (read: asyncInvalidPipe, write: asyncInvalidPipe)
return
result = (read: AsyncFD(fds[0]), write: AsyncFD(fds[1]))

View File

@ -46,7 +46,7 @@ when defined(windows):
cast[uint64](t.dwLowDateTime)) * 100 cast[uint64](t.dwLowDateTime)) * 100
else: else:
proc QueryPerformanceCounter(res: var uint64) {. proc QueryPerformanceCounter*(res: var uint64) {.
importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".} importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".}
proc QueryPerformanceFrequency(res: var uint64) {. proc QueryPerformanceFrequency(res: var uint64) {.
importc: "QueryPerformanceFrequency", stdcall, dynlib: "kernel32".} importc: "QueryPerformanceFrequency", stdcall, dynlib: "kernel32".}

View File

@ -699,23 +699,22 @@ when defined(windows):
proc cancel(udata: pointer) {.gcsafe.} = proc cancel(udata: pointer) {.gcsafe.} =
sock.closeSocket() sock.closeSocket()
retFuture.cancelCallback = cancel
povl = RefCustomOverlapped() povl = RefCustomOverlapped()
GC_ref(povl) GC_ref(povl)
povl.data = CompletionData(fd: sock, cb: socketContinuation) povl.data = CompletionData(fd: sock, cb: socketContinuation)
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: var res = loop.connectEx(SocketHandle(sock),
var res = loop.connectEx(SocketHandle(sock), cast[ptr SockAddr](addr saddr),
cast[ptr SockAddr](addr saddr), DWORD(slen), nil, 0, nil,
DWORD(slen), nil, 0, nil, cast[POVERLAPPED](povl))
cast[POVERLAPPED](povl)) # We will not process immediate completion, to avoid undefined behavior.
# We will not process immediate completion, to avoid undefined behavior. if not res:
if not res: let err = osLastError()
let err = osLastError() if int32(err) != ERROR_IO_PENDING:
if int32(err) != ERROR_IO_PENDING: GC_unref(povl)
GC_unref(povl) sock.closeSocket()
sock.closeSocket() retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(err))
retFuture.cancelCallback = cancel
elif address.family == AddressFamily.Unix: elif address.family == AddressFamily.Unix:
## Unix domain socket emulation with Windows Named Pipes. ## Unix domain socket emulation with Windows Named Pipes.
@ -1016,7 +1015,65 @@ else:
else: else:
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(getTransportOsError(err))
break break
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
let res = posix.write(cint(fd), vector.buf, vector.buflen)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.buflen)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else:
if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err))
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else:
if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err))
break
else: else:
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
transp.fd.removeWriter() transp.fd.removeWriter()
@ -1036,32 +1093,57 @@ else:
transp.reader.complete() transp.reader.complete()
transp.reader = nil transp.reader = nil
else: else:
while true: if transp.kind == TransportKind.Socket:
var res = posix.recv(fd, addr transp.buffer[transp.offset], while true:
len(transp.buffer) - transp.offset, cint(0)) var res = posix.recv(fd, addr transp.buffer[transp.offset],
if res < 0: len(transp.buffer) - transp.offset, cint(0))
let err = osLastError() if res < 0:
if int(err) == EINTR: let err = osLastError()
continue if int(err) == EINTR:
elif int(err) in {ECONNRESET}: continue
elif int(err) in {ECONNRESET}:
transp.state.incl({ReadEof, ReadPaused})
cdata.fd.removeReader()
else:
transp.state.incl(ReadPaused)
transp.setReadError(err)
cdata.fd.removeReader()
elif res == 0:
transp.state.incl({ReadEof, ReadPaused}) transp.state.incl({ReadEof, ReadPaused})
cdata.fd.removeReader() cdata.fd.removeReader()
else: else:
transp.state.incl(ReadPaused) transp.offset += res
transp.setReadError(err) if transp.offset == len(transp.buffer):
transp.state.incl(ReadPaused)
cdata.fd.removeReader()
if not(isNil(transp.reader)) and not(transp.reader.finished()):
transp.reader.complete()
transp.reader = nil
break
elif transp.kind == TransportKind.Pipe:
while true:
var res = posix.read(cint(fd), addr transp.buffer[transp.offset],
len(transp.buffer) - transp.offset)
if res < 0:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.state.incl(ReadPaused)
transp.setReadError(err)
cdata.fd.removeReader()
elif res == 0:
transp.state.incl({ReadEof, ReadPaused})
cdata.fd.removeReader() cdata.fd.removeReader()
elif res == 0: else:
transp.state.incl({ReadEof, ReadPaused}) transp.offset += res
cdata.fd.removeReader() if transp.offset == len(transp.buffer):
else: transp.state.incl(ReadPaused)
transp.offset += res cdata.fd.removeReader()
if transp.offset == len(transp.buffer): if not(isNil(transp.reader)) and not(transp.reader.finished()):
transp.state.incl(ReadPaused) transp.reader.complete()
cdata.fd.removeReader() transp.reader = nil
if not(isNil(transp.reader)) and not(transp.reader.finished()): break
transp.reader.complete()
transp.reader = nil
break
proc newStreamSocketTransport(sock: AsyncFD, bufsize: int, proc newStreamSocketTransport(sock: AsyncFD, bufsize: int,
child: StreamTransport): StreamTransport = child: StreamTransport): StreamTransport =
@ -1079,6 +1161,22 @@ else:
GC_ref(transp) GC_ref(transp)
result = transp result = transp
proc newStreamPipeTransport(fd: AsyncFD, bufsize: int,
child: StreamTransport): StreamTransport =
var transp: StreamTransport
if not isNil(child):
transp = child
else:
transp = StreamTransport(kind: TransportKind.Pipe)
transp.fd = fd
transp.buffer = newSeq[byte](bufsize)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("pipe.stream.transport")
GC_ref(transp)
result = transp
proc connect*(address: TransportAddress, proc connect*(address: TransportAddress,
bufferSize = DefaultStreamBufferSize, bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil): Future[StreamTransport] = child: StreamTransport = nil): Future[StreamTransport] =
@ -1827,7 +1925,10 @@ proc close*(transp: StreamTransport) =
# of readStreamLoop(). # of readStreamLoop().
closeSocket(transp.fd) closeSocket(transp.fd)
else: else:
closeSocket(transp.fd, continuation) if transp.kind == TransportKind.Pipe:
closeHandle(transp.fd, continuation)
elif transp.kind == TransportKind.Socket:
closeSocket(transp.fd, continuation)
proc closeWait*(transp: StreamTransport): Future[void] = proc closeWait*(transp: StreamTransport): Future[void] =
## Close and frees resources of transport ``transp``. ## Close and frees resources of transport ``transp``.
@ -1837,3 +1938,13 @@ proc closeWait*(transp: StreamTransport): Future[void] =
proc closed*(transp: StreamTransport): bool {.inline.} = proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state. ## Returns ``true`` if transport in closed state.
result = ({ReadClosed, WriteClosed} * transp.state != {}) result = ({ReadClosed, WriteClosed} * transp.state != {})
proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil,
bufferSize = DefaultStreamBufferSize): StreamTransport =
## Create new transport object using pipe's file descriptor.
##
## ``bufferSize`` is size of internal buffer for transport.
register(fd)
result = newStreamPipeTransport(fd, bufferSize, child)
# Start tracking transport
trackStream(result)