From 80351cb928b07493b202fab586eace35c6d018c2 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 12 Feb 2020 22:54:05 +0200 Subject: [PATCH] Fix #73. --- chronos/transports/stream.nim | 11 ++++---- tests/teststream.nim | 50 +++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index a3763ad..4a619cc 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -998,7 +998,7 @@ else: if res >= 0: if vector.buflen - res == 0: if not(vector.writer.finished()): - vector.writer.complete(vector.buflen) + vector.writer.complete(vector.size) else: vector.shiftVectorBuffer(res) transp.queue.addFirst(vector) @@ -1062,7 +1062,7 @@ else: if res >= 0: if vector.buflen - res == 0: if not(vector.writer.finished()): - vector.writer.complete(vector.buflen) + vector.writer.complete(vector.size) else: vector.shiftVectorBuffer(res) transp.queue.addFirst(vector) @@ -1612,7 +1612,7 @@ proc write*(transp: StreamTransport, pbytes: pointer, transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) var vector = StreamVector(kind: DataBuffer, writer: retFuture, - buf: pbytes, buflen: nbytes) + buf: pbytes, buflen: nbytes, size: nbytes) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() @@ -1630,7 +1630,8 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = let length = if msglen <= 0: len(msg) else: msglen var vector = StreamVector(kind: DataBuffer, writer: cast[Future[int]](retFuture), - buf: addr retFuture.gcholder[0], buflen: length) + buf: addr retFuture.gcholder[0], buflen: length, + size: length) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() @@ -1649,7 +1650,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] = var vector = StreamVector(kind: DataBuffer, writer: cast[Future[int]](retFuture), buf: addr retFuture.gcholder[0], - buflen: length) + buflen: length, size: length) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() diff --git a/tests/teststream.nim b/tests/teststream.nim index b144afe..da9b64b 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -59,6 +59,12 @@ suite "Stream Transport test suite": ] var prefixes = ["[IP] ", "[UNIX] "] + proc createBigMessage(size: int): seq[byte] = + var message = "MESSAGE" + result = newSeq[byte](size) + for i in 0 ..< len(result): + result[i] = byte(message[i mod len(message)]) + proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} = while not transp.atEof(): var data = await transp.readLine() @@ -746,6 +752,48 @@ suite "Stream Transport test suite": server.stop() server.close() + proc testWriteReturn(address: TransportAddress): Future[bool] {.async.} = + var bigMessageSize = 10 * 1024 * 1024 - 1 + var finishMessage = "DONE" + var cdata = newSeqOfCap[byte](bigMessageSize) + proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = + cdata = await transp.read(bigMessageSize) + var size = await transp.write(finishMessage) + doAssert(size == len(finishMessage)) + await transp.closeWait() + server.stop() + server.close() + + var flag = false + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + + var transp: StreamTransport + + try: + transp = await connect(address) + flag = true + except: + server.stop() + server.close() + await server.join() + + if flag: + flag = false + try: + var msg = createBigMessage(bigMessageSize) + var size = await transp.write(msg) + var data = await transp.read() + doAssert(cdata == msg) + doAssert(len(data) == len(finishMessage)) + doAssert(equalMem(addr data[0], addr finishMessage[0], len(data))) + + flag = (size == bigMessageSize) + finally: + await transp.closeWait() + await server.join() + result = flag + for i in 0..