diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 976f09b..34d50c5 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -98,6 +98,8 @@ type code*: OSErrorCode TransportNoSupport* = object of TransportError ## Transport's capability not supported exception + TransportUseClosedError* = object of TransportError + ## Usage after transport close exception TransportState* = enum ## Transport's state diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index bdf550d..bc13c8d 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -60,6 +60,10 @@ type opened*: int64 closed*: int64 + ReadMessagePredicate* = proc (data: openarray[byte]): tuple[consumed: int, + done: bool] {. + gcsafe, raises: [].} + const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" @@ -163,6 +167,11 @@ proc localAddress*(server: StreamServer): TransportAddress = ## Returns ``server`` bound local socket address. result = server.local +template completeReader(stream: StreamTransport) = + if not(isNil(transp.reader)) and not(transp.reader.finished()): + transp.reader.complete() + transp.reader = nil + template setReadError(t, e: untyped) = (t).state.incl(ReadError) (t).error = getTransportOsError(e) @@ -173,8 +182,9 @@ template checkPending(t: untyped) = template shiftBuffer(t, c: untyped) = if (t).offset > c: - moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) - (t).offset = (t).offset - (c) + if c > 0: + moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) + (t).offset = (t).offset - (c) else: (t).offset = 0 @@ -525,9 +535,7 @@ when defined(windows): else: transp.setReadError(err) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() if ReadClosed in transp.state: # Stop tracking transport @@ -563,16 +571,12 @@ when defined(windows): elif int32(err) in {WSAECONNRESET, WSAENETRESET, WSAECONNABORTED}: transp.state.excl(ReadPending) transp.state.incl({ReadEof, ReadPaused}) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() elif int32(err) != ERROR_IO_PENDING: transp.state.excl(ReadPending) transp.state.incl(ReadPaused) transp.setReadError(err) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() elif transp.kind == TransportKind.Pipe: let pipe = Handle(transp.rovl.data.fd) transp.roffset = transp.offset @@ -589,21 +593,15 @@ when defined(windows): elif int32(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}: transp.state.excl(ReadPending) transp.state.incl({ReadEof, ReadPaused}) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() elif int32(err) != ERROR_IO_PENDING: transp.state.excl(ReadPending) transp.state.incl(ReadPaused) transp.setReadError(err) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() else: transp.state.incl(ReadPaused) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() # Transport close happens in callback, and we not started new # WSARecvFrom session. if ReadClosed in transp.state: @@ -947,12 +945,14 @@ when defined(windows): GC_unref(server) proc resumeRead(transp: StreamTransport) {.inline.} = - transp.state.excl(ReadPaused) - readStreamLoop(cast[pointer](addr transp.rovl)) + if ReadPaused in transp.state: + transp.state.excl(ReadPaused) + readStreamLoop(cast[pointer](addr transp.rovl)) proc resumeWrite(transp: StreamTransport) {.inline.} = - transp.state.excl(WritePaused) - writeStreamLoop(cast[pointer](addr transp.wovl)) + if WritePaused in transp.state: + transp.state.excl(WritePaused) + writeStreamLoop(cast[pointer](addr transp.wovl)) proc pauseAccept(server: StreamServer) {.inline.} = if server.apending: @@ -1124,9 +1124,7 @@ else: if ReadClosed in transp.state: transp.state.incl({ReadPaused}) - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() else: if transp.kind == TransportKind.Socket: while true: @@ -1151,9 +1149,7 @@ else: if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) cdata.fd.removeReader() - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() break elif transp.kind == TransportKind.Pipe: while true: @@ -1175,9 +1171,7 @@ else: if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) cdata.fd.removeReader() - if not(isNil(transp.reader)) and not(transp.reader.finished()): - transp.reader.complete() - transp.reader = nil + transp.completeReader() break proc newStreamSocketTransport(sock: AsyncFD, bufsize: int, @@ -1321,12 +1315,14 @@ else: removeReader(server.sock) proc resumeRead(transp: StreamTransport) {.inline.} = - transp.state.excl(ReadPaused) - addReader(transp.fd, readStreamLoop, cast[pointer](transp)) + if ReadPaused in transp.state: + transp.state.excl(ReadPaused) + addReader(transp.fd, readStreamLoop, cast[pointer](transp)) proc resumeWrite(transp: StreamTransport) {.inline.} = - transp.state.excl(WritePaused) - addWriter(transp.fd, writeStreamLoop, cast[pointer](transp)) + if WritePaused in transp.state: + transp.state.excl(WritePaused) + addWriter(transp.fd, writeStreamLoop, cast[pointer](transp)) proc start*(server: StreamServer) = ## Starts ``server``. @@ -1604,8 +1600,7 @@ proc write*(transp: StreamTransport, pbytes: pointer, var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: nbytes, size: nbytes) transp.queue.addLast(vector) - if WritePaused in transp.state: - transp.resumeWrite() + transp.resumeWrite() return retFuture proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = @@ -1623,8 +1618,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = buf: addr retFuture.gcholder[0], buflen: length, size: length) transp.queue.addLast(vector) - if WritePaused in transp.state: - transp.resumeWrite() + transp.resumeWrite() return retFuture proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] = @@ -1642,8 +1636,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] = buf: addr retFuture.gcholder[0], buflen: length, size: length) transp.queue.addLast(vector) - if WritePaused in transp.state: - transp.resumeWrite() + transp.resumeWrite() return retFuture proc writeFile*(transp: StreamTransport, handle: int, @@ -1662,8 +1655,7 @@ proc writeFile*(transp: StreamTransport, handle: int, buf: cast[pointer](size), offset: offset, buflen: handle) transp.queue.addLast(vector) - if WritePaused in transp.state: - transp.resumeWrite() + transp.resumeWrite() return retFuture proc atEof*(transp: StreamTransport): bool {.inline.} = @@ -1671,41 +1663,54 @@ proc atEof*(transp: StreamTransport): bool {.inline.} = result = (transp.offset == 0) and (ReadEof in transp.state) and (ReadPaused in transp.state) +template prepareReader(transp: StreamTransport, + name: static string): Future[void] = + checkPending(transp) + var fut = newFuture[void](name) + transp.reader = fut + resumeRead(transp) + fut + +template readLoop(name, body: untyped): untyped = + # Read data until a predicate is satisfied - the body should return a tuple + # signalling how many bytes have been processed and whether we're done reading + checkClosed(transp) + checkPending(transp) + while true: + if ReadClosed in transp.state: + raise newException(TransportUseClosedError, + "Attempt to read data from closed stream") + if transp.offset == 0: + # We going to raise an error, only if transport buffer is empty. + if ReadError in transp.state: + raise transp.getError() + + let (consumed, done) = body + transp.shiftBuffer(consumed) + if done: + break + else: + await transp.prepareReader(name) + proc readExactly*(transp: StreamTransport, pbytes: pointer, nbytes: int) {.async.} = ## Read exactly ``nbytes`` bytes from transport ``transp`` and store it to ## ``pbytes``. ## ## If EOF is received and ``nbytes`` is not yet readed, the procedure - ## will raise ``TransportIncompleteError``. - checkClosed(transp) - checkPending(transp) - + ## will raise ``TransportIncompleteError``, potentially with some bytes + ## already written. var index = 0 - while true: + var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + readLoop("stream.transport.readExactly"): if transp.offset == 0: - if (ReadError in transp.state): - raise transp.getError() - if (ReadClosed in transp.state) or transp.atEof(): + if transp.atEof(): raise newException(TransportIncompleteError, "Data incomplete!") - - if transp.offset >= (nbytes - index): - copyMem(cast[pointer](cast[uint](pbytes) + uint(index)), - addr(transp.buffer[0]), nbytes - index) - transp.shiftBuffer(nbytes - index) - break - else: - if transp.offset != 0: - copyMem(cast[pointer](cast[uint](pbytes) + uint(index)), - addr(transp.buffer[0]), transp.offset) - index += transp.offset - - var fut = newFuture[void]("stream.transport.readExactly") - transp.reader = fut - transp.offset = 0 - if ReadPaused in transp.state: - transp.resumeRead() - await fut + let count = min(nbytes - index, transp.offset) + if count > 0: + copyMem(addr pbuffer[index], addr(transp.buffer[0]), count) + index += count + (consumed: count, done: index == nbytes) proc readOnce*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {.async.} = @@ -1713,31 +1718,15 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer, ## ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from ## internal buffer, otherwise it will wait until some bytes will be received. - checkClosed(transp) - checkPending(transp) - - while true: + var count = 0 + readLoop("stream.transport.readOnce"): if transp.offset == 0: - if (ReadError in transp.state): - raise transp.getError() - if (ReadClosed in transp.state) or transp.atEof(): - result = 0 - break - var fut = newFuture[void]("stream.transport.readOnce") - transp.reader = fut - if ReadPaused in transp.state: - transp.resumeRead() - await fut + (0, transp.atEof()) else: - if transp.offset > nbytes: - copyMem(pbytes, addr(transp.buffer[0]), nbytes) - transp.shiftBuffer(nbytes) - result = nbytes - else: - copyMem(pbytes, addr(transp.buffer[0]), transp.offset) - result = transp.offset - transp.offset = 0 - break + count = min(transp.offset, nbytes) + copyMem(pbytes, addr(transp.buffer[0]), count) + (count, true) + return count proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} = @@ -1753,47 +1742,36 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, ## will raise ``TransportLimitError``. ## ## Procedure returns actual number of bytes read. - checkClosed(transp) - checkPending(transp) - - var dest = cast[ptr UncheckedArray[byte]](pbytes) + var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) var state = 0 var k = 0 - var index = 0 - while true: - if ReadError in transp.state: - raise transp.getError() - if (ReadClosed in transp.state) or transp.atEof(): + readLoop("stream.transport.readUntil"): + if transp.atEof(): raise newException(TransportIncompleteError, "Data incomplete!") - index = 0 + var index = 0 + while index < transp.offset: - let ch = transp.buffer[index] - if sep[state] == ch: - inc(state) - else: - state = 0 - if k < nbytes: - dest[k] = ch - inc(k) - else: + if k >= nbytes: raise newException(TransportLimitError, "Limit reached!") - if state == len(sep): - break + + let ch = transp.buffer[index] inc(index) - if state == len(sep): - transp.shiftBuffer(index + 1) - result = k - break - else: - transp.shiftBuffer(transp.offset) - var fut = newFuture[void]("stream.transport.readUntil") - transp.reader = fut - if ReadPaused in transp.state: - transp.resumeRead() - await fut + pbuffer[k] = ch + inc(k) + + if sep[state] == ch: + inc(state) + if state == len(sep): + break + else: + state = 0 + + (index, state == len(sep)) + + return k proc readLine*(transp: StreamTransport, limit = 0, sep = "\r\n"): Future[string] {.async.} = @@ -1807,128 +1785,113 @@ proc readLine*(transp: StreamTransport, limit = 0, ## empty string. ## ## If ``limit`` more then 0, then read is limited to ``limit`` bytes. - checkClosed(transp) - checkPending(transp) - - result = "" - var lim = if limit <= 0: -1 else: limit + let lim = if limit <= 0: -1 else: limit var state = 0 - var index = 0 - while true: - if (ReadError in transp.state): - raise transp.getError() - if (ReadClosed in transp.state) or transp.atEof(): - break - - index = 0 - while index < transp.offset: - let ch = char(transp.buffer[index]) - if sep[state] == ch: - inc(state) - if state == len(sep): - transp.shiftBuffer(index + 1) - break - else: - state = 0 - result.add(ch) - if len(result) == lim: - transp.shiftBuffer(index + 1) - break - inc(index) - - if (state == len(sep)) or (lim == len(result)): - break + readLoop("stream.transport.readLine"): + if transp.atEof(): + (0, true) else: - transp.shiftBuffer(transp.offset) - var fut = newFuture[void]("stream.transport.readLine") - transp.reader = fut - if ReadPaused in transp.state: - transp.resumeRead() - await fut + var index = 0 + while index < transp.offset: + let ch = char(transp.buffer[index]) + index += 1 -proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = + if sep[state] == ch: + inc(state) + if state == len(sep): + break + else: + if state != 0: + if limit > 0: + let missing = min(state, lim - len(result) - 1) + result.add(sep[0 ..< missing]) + else: + result.add(sep[0 ..< state]) + state = 0 + + result.add(ch) + if len(result) == lim: + break + + (index, (state == len(sep)) or (lim == len(result))) + +proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} = + ## Read all bytes from transport ``transp``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + readLoop("stream.transport.read"): + if transp.atEof(): + (0, true) + else: + result.add(transp.buffer.toOpenArray(0, transp.offset - 1)) + (transp.offset, false) + +proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} = ## Read all bytes (n <= 0) or exactly `n` bytes from transport ``transp``. ## ## This procedure allocates buffer seq[byte] and return it as result. - checkClosed(transp) - checkPending(transp) - var res = newSeq[byte]() - while true: - if (ReadError in transp.state): - raise transp.getError() - if (ReadClosed in transp.state) or transp.atEof(): - result = res - break - - if transp.offset > 0: - let s = len(res) - let o = s + transp.offset - if n <= 0: - # grabbing all incoming data, until EOF - res.setLen(o) - copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), - transp.offset) - transp.offset = 0 + if n <= 0: + return await transp.read() + else: + readLoop("stream.transport.read"): + if transp.atEof(): + (0, true) else: - let left = n - s - if transp.offset >= left: - # size of buffer data is more then we need, grabbing only part - res.setLen(n) - copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), - left) - transp.shiftBuffer(left) - result = res - break - else: - # there not enough data in buffer, grabbing all - res.setLen(o) - copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), - transp.offset) - transp.offset = 0 + let count = min(transp.offset, n - len(result)) + result.add(transp.buffer.toOpenArray(0, count - 1)) + (count, len(result) == n) - var fut = newFuture[void]("stream.transport.read") - transp.reader = fut - if ReadPaused in transp.state: - transp.resumeRead() - await fut - -proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = - ## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``. +proc consume*(transp: StreamTransport): Future[int] {.async.} = + ## Consume all bytes from transport ``transp`` and discard it. ## - ## Return number of bytes actually consumed - checkClosed(transp) - checkPending(transp) + ## Return number of bytes actually consumed and discarded. + readLoop("stream.transport.consume"): + if transp.atEof(): + (0, true) + else: + result += transp.offset + (transp.offset, false) - result = 0 - while true: - if (ReadError in transp.state): - raise transp.getError() - if ReadClosed in transp.state or transp.atEof(): - break - - if transp.offset > 0: - if n <= 0: - # consume all incoming data, until EOF - result += transp.offset - transp.offset = 0 +proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} = + ## Consume all bytes (n <= 0) or ``n`` bytes from transport ``transp`` and + ## discard it. + ## + ## Return number of bytes actually consumed and discarded. + if n <= 0: + return await transp.consume() + else: + readLoop("stream.transport.consume"): + if transp.atEof(): + (0, true) else: - let left = n - result - if transp.offset >= left: - # size of buffer data is more then we need, consume only part - result += left - transp.shiftBuffer(left) - break - else: - # there not enough data in buffer, consume all - result += transp.offset - transp.offset = 0 + let count = min(transp.offset, n - result) + result += count + (count, result == n) - var fut = newFuture[void]("stream.transport.consume") - transp.reader = fut - if ReadPaused in transp.state: - transp.resumeRead() - await fut +proc readMessage*(transp: StreamTransport, + predicate: ReadMessagePredicate) {.async.} = + ## Read all bytes from transport ``transp`` until ``predicate`` callback + ## will not be satisfied. + ## + ## ``predicate`` callback should return tuple ``(consumed, result)``, where + ## ``consumed`` is the number of bytes processed and ``result`` is a + ## completion flag (``true`` if readMessage() should stop reading data, + ## or ``false`` if readMessage() should continue to read data from transport). + ## + ## ``predicate`` callback must copy all the data from ``data`` array and + ## return number of bytes it is going to consume. + ## ``predicate`` callback will receive (zero-length) openarray, if transport + ## is at EOF. + readLoop("stream.transport.readMessage"): + if transp.offset == 0: + if transp.atEof(): + predicate([]) + else: + # Case, when transport's buffer is not yet filled with data. + (0, false) + else: + predicate(transp.buffer.toOpenArray(0, transp.offset - 1)) proc join*(transp: StreamTransport): Future[void] = ## Wait until ``transp`` will not be closed. diff --git a/tests/teststream.nim b/tests/teststream.nim index da9b64b..b32c0b3 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -633,11 +633,14 @@ suite "Stream Transport test suite": server.start() var transp = await connect(address) var fut = swarmWorker(transp) - transp.close() + # We perfrom shutdown(SHUT_RD/SD_RECEIVE) for the socket, in such way its + # possible to emulate socket's EOF. + discard shutdown(SocketHandle(transp.fd), 0) await fut server.stop() server.close() await server.join() + transp.close() await transp.join() result = subres @@ -794,6 +797,266 @@ suite "Stream Transport test suite": await server.join() result = flag + proc testReadLine(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = + discard await transp.write("DATA\r\r\r\r\r\n") + transp.close() + await transp.join() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + try: + var r1, r2, r3, r4, r5: string + var t1 = await connect(address) + try: + r1 = await t1.readLine(4) + finally: + await t1.closeWait() + + var t2 = await connect(address) + try: + r2 = await t2.readLine(6) + finally: + await t2.closeWait() + + var t3 = await connect(address) + try: + r3 = await t3.readLine(8) + finally: + await t3.closeWait() + + var t4 = await connect(address) + try: + r4 = await t4.readLine(8) + finally: + await t4.closeWait() + + var t5 = await connect(address) + try: + r5 = await t5.readLine() + finally: + await t5.closeWait() + + doAssert(r1 == "DATA") + doAssert(r2 == "DATA\r\r") + doAssert(r3 == "DATA\r\r\r\r") + doAssert(r4 == "DATA\r\r\r\r") + doAssert(r5 == "DATA\r\r\r\r") + + result = true + finally: + server.stop() + server.close() + await server.join() + proc readLV(transp: StreamTransport, + maxLen: int): Future[seq[byte]] {.async.} = + # Read length-prefixed value where length is a 32-bit integer in native + # endian (don't do this at home) + var + valueLen = 0'u32 + res: seq[byte] + error: ref Exception + + proc predicate(data: openarray[byte]): tuple[consumed: int, done: bool] = + if len(data) == 0: + # There will be no more data, length-value incomplete + error = newException(TransportIncompleteError, "LV incomplete") + return (0, true) + + var consumed = 0 + + if valueLen == 0: + if len(data) < 4: + return (0, false) + copyMem(addr valueLen, unsafeAddr data[0], sizeof(valueLen)) + if valueLen == 0: + return (sizeof(valueLen), true) + if int(valueLen) > maxLen: + error = newException(ValueError, "Size is too big") + return (sizeof(valueLen), true) + consumed += sizeof(valueLen) + + let + dataLeft = len(data) - consumed + count = min(dataLeft, int(valueLen) - len(res)) + + res.add(data.toOpenArray(consumed, count + consumed - 1)) + return (consumed + count, len(res) == int(valueLen)) + + await transp.readMessage(predicate) + if not isNil(error): + raise error + else: + return res + + proc createMessage(size: uint32): seq[byte] = + var message = "MESSAGE" + result = newSeq[byte](int(size)) + for i in 0 ..< size: + result[int(i)] = byte(message[int(i) mod len(message)]) + + proc createLVMessage(size: uint32): seq[byte] = + var message = "MESSAGE" + result = newSeq[byte](sizeof(size) + int(size)) + copyMem(addr result[0], unsafeAddr size, sizeof(size)) + for i in 0 ..< size: + result[int(i) + sizeof(size)] = byte(message[int(i) mod len(message)]) + + proc testReadMessage(address: TransportAddress): Future[bool] {.async.} = + var state = 0 + var c1, c2, c3, c4, c5, c6, c7: bool + + proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = + if state == 0: + # EOF from the beginning. + state = 1 + await transp.closeWait() + elif state == 1: + # Message has only zero-size header. + var message = createLVMessage(0'u32) + discard await transp.write(message) + state = 2 + await transp.closeWait() + elif state == 2: + # Message has header, but do not have any data at all. + var message = createLVMessage(4'u32) + message.setLen(4) + discard await transp.write(message) + state = 3 + await transp.closeWait() + elif state == 3: + # Message do not have enough data for specified size in header. + var message = createLVMessage(1024'u32) + message.setLen(1024) + discard await transp.write(message) + state = 4 + await transp.closeWait() + elif state == 4: + # Good encoded message with oversize. + var message = createLVMessage(1024'u32) + discard await transp.write(message) + state = 5 + await transp.closeWait() + elif state == 5: + # Good encoded message. + var message = createLVMessage(1024'u32) + discard await transp.write(message) + state = 6 + await transp.closeWait() + elif state == 6: + # Good encoded message with additional data. + var message = createLVMessage(1024'u32) + discard await transp.write(message) + discard await transp.write("DONE") + state = 7 + await transp.closeWait() + else: + doAssert(false) + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + + var t1 = await connect(address) + try: + discard await t1.readLV(2000) + except TransportIncompleteError: + c1 = true + finally: + await t1.closeWait() + + if not c1: + server.stop() + server.close() + await server.join() + return false + + var t2 = await connect(address) + try: + var r2 = await t2.readLV(2000) + c2 = (r2 == @[]) + finally: + await t2.closeWait() + + if not c2: + server.stop() + server.close() + await server.join() + return false + + var t3 = await connect(address) + try: + discard await t3.readLV(2000) + except TransportIncompleteError: + c3 = true + finally: + await t3.closeWait() + + if not c3: + server.stop() + server.close() + await server.join() + return false + + var t4 = await connect(address) + try: + discard await t4.readLV(2000) + except TransportIncompleteError: + c4 = true + finally: + await t4.closeWait() + + if not c4: + server.stop() + server.close() + await server.join() + return false + + var t5 = await connect(address) + try: + discard await t5.readLV(1000) + except ValueError: + c5 = true + finally: + await t5.closeWait() + + if not c5: + server.stop() + server.close() + await server.join() + return false + + var t6 = await connect(address) + try: + var expectMsg = createMessage(1024) + var r6 = await t6.readLV(2000) + if len(r6) == 1024 and r6 == expectMsg: + c6 = true + finally: + await t6.closeWait() + + if not c6: + server.stop() + server.close() + await server.join() + return false + + var t7 = await connect(address) + try: + var expectMsg = createMessage(1024) + var expectDone = "DONE" + var r7 = await t7.readLV(2000) + if len(r7) == 1024 and r7 == expectMsg: + var m = await t7.read(4) + if len(m) == 4 and equalMem(addr m[0], addr expectDone[0], 4): + c7 = true + finally: + await t7.closeWait() + + server.stop() + server.close() + await server.join() + result = c7 + for i in 0..