diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 2fedf20f..dab5b066 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -115,7 +115,6 @@ template checkPending(t: untyped) = raise newException(TransportError, "Read operation already pending!") template shiftBuffer(t, c: untyped) = - # ZAH: Nim is not C, you don't need to put () around template parameters if (t).offset > c: moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) (t).offset = (t).offset - (c) @@ -812,6 +811,14 @@ proc write*(transp: StreamTransport, pbytes: pointer, raise transp.getError() result = nbytes +template write*(transp: StreamTransport, msg: var string): untyped = + ## Write string ``msg`` using transport ``transp``. + write(transp, addr msg[0], len(msg)) + +template write*(transp: StreamTransport, msg: var seq[byte]): untyped = + ## Write seq[byte] ``msg`` using transport ``transp``. + write(transp, addr msg[0], len(msg)) + proc writeFile*(transp: StreamTransport, handle: int, offset: uint = 0, size: int = 0): Future[void] {.async.} = @@ -833,6 +840,11 @@ proc writeFile*(transp: StreamTransport, handle: int, if WriteError in transp.state: raise transp.getError() +proc atEof*(transp: StreamTransport): bool {.inline.} = + ## Returns ``true`` if ``transp`` is at EOF. + result = (transp.offset == 0) and (ReadEof in transp.state) and + (ReadPaused in transp.state) + proc readExactly*(transp: StreamTransport, pbytes: pointer, nbytes: int) {.async.} = ## Read exactly ``nbytes`` bytes from transport ``transp`` and store it to @@ -847,7 +859,7 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, if transp.offset == 0: if (ReadError in transp.state): raise transp.getError() - if (ReadEof in transp.state) or (ReadClosed in transp.state): + if ReadClosed in transp.state or transp.atEof(): raise newException(TransportIncompleteError, "Data incomplete!") if transp.offset >= (nbytes - index): @@ -1028,13 +1040,11 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = ## This procedure allocates buffer seq[byte] and return it as result. checkClosed(transp) checkPending(transp) - result = newSeq[byte]() - while true: if (ReadError in transp.state): raise transp.getError() - if {ReadEof, ReadClosed} * transp.state != {}: + if ReadClosed in transp.state or transp.atEof(): break if transp.offset > 0: @@ -1049,11 +1059,10 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = else: if transp.offset >= (n - s): # size of buffer data is more then we need, grabbing only part - let part = transp.offset - (n - s) result.setLen(n) copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]), - part) - transp.shiftBuffer(part) + n - s) + transp.shiftBuffer(n - s) break else: # there not enough data in buffer, grabbing all @@ -1070,10 +1079,42 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = # Future[T], because readLoop continues working. transp.reader = nil -proc atEof*(transp: StreamTransport): bool {.inline.} = - ## Returns ``true`` if ``transp`` is at EOF. - result = (transp.offset == 0) and (ReadEof in transp.state) and - (ReadPaused in transp.state) +proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = + ## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``. + ## + ## Return number of bytes actually consumed + checkClosed(transp) + checkPending(transp) + result = 0 + while true: + if (ReadError in transp.state): + raise transp.getError() + if ReadClosed in transp.state or transp.atEof(): + break + if transp.offset > 0: + if n == -1: + # consume all incoming data, until EOF + result += transp.offset + transp.offset = 0 + else: + let left = n - result + if transp.offset >= left: + # size of buffer data is more then we need, consume only part + result += left + transp.shiftBuffer(left) + break + else: + # there not enough data in buffer, consume all + result += transp.offset + transp.offset = 0 + + transp.reader = newFuture[void]("stream.transport.consume") + if ReadPaused in transp.state: + transp.resumeRead() + await transp.reader + # we need to clear transp.reader to avoid double completion of this + # Future[T], because readLoop continues working. + transp.reader = nil proc join*(transp: StreamTransport) {.async.} = ## Wait until ``transp`` will not be closed. diff --git a/tests/teststream.nim b/tests/teststream.nim index a1dd3488..6362fb33 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -15,6 +15,7 @@ else: import posix const + ConstantMessage = "SOMEDATA" ClientsCount = 100 MessagesCount = 100 MessageSize = 20 @@ -100,6 +101,36 @@ proc serveClient4(server: StreamServer, var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) doAssert(res == len(answer)) +proc serveClient5(server: StreamServer, + transp: StreamTransport, udata: pointer) {.async.} = + var data = await transp.read() + doAssert(len(data) == len(ConstantMessage) * MessagesCount) + transp.close() + var expect = "" + for i in 0..