From 1763c9dcff7b6993540fcc114fdc944fd74964f5 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 7 May 2019 23:11:40 +0300 Subject: [PATCH] Add AsyncStreams. Add Chunked-Encoding AsyncStream reader/writer. Add tests. --- chronos.nimble | 2 +- chronos/asyncloop.nim | 4 +- chronos/handles.nim | 20 + chronos/streams/asyncstream.nim | 906 ++++++++++++++++++++++++++++++++ chronos/streams/chunkstream.nim | 383 ++++++++++++++ chronos/transport.nim | 4 +- chronos/transports/stream.nim | 24 +- tests/testall.nim | 3 +- tests/testasyncstream.nim | 472 +++++++++++++++++ 9 files changed, 1802 insertions(+), 16 deletions(-) create mode 100644 chronos/streams/asyncstream.nim create mode 100644 chronos/streams/chunkstream.nim create mode 100644 tests/testasyncstream.nim diff --git a/chronos.nimble b/chronos.nimble index 3427fa4..7b951af 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.2.5" +version = "2.2.6" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index c49c612..fe9a5e7 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -770,7 +770,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = if not retFuture.finished: if isNil(udata): fut.removeCallback(continuation) - retFuture.fail(newException(AsyncTimeoutError, "")) + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: if not retFuture.finished: if fut.failed: @@ -792,7 +792,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = else: retFuture.complete(fut.read()) else: - retFuture.fail(newException(AsyncTimeoutError, "")) + retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!")) else: addTimer(Moment.fromNow(timeout), continuation, nil) fut.addCallback(continuation) diff --git a/chronos/handles.nim b/chronos/handles.nim index 9d22887..c374783 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -48,8 +48,18 @@ proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool = sizeof(value).SockLen) < 0'i32: result = false +proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer, + valuelen: int): bool = + ## `setsockopt()` for custom options (pointer and length). + ## Returns ``true`` on success, ``false`` on error. + result = true + if setsockopt(SocketHandle(socket), cint(level), cint(optname), value, + Socklen(valuelen)) < 0'i32: + result = false + proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool = ## `getsockopt()` for integer options. + ## Returns ``true`` on success, ``false`` on error. var res: cint var size = sizeof(res).SockLen result = true @@ -58,6 +68,16 @@ proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool = return false value = int(res) +proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer, + valuelen: var int): bool = + ## `getsockopt()` for custom options (pointer and length). + ## Returns ``true`` on success, ``false`` on error. + var res: cint + result = true + if getsockopt(SocketHandle(socket), cint(level), cint(optname), + value, cast[ptr Socklen](addr valuelen)) < 0'i32: + result = false + proc getSocketError*(socket: AsyncFD, err: var int): bool = ## Recover error code associated with socket handle ``socket``. if not getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err): diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim new file mode 100644 index 0000000..dc64e9b --- /dev/null +++ b/chronos/streams/asyncstream.nim @@ -0,0 +1,906 @@ +# +# Chronos Asynchronous Streams +# (c) Copyright 2019-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import ../asyncloop, ../asyncsync +import ../transports/common, ../transports/stream +export asyncsync, stream, common + +const + AsyncStreamDefaultBufferSize* = 4096 + ## Default reading stream internal buffer size. + AsyncStreamDefaultQueueSize* = 0 + ## Default writing stream internal queue size. + AsyncStreamReaderTrackerName* = "async.stream.reader" + ## AsyncStreamReader leaks tracker name + AsyncStreamWriterTrackerName* = "async.stream.writer" + ## AsyncStreamWriter leaks tracker name + +type + AsyncBuffer* = object + offset*: int + buffer*: seq[byte] + events*: array[2, AsyncEvent] + + WriteType* = enum + Pointer, Sequence, String + + WriteItem* = object + case kind*: WriteType + of Pointer: + data1*: pointer + of Sequence: + data2*: seq[byte] + of String: + data3*: string + size*: int + future*: Future[void] + + AsyncStreamState* = enum + Running, ## Stream is online and working + Error, ## Stream has stored error + Stopped, ## Stream was closed while working + Finished, ## Stream was properly finished + Closed ## Stream was closed + + StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {.gcsafe.} + ## Main read loop for read streams. + StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {.gcsafe.} + ## Main write loop for write streams. + + AsyncStreamReader* = ref object of RootRef + rsource*: AsyncStreamReader + tsource*: StreamTransport + readerLoop*: StreamReaderLoop + state*: AsyncStreamState + exevent*: AsyncEvent + buffer*: AsyncBuffer + udata: pointer + error*: ref Exception + future: Future[void] + + AsyncStreamWriter* = ref object of RootRef + wsource*: AsyncStreamWriter + tsource*: StreamTransport + writerLoop*: StreamWriterLoop + state*: AsyncStreamState + exevent*: AsyncEvent + queue*: AsyncQueue[WriteItem] + udata: pointer + future: Future[void] + + AsyncStream* = ref object of RootRef + reader*: AsyncStreamReader + writer*: AsyncStreamWriter + + AsyncStreamTracker* = ref object of TrackerBase + opened*: int64 + closed*: int64 + + AsyncStreamRW* = AsyncStreamReader | AsyncStreamWriter + + AsyncStreamError* = object of Exception + AsyncStreamIncompleteError* = object of AsyncStreamError + AsyncStreamIncorrectError* = object of AsyncStreamError + AsyncStreamLimitError* = object of AsyncStreamError + AsyncStreamReadError* = object of AsyncStreamError + par*: ref Exception + AsyncStreamWriteError* = object of AsyncStreamError + par*: ref Exception + AsyncStreamTimeoutError* = object of AsyncStreamError + +proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer = + result.buffer = newSeq[byte](size) + result.events[0] = newAsyncEvent() + result.events[1] = newAsyncEvent() + result.offset = 0 + +proc getBuffer*(sb: AsyncBuffer): pointer {.inline.} = + result = unsafeAddr sb.buffer[sb.offset] + +proc bufferLen*(sb: AsyncBuffer): int {.inline.} = + result = len(sb.buffer) - sb.offset + +proc getData*(sb: AsyncBuffer): pointer {.inline.} = + result = unsafeAddr sb.buffer[0] + +proc dataLen*(sb: AsyncBuffer): int {.inline.} = + result = sb.offset + +proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} = + doAssert(index < sb.offset) + result = sb.buffer[index] + +proc update*(sb: var AsyncBuffer, size: int) {.inline.} = + doAssert(sb.offset + size < len(sb.buffer)) + sb.offset += size + +proc wait*(sb: var AsyncBuffer): Future[void] = + sb.events[0].clear() + sb.events[1].fire() + result = sb.events[0].wait() + +proc transfer*(sb: var AsyncBuffer): Future[void] = + sb.events[1].clear() + sb.events[0].fire() + result = sb.events[1].wait() + +proc forget*(sb: var AsyncBuffer) {.inline.} = + sb.events[1].clear() + sb.events[0].fire() + +proc shift*(sb: var AsyncBuffer, size: int) {.inline.} = + if sb.offset > size: + moveMem(addr sb.buffer[0], addr sb.buffer[size], sb.offset - size) + sb.offset = sb.offset - size + else: + sb.offset = 0 + +proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} = + doAssert(length <= sb.dataLen()) + copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)), + unsafeAddr sb.buffer[0], length) + +template toDataOpenArray*(sb: AsyncBuffer): auto = + toOpenArray(sb.buffer, 0, sb.offset - 1) + +template toBufferOpenArray*(sb: AsyncBuffer): auto = + toOpenArray(sb.buffer, sb.offset, len(sb.buffer) - 1) + +proc newAsyncStreamReadError(p: ref Exception): ref Exception {.inline.} = + var w = newException(AsyncStreamReadError, "Read stream failed") + w.par = p + result = w + +proc newAsyncStreamWriteError(p: ref Exception): ref Exception {.inline.} = + var w = newException(AsyncStreamWriteError, "Write stream failed") + w.par = p + result = w + +proc newAsyncStreamIncompleteError(): ref Exception {.inline.} = + result = newException(AsyncStreamIncompleteError, "Incomplete data received") + +proc newAsyncStreamLimitError(): ref Exception {.inline.} = + result = newException(AsyncStreamLimitError, "Buffer limit reached") + +proc newAsyncStreamIncorrectError(m: string): ref Exception {.inline.} = + result = newException(AsyncStreamIncorrectError, m) + +proc atEof*(rstream: AsyncStreamReader): bool = + ## Returns ``true`` is reading stream is closed or finished and internal + ## buffer do not have any bytes left. + result = rstream.state in {AsyncStreamState.Stopped, Finished, Closed} and + (rstream.buffer.dataLen() == 0) + +proc atEof*(wstream: AsyncStreamWriter): bool = + ## Returns ``true`` is writing stream ``wstream`` closed or finished. + result = wstream.state in {AsyncStreamState.Stopped, Finished, Closed} + +proc closed*(rw: AsyncStreamRW): bool {.inline.} = + ## Returns ``true`` is reading/writing stream is closed. + (rw.state == AsyncStreamState.Closed) + +proc finished*(rw: AsyncStreamRW): bool {.inline.} = + ## Returns ``true`` is reading/writing stream is finished (completed). + (rw.state == AsyncStreamState.Finished) + +proc stopped*(rw: AsyncStreamRW): bool {.inline.} = + ## Returns ``true`` is reading/writing stream is stopped (interrupted). + (rw.state == AsyncStreamState.Stopped) + +proc running*(rw: AsyncStreamRW): bool {.inline.} = + ## Returns ``true`` is reading/writing stream is still pending. + (rw.state == AsyncStreamState.Running) + +proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe.} +proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe.} + +proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} = + result = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName)) + if isNil(result): + result = setupAsyncStreamReaderTracker() + +proc getAsyncStreamWriterTracker(): AsyncStreamTracker {.inline.} = + result = cast[AsyncStreamTracker](getTracker(AsyncStreamWriterTrackerName)) + if isNil(result): + result = setupAsyncStreamWriterTracker() + +proc dumpAsyncStreamReaderTracking(): string {.gcsafe.} = + var tracker = getAsyncStreamReaderTracker() + result = "Opened async stream readers: " & $tracker.opened & "\n" & + "Closed async stream readers: " & $tracker.closed + +proc dumpAsyncStreamWriterTracking(): string {.gcsafe.} = + var tracker = getAsyncStreamWriterTracker() + result = "Opened async stream writers: " & $tracker.opened & "\n" & + "Closed async stream writers: " & $tracker.closed + +proc leakAsyncStreamReader(): bool {.gcsafe.} = + var tracker = getAsyncStreamReaderTracker() + result = tracker.opened != tracker.closed + +proc leakAsyncStreamWriter(): bool {.gcsafe.} = + var tracker = getAsyncStreamWriterTracker() + result = tracker.opened != tracker.closed + +proc trackAsyncStreamReader(t: AsyncStreamReader) {.inline.} = + var tracker = getAsyncStreamReaderTracker() + inc(tracker.opened) + +proc untrackAsyncStreamReader*(t: AsyncStreamReader) {.inline.} = + var tracker = getAsyncStreamReaderTracker() + inc(tracker.closed) + +proc trackAsyncStreamWriter(t: AsyncStreamWriter) {.inline.} = + var tracker = getAsyncStreamWriterTracker() + inc(tracker.opened) + +proc untrackAsyncStreamWriter*(t: AsyncStreamWriter) {.inline.} = + var tracker = getAsyncStreamWriterTracker() + inc(tracker.closed) + +proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe.} = + result = new AsyncStreamTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpAsyncStreamReaderTracking + result.isLeaked = leakAsyncStreamReader + addTracker(AsyncStreamReaderTrackerName, result) + +proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe.} = + result = new AsyncStreamTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpAsyncStreamWriterTracking + result.isLeaked = leakAsyncStreamWriter + addTracker(AsyncStreamWriterTrackerName, result) + +proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, + nbytes: int) {.async.} = + ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store + ## it to ``pbytes``. + ## + ## If EOF is received and ``nbytes`` is not yet readed, the procedure + ## will raise ``AsyncStreamIncompleteError``. + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + await readExactly(rstream.tsource, pbytes, nbytes) + except TransportIncompleteError: + raise newAsyncStreamIncompleteError() + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + var index = 0 + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + raise newAsyncStreamIncompleteError() + + if datalen >= (nbytes - index): + rstream.buffer.copyData(pbytes, index, nbytes - index) + rstream.buffer.shift(nbytes - index) + break + else: + rstream.buffer.copyData(pbytes, index, datalen) + index += datalen + rstream.buffer.shift(datalen) + await rstream.buffer.wait() + +proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, + nbytes: int): Future[int] {.async.} = + ## Perform one read operation on read-only stream ``rstream``. + ## + ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from + ## internal buffer, otherwise it will wait until some bytes will be received. + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + result = await readOnce(rstream.tsource, pbytes, nbytes) + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0: + if rstream.atEof(): + result = 0 + break + await rstream.buffer.wait() + else: + let size = min(datalen, nbytes) + rstream.buffer.copyData(pbytes, 0, size) + rstream.buffer.shift(size) + result = size + break + +proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, + sep: seq[byte]): Future[int] {.async.} = + ## Read data from the read-only stream ``rstream`` until separator ``sep`` is + ## found. + ## + ## On success, the data and separator will be removed from the internal + ## buffer (consumed). Returned data will include the separator at the end. + ## + ## If EOF is received, and `sep` was not found, procedure will raise + ## ``AsyncStreamIncompleteError``. + ## + ## If ``nbytes`` bytes has been received and `sep` was not found, procedure + ## will raise ``AsyncStreamLimitError``. + ## + ## Procedure returns actual number of bytes read. + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + result = await readUntil(rstream.tsource, pbytes, nbytes, sep) + except TransportIncompleteError: + raise newAsyncStreamIncompleteError() + except TransportLimitError: + raise newAsyncStreamLimitError() + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + var + dest = cast[ptr UncheckedArray[byte]](pbytes) + state = 0 + k = 0 + + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + raise newAsyncStreamIncompleteError() + + var index = 0 + while index < datalen: + let ch = rstream.buffer[index] + if sep[state] == ch: + inc(state) + else: + state = 0 + if k < nbytes: + dest[k] = ch + inc(k) + else: + raise newAsyncStreamLimitError() + if state == len(sep): + break + inc(index) + + if state == len(sep): + rstream.buffer.shift(index + 1) + result = k + break + else: + rstream.buffer.shift(datalen) + await rstream.buffer.wait() + +proc readLine*(rstream: AsyncStreamReader, limit = 0, + sep = "\r\n"): Future[string] {.async.} = + ## Read one line from read-only stream ``rstream``, where ``"line"`` is a + ## sequence of bytes ending with ``sep`` (default is ``"\r\n"``). + ## + ## If EOF is received, and ``sep`` was not found, the method will return the + ## partial read bytes. + ## + ## If the EOF was received and the internal buffer is empty, return an + ## empty string. + ## + ## If ``limit`` more then 0, then result string will be limited to ``limit`` + ## bytes. + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + result = await readLine(rstream.tsource, limit, sep) + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + var res = "" + var + lim = if limit <= 0: -1 else: limit + state = 0 + + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + result = res + break + + var index = 0 + while index < datalen: + let ch = char(rstream.buffer[index]) + if sep[state] == ch: + inc(state) + if state == len(sep) or len(res) == lim: + rstream.buffer.shift(index + 1) + break + else: + state = 0 + res.add(ch) + if len(res) == lim: + rstream.buffer.shift(index + 1) + break + inc(index) + + if state == len(sep) or (lim == len(res)): + result = res + break + else: + rstream.buffer.shift(datalen) + await rstream.buffer.wait() + +proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = + ## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream + ## ``rstream``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + result = await read(rstream.tsource, n) + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + var res = newSeq[byte]() + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + result = res + break + + if datalen > 0: + let s = len(res) + let o = s + datalen + if n <= 0: + res.setLen(o) + rstream.buffer.copyData(addr res[s], 0, datalen) + rstream.buffer.shift(datalen) + else: + let left = n - s + if datalen >= left: + res.setLen(n) + rstream.buffer.copyData(addr res[s], 0, left) + rstream.buffer.shift(left) + result = res + break + else: + res.setLen(o) + rstream.buffer.copyData(addr res[s], 0, datalen) + rstream.buffer.shift(datalen) + + await rstream.buffer.wait() + +proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = + ## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream + ## ``rstream``. + ## + ## Return number of bytes actually consumed (discarded). + if not rstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if isNil(rstream.rsource): + try: + result = await consume(rstream.tsource, n) + except TransportLimitError: + raise newAsyncStreamLimitError() + except: + raise newAsyncStreamReadError(getCurrentException()) + else: + var res = 0 + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0: + if rstream.atEof(): + if n <= 0: + result = res + break + else: + raise newAsyncStreamLimitError() + else: + if n <= 0: + res += datalen + rstream.buffer.shift(datalen) + else: + let left = n - res + if datalen >= left: + res += left + rstream.buffer.shift(left) + result = res + break + else: + res += datalen + rstream.buffer.shift(datalen) + + await rstream.buffer.wait() + +proc write*(wstream: AsyncStreamWriter, pbytes: pointer, + nbytes: int) {.async.} = + ## Write sequence of bytes pointed by ``pbytes`` of length ``nbytes`` to + ## writer stream ``wstream``. + ## + ## ``nbytes` must be more then zero. + if not wstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + if nbytes <= 0: + raise newAsyncStreamIncorrectError("Zero length message") + + if isNil(wstream.wsource): + var resFut = write(wstream.tsource, pbytes, nbytes) + yield resFut + if resFut.failed: + raise newAsyncStreamWriteError(resFut.error) + if resFut.read() != nbytes: + raise newAsyncStreamIncompleteError() + else: + var item = WriteItem(kind: Pointer) + item.data1 = pbytes + item.size = nbytes + item.future = newFuture[void]("async.stream.write(pointer)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) + +proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], + msglen = -1) {.async.} = + ## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer + ## stream ``wstream``. + ## + ## Sequence of bytes ``sbytes`` must not be zero-length. + ## + ## If ``msglen < 0`` whole sequence ``sbytes`` will be writen to stream. + ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to + ## stream. + let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes)) + + if not wstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + if length <= 0: + raise newAsyncStreamIncorrectError("Zero length message") + + if isNil(wstream.wsource): + var resFut = write(wstream.tsource, sbytes) + yield resFut + if resFut.failed: + raise newAsyncStreamWriteError(resFut.error) + if resFut.read() != length: + raise newAsyncStreamIncompleteError() + else: + var item = WriteItem(kind: Sequence) + if not isLiteral(sbytes): + shallowCopy(item.data2, sbytes) + else: + item.data2 = sbytes + item.size = length + item.future = newFuture[void]("async.stream.write(seq)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) + +proc write*(wstream: AsyncStreamWriter, sbytes: string, + msglen = -1) {.async.} = + ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. + ## + ## String ``sbytes`` must not be zero-length. + ## + ## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream. + ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to + ## stream. + let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes)) + + if not wstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + if length <= 0: + raise newAsyncStreamIncorrectError("Zero length message") + + if isNil(wstream.wsource): + var resFut = write(wstream.tsource, sbytes, msglen) + yield resFut + if resFut.failed: + raise newAsyncStreamWriteError(resFut.error) + if resFut.read() != length: + raise newAsyncStreamIncompleteError() + else: + var item = WriteItem(kind: String) + if not isLiteral(sbytes): + shallowCopy(item.data3, sbytes) + else: + item.data3 = sbytes + item.size = length + item.future = newFuture[void]("async.stream.write(string)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) + +proc finish*(wstream: AsyncStreamWriter) {.async.} = + ## Finish write stream ``wstream``. + if not wstream.running(): + raise newAsyncStreamIncorrectError("Incorrect stream state") + + if not isNil(wstream.wsource): + var item = WriteItem(kind: Pointer) + item.size = 0 + item.future = newFuture[void]("async.stream.finish") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) + +proc join*(rw: AsyncStreamRW): Future[void] = + ## Get Future[void] which will be completed when stream become finished or + ## closed. + when rw is AsyncStreamReader: + var retFuture = newFuture[void]("async.stream.reader.join") + else: + var retFuture = newFuture[void]("async.stream.writer.join") + proc continuation(udata: pointer) = retFuture.complete() + if not rw.future.finished: + rw.future.addCallback(continuation) + else: + retFuture.complete() + return retFuture + +proc close*(rw: AsyncStreamRW) = + ## Close and frees resources of stream ``rw``. + ## + ## Note close() procedure is not completed immediately! + if rw.closed(): + raise newAsyncStreamIncorrectError("Stream is already closed!") + + proc continuation(udata: pointer) = + if not isNil(rw.udata): + GC_unref(cast[ref int](rw.udata)) + rw.state = AsyncStreamState.Closed + rw.future.complete() + when rw is AsyncStreamReader: + untrackAsyncStreamReader(rw) + elif rw is AsyncStreamWriter: + untrackAsyncStreamWriter(rw) + + when rw is AsyncStreamReader: + if isNil(rw.rsource): + callSoon(continuation) + else: + rw.exevent.fire() + elif rw is AsyncStreamWriter: + if isNil(rw.wsource): + callSoon(continuation) + else: + rw.exevent.fire() + +proc closeWait*(rw: AsyncStreamRW): Future[void] = + ## Close and frees resources of stream ``rw``. + rw.close() + result = rw.join() + +proc startReader(rstream: AsyncStreamReader) = + rstream.state = Running + if not isNil(rstream.readerLoop): + rstream.future = rstream.readerLoop(rstream) + else: + rstream.future = newFuture[void]("async.stream.empty.reader") + +proc startWriter(wstream: AsyncStreamWriter) = + wstream.state = Running + if not isNil(wstream.writerLoop): + wstream.future = wstream.writerLoop(wstream) + else: + wstream.future = newFuture[void]("async.stream.empty.writer") + +proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop, + queueSize = AsyncStreamDefaultQueueSize) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = loop + child.wsource = wsource + child.tsource = wsource.tsource + child.exevent = newAsyncEvent() + child.queue = newAsyncQueue[WriteItem](queueSize) + trackAsyncStreamWriter(child) + child.startWriter() + +proc init*[T](child, wsource: AsyncStreamWriter, loop: StreamWriterLoop, + queueSize = AsyncStreamDefaultQueueSize, udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = loop + child.wsource = wsource + child.tsource = wsource.tsource + child.exevent = newAsyncEvent() + child.queue = newAsyncQueue[WriteItem](queueSize) + if not isNil(udata): + GC_ref(udata) + child.udata = cast[pointer](udata) + trackAsyncStreamWriter(child) + child.startWriter() + +proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop, + bufferSize = AsyncStreamDefaultBufferSize) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = loop + child.rsource = rsource + child.tsource = rsource.tsource + child.exevent = newAsyncEvent() + child.buffer = AsyncBuffer.init(bufferSize) + trackAsyncStreamReader(child) + child.startReader() + +proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop, + bufferSize = AsyncStreamDefaultBufferSize, + udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = loop + child.rsource = rsource + child.tsource = rsource.tsource + child.exevent = newAsyncEvent() + child.buffer = AsyncBuffer.init(bufferSize) + if not isNil(udata): + GC_ref(udata) + child.udata = cast[pointer](udata) + trackAsyncStreamReader(child) + child.startReader() + +proc init*(child: AsyncStreamWriter, tsource: StreamTransport) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = nil + child.wsource = nil + child.tsource = tsource + trackAsyncStreamWriter(child) + child.startWriter() + +proc init*[T](child: AsyncStreamWriter, tsource: StreamTransport, + udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = nil + child.wsource = nil + child.tsource = tsource + trackAsyncStreamWriter(child) + child.startWriter() + +proc init*(child: AsyncStreamReader, tsource: StreamTransport) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = nil + child.rsource = nil + child.tsource = tsource + trackAsyncStreamReader(child) + child.startReader() + +proc init*[T](child: AsyncStreamReader, tsource: StreamTransport, + udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = nil + child.rsource = nil + child.tsource = tsource + if not isNil(udata): + GC_ref(udata) + child.udata = cast[pointer](udata) + trackAsyncStreamReader(child) + child.startReader() + +proc newAsyncStreamReader*[T](rsource: AsyncStreamReader, + loop: StreamReaderLoop, + bufferSize = AsyncStreamDefaultBufferSize, + udata: ref T): AsyncStreamReader = + ## Create new AsyncStreamReader object, which will use other async stream + ## reader ``rsource`` as source data channel. + ## + ## ``loop`` is main reading loop procedure. + ## + ## ``bufferSize`` is internal buffer size. + ## + ## ``udata`` - user object which will be associated with new AsyncStreamReader + ## object. + result = new AsyncStreamReader + result.init(rsource, loop, bufferSize, udata) + +proc newAsyncStreamReader*(rsource: AsyncStreamReader, + loop: StreamReaderLoop, + bufferSize = AsyncStreamDefaultBufferSize + ): AsyncStreamReader = + ## Create new AsyncStreamReader object, which will use other async stream + ## reader ``rsource`` as source data channel. + ## + ## ``loop`` is main reading loop procedure. + ## + ## ``bufferSize`` is internal buffer size. + result = new AsyncStreamReader + result.init(rsource, loop, bufferSize) + +proc newAsyncStreamReader*[T](tsource: StreamTransport, + udata: ref T): AsyncStreamReader = + ## Create new AsyncStreamReader object, which will use stream transport + ## ``tsource`` as source data channel. + ## + ## ``udata`` - user object which will be associated with new AsyncStreamWriter + ## object. + result = new AsyncStreamReader + result.init(tsource, udata) + +proc newAsyncStreamReader*(tsource: StreamTransport): AsyncStreamReader = + ## Create new AsyncStreamReader object, which will use stream transport + ## ``tsource`` as source data channel. + result = new AsyncStreamReader + result.init(tsource) + +proc newAsyncStreamWriter*[T](wsource: AsyncStreamWriter, + loop: StreamWriterLoop, + queueSize = AsyncStreamDefaultQueueSize, + udata: ref T): AsyncStreamWriter = + ## Create new AsyncStreamWriter object which will use other AsyncStreamWriter + ## object ``wsource`` as data channel. + ## + ## ``loop`` is main writing loop procedure. + ## + ## ``queueSize`` is writing queue size (default size is unlimited). + ## + ## ``udata`` - user object which will be associated with new AsyncStreamWriter + ## object. + result = new AsyncStreamWriter + result.init(wsource, loop, queueSize, udata) + +proc newAsyncStreamWriter*(wsource: AsyncStreamWriter, + loop: StreamWriterLoop, + queueSize = AsyncStreamDefaultQueueSize + ): AsyncStreamWriter = + ## Create new AsyncStreamWriter object which will use other AsyncStreamWriter + ## object ``wsource`` as data channel. + ## + ## ``loop`` is main writing loop procedure. + ## + ## ``queueSize`` is writing queue size (default size is unlimited). + result = new AsyncStreamWriter + result.init(wsource, loop, queueSize) + +proc newAsyncStreamWriter*[T](tsource: StreamTransport, + udata: ref T): AsyncStreamWriter = + ## Create new AsyncStreamWriter object which will use stream transport + ## ``tsource`` as data channel. + ## + ## ``udata`` - user object which will be associated with new AsyncStreamWriter + ## object. + result = new AsyncStreamWriter + result.init(tsource, udata) + +proc newAsyncStreamWriter*(tsource: StreamTransport): AsyncStreamWriter = + ## Create new AsyncStreamWriter object which will use stream transport + ## ``tsource`` as data channel. + result = new AsyncStreamWriter + result.init(tsource) + +proc getUserData*[T](rw: AsyncStreamRW): T {.inline.} = + ## Obtain user data associated with AsyncStreamReader or AsyncStreamWriter + ## object ``rw``. + result = cast[T](rw.udata) diff --git a/chronos/streams/chunkstream.nim b/chronos/streams/chunkstream.nim new file mode 100644 index 0000000..a58e4dd --- /dev/null +++ b/chronos/streams/chunkstream.nim @@ -0,0 +1,383 @@ +# +# Chronos Asynchronous Chunked-Encoding Stream +# (c) Copyright 2019-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import ../asyncloop, ../timer +import asyncstream, ../transports/stream, ../transports/common +export asyncstream, stream, timer, common + +const + ChunkBufferSize = 4096 + ChunkHeaderSize = 8 + ChunkDefaultTimeout = 10.seconds + CRLF = @[byte(0x0D), byte(0x0A)] + +type + ChunkedStreamReader* = ref object of AsyncStreamReader + timeout: Duration + + ChunkedStreamWriter* = ref object of AsyncStreamWriter + + ChunkedStreamError* = object of Exception + ChunkedStreamTimeoutError* = object of ChunkedStreamError + ChunkedStreamProtocolError* = object of ChunkedStreamError + +proc newTimeoutError(): ref Exception {.inline.} = + newException(ChunkedStreamTimeoutError, "Timeout exceeded!") + +proc newProtocolError(): ref Exception {.inline.} = + newException(ChunkedStreamProtocolError, "Protocol error!") + +proc oneOf*[A, B, C](fut1: Future[A], fut2: Future[B], + fut3: Future[C]): Future[void] = + ## Wait for completion of one future from list [`fut1`, `fut2`, `fut3`], + ## resulting future do not care about futures error, so you need to check + ## `fut1`, `fut2`, `fut3` for error manually. + var retFuture = newFuture[void]("chunked.oneOf()") + proc cb(data: pointer) {.gcsafe.} = + var index: int + if not retFuture.finished: + var fut = cast[FutureBase](data) + if cast[pointer](fut1) == data: + fut2.removeCallback(cb) + fut3.removeCallback(cb) + elif cast[pointer](fut2) == data: + fut1.removeCallback(cb) + fut3.removeCallback(cb) + elif cast[pointer](fut3) == data: + fut2.removeCallback(cb) + fut3.removeCallback(cb) + retFuture.complete() + fut1.callback = cb + fut2.callback = cb + fut3.callback = cb + return retFuture + +proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] = + ## Wait for completion of `fut1` or `fut2`, resulting future do not care about + ## error, so you need to check `fut1` and `fut2` for error. + var retFuture = newFuture[void]("chunked.oneOf()") + proc cb(data: pointer) {.gcsafe.} = + var index: int + if not retFuture.finished: + var fut = cast[FutureBase](data) + if cast[pointer](fut1) == data: + fut2.removeCallback(cb) + elif cast[pointer](fut2) == data: + fut1.removeCallback(cb) + retFuture.complete() + fut1.callback = cb + fut2.callback = cb + return retFuture + +proc getChunkSize(buffer: openarray[byte]): uint64 = + # We using `uint64` representation, but allow only 2^32 chunk size, + # ChunkHeaderSize. + for i in 0..= byte('0') and ch <= byte('9'): + result = (result shl 4) or uint64(ch - byte('0')) + else: + result = (result shl 4) or uint64((ch and 0x0F) + 9) + else: + result = 0xFFFF_FFFF_FFFF_FFFF'u64 + break + +proc setChunkSize(buffer: var openarray[byte], length: int64): int = + # Store length as chunk header size (hexadecimal value) with CRLF. + # Maximum stored value is ``0xFFFF_FFFF``. + # Buffer ``buffer`` length must be at least 10 octets. + doAssert(length <= int64(uint32.high)) + var n = 0xF000_0000'i64 + var i = 32 + var c = 0 + if length == 0: + buffer[0] = byte('0') + buffer[1] = byte(0x0D) + buffer[2] = byte(0x0A) + result = 3 + else: + while n != 0: + var v = length and n + if v != 0 or c != 0: + let digit = byte((length and n) shr (i - 4)) + var ch = digit + byte('0') + if ch > byte('9'): + ch = ch + 0x07'u8 + buffer[c] = ch + inc(c) + n = n shr 4 + i = i - 4 + buffer[c] = byte(0x0D) + buffer[c + 1] = byte(0x0A) + result = c + 2 + +proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = + var rstream = cast[ChunkedStreamReader](stream) + var exitFut = rstream.exevent.wait() + var buffer = newSeq[byte](1024) + var timerFut: Future[void] + + if rstream.timeout == InfiniteDuration: + timerFut = newFuture[void]() + + while true: + if rstream.timeout != InfiniteDuration: + timerFut = sleepAsync(rstream.timeout) + + # Reading chunk size + var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF) + await oneOf(ruFut1, timerFut, exitFut) + + if exitFut.finished or timerFut.finished: + if timerFut.finished: + rstream.error = newTimeoutError() + rstream.state = AsyncStreamState.Error + else: + rstream.state = AsyncStreamState.Stopped + break + + if ruFut1.failed: + rstream.error = ruFut1.error + rstream.state = AsyncStreamState.Error + break + + let length = ruFut1.read() + var chunksize = getChunkSize(buffer.toOpenArray(0, length - len(CRLF) - 1)) + + if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64: + rstream.error = newProtocolError() + rstream.state = AsyncStreamState.Error + break + elif chunksize > 0'u64: + while chunksize > 0'u64: + let toRead = min(int(chunksize), rstream.buffer.bufferLen()) + var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(), + toRead) + await oneOf(reFut2, timerFut, exitFut) + + if exitFut.finished or timerFut.finished: + if timerFut.finished: + rstream.error = newTimeoutError() + rstream.state = AsyncStreamState.Error + else: + rstream.state = AsyncStreamState.Stopped + break + + if reFut2.failed: + rstream.error = reFut2.error + rstream.state = AsyncStreamState.Error + break + + rstream.buffer.update(toRead) + await rstream.buffer.transfer() or exitFut + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped + break + + chunksize = chunksize - uint64(toRead) + + if rstream.state != AsyncStreamState.Running: + break + + # Reading chunk trailing CRLF + var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2) + await oneOf(reFut3, timerFut, exitFut) + if exitFut.finished or timerFut.finished: + if timerFut.finished: + rstream.error = newTimeoutError() + rstream.state = AsyncStreamState.Error + else: + rstream.state = AsyncStreamState.Stopped + break + + if reFut3.failed: + rstream.error = reFut3.error + rstream.state = AsyncStreamState.Error + break + + if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]: + rstream.error = newProtocolError() + rstream.state = AsyncStreamState.Error + break + else: + # Reading trailing line for last chunk + var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF) + await oneOf(ruFut4, timerFut, exitFut) + + if exitFut.finished or timerFut.finished: + if timerFut.finished: + rstream.error = newTimeoutError() + rstream.state = AsyncStreamState.Error + else: + rstream.state = AsyncStreamState.Stopped + break + + if ruFut4.failed: + rstream.error = ruFut4.error + rstream.state = AsyncStreamState.Error + break + + rstream.state = AsyncStreamState.Finished + await rstream.buffer.transfer() or exitFut + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped + break + + if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}: + # We need to notify consumer about error/close, but we do not care about + # incoming data anymore. + rstream.buffer.forget() + + untrackAsyncStreamReader(rstream) + +proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = + var wstream = cast[ChunkedStreamWriter](stream) + var exitFut = wstream.exevent.wait() + var buffer: array[16, byte] + var error: ref Exception + var wFut1, wFut2, wFut3: Future[void] + + wstream.state = AsyncStreamState.Running + while true: + # Getting new item from stream's queue. + var getFut = wstream.queue.get() + await oneOf(getFut, exitFut) + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + var item = getFut.read() + # `item.size == 0` is marker of stream finish, while `item.size != 0` is + # data's marker. + if item.size > 0: + let length = setChunkSize(buffer, int64(item.size)) + # Writing chunk header CRLF. + wFut1 = wstream.wsource.write(addr buffer[0], length) + await oneOf(wFut1, exitFut) + + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + + if wFut1.failed: + item.future.fail(wFut1.error) + continue + + # Writing chunk data. + if item.kind == Pointer: + wFut2 = wstream.wsource.write(item.data1, item.size) + elif item.kind == Sequence: + wFut2 = wstream.wsource.write(addr item.data2[0], item.size) + elif item.kind == String: + wFut2 = wstream.wsource.write(addr item.data3[0], item.size) + + await oneOf(wFut2, exitFut) + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + + if wFut2.failed: + item.future.fail(wFut2.error) + continue + + # Writing chunk footer CRLF. + var wFut3 = wstream.wsource.write(CRLF) + await oneOf(wFut3, exitFut) + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + + if wFut3.failed: + item.future.fail(wFut3.error) + continue + + # Everything is fine, completing queue item's future. + item.future.complete() + else: + let length = setChunkSize(buffer, 0'i64) + + # Write finish chunk `0`. + wFut1 = wstream.wsource.write(addr buffer[0], length) + await oneOf(wFut1, exitFut) + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + + if wFut1.failed: + item.future.fail(wFut1.error) + # We break here, because this is last chunk + break + + # Write trailing CRLF. + wFut2 = wstream.wsource.write(CRLF) + await oneOf(wFut2, exitFut) + + if exitFut.finished: + wstream.state = AsyncStreamState.Stopped + break + + if wFut2.failed: + item.future.fail(wFut2.error) + # We break here, because this is last chunk + break + + # Everything is fine, completing queue item's future. + item.future.complete() + + # Set stream state to Finished. + wstream.state = AsyncStreamState.Finished + break + + untrackAsyncStreamWriter(wstream) + +proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader, + bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout, + udata: ref T) = + child.timeout = timeout + init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize, + udata) + +proc init*(child: ChunkedStreamReader, rsource: AsyncStreamReader, + bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout) = + child.timeout = timeout + init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize) + +proc newChunkedStreamReader*[T](rsource: AsyncStreamReader, + bufferSize = AsyncStreamDefaultBufferSize, + timeout = ChunkDefaultTimeout, + udata: ref T): ChunkedStreamReader = + result = new ChunkedStreamReader + result.init(rsource, bufferSize, timeout, udata) + +proc newChunkedStreamReader*(rsource: AsyncStreamReader, + bufferSize = AsyncStreamDefaultBufferSize, + timeout = ChunkDefaultTimeout, + ): ChunkedStreamReader = + result = new ChunkedStreamReader + result.init(rsource, bufferSize, timeout) + +proc init*[T](child: ChunkedStreamWriter, wsource: AsyncStreamWriter, + queueSize = AsyncStreamDefaultQueueSize, udata: ref T) = + init(cast[AsyncStreamWriter](child), wsource, chunkedWriteLoop, queueSize, + udata) + +proc init*(child: ChunkedStreamWriter, wsource: AsyncStreamWriter, + queueSize = AsyncStreamDefaultQueueSize) = + init(cast[AsyncStreamWriter](child), wsource, chunkedWriteLoop, queueSize) + +proc newChunkedStreamWriter*[T](wsource: AsyncStreamWriter, + queueSize = AsyncStreamDefaultQueueSize, + udata: ref T): ChunkedStreamWriter = + result = new ChunkedStreamWriter + result.init(wsource, queueSize, udata) + +proc newChunkedStreamWriter*(wsource: AsyncStreamWriter, + queueSize = AsyncStreamDefaultQueueSize, + ): ChunkedStreamWriter = + result = new ChunkedStreamWriter + result.init(wsource, queueSize) diff --git a/chronos/transport.nim b/chronos/transport.nim index f007178..4853858 100644 --- a/chronos/transport.nim +++ b/chronos/transport.nim @@ -6,6 +6,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import transports/[datagram, stream, common, ipnet, osnet] +import streams/[asyncstream, chunkstream] + export datagram, common, stream, ipnet, osnet +export asyncstream, chunkstream diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 6b32032..193e999 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -1469,7 +1469,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, ## Read data from the transport ``transp`` until separator ``sep`` is found. ## ## On success, the data and separator will be removed from the internal - ## buffer (consumed). Returned data will NOT include the separator at the end. + ## buffer (consumed). Returned data will include the separator at the end. ## ## If EOF is received, and `sep` was not found, procedure will raise ## ``TransportIncompleteError``. @@ -1578,35 +1578,37 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = ## This procedure allocates buffer seq[byte] and return it as result. checkClosed(transp) checkPending(transp) - result = newSeq[byte]() + var res = newSeq[byte]() while true: if (ReadError in transp.state): raise transp.getError() if (ReadClosed in transp.state) or transp.atEof(): + result = res break if transp.offset > 0: - let s = len(result) + let s = len(res) let o = s + transp.offset - if n < 0: + if n <= 0: # grabbing all incoming data, until EOF - result.setLen(o) - copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]), + res.setLen(o) + copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), transp.offset) transp.offset = 0 else: let left = n - s if transp.offset >= left: # size of buffer data is more then we need, grabbing only part - result.setLen(n) - copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]), + res.setLen(n) + copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), left) transp.shiftBuffer(left) + result = res break else: # there not enough data in buffer, grabbing all - result.setLen(o) - copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]), + res.setLen(o) + copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]), transp.offset) transp.offset = 0 @@ -1631,7 +1633,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = break if transp.offset > 0: - if n == -1: + if n <= 0: # consume all incoming data, until EOF result += transp.offset transp.offset = 0 diff --git a/tests/testall.nim b/tests/testall.nim index 3a22080..710afee 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -6,4 +6,5 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) import testsync, testsoon, testtime, testfut, testsignal, testaddress, - testdatagram, teststream, testserver, testbugs, testnet + testdatagram, teststream, testserver, testbugs, testnet, + testasyncstream diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim new file mode 100644 index 0000000..9eb36f2 --- /dev/null +++ b/tests/testasyncstream.nim @@ -0,0 +1,472 @@ +# Chronos Test Suite +# (c) Copyright 2019-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import strutils, unittest, os +import hexdump +import ../chronos + +suite "AsyncStream test suite": + test "AsyncStream(StreamTransport) readExactly() test": + proc testReadExactly(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + await wstream.write("000000000011111111112222222222") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var buffer = newSeq[byte](10) + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + await rstream.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "0000000000" + await rstream.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "1111111111" + await rstream.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "2222222222" + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadExactly(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(StreamTransport) readUntil() test": + proc testReadUntil(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000NNz1111111111NNz2222222222NNz") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var buffer = newSeq[byte](13) + var sep = @[byte('N'), byte('N'), byte('z')] + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var r1 = await rstream.readUntil(addr buffer[0], len(buffer), sep) + check: + r1 == 13 + cast[string](buffer) == "0000000000NNz" + var r2 = await rstream.readUntil(addr buffer[0], len(buffer), sep) + check: + r2 == 13 + cast[string](buffer) == "1111111111NNz" + var r3 = await rstream.readUntil(addr buffer[0], len(buffer), sep) + check: + r3 == 13 + cast[string](buffer) == "2222222222NNz" + + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadUntil(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(StreamTransport) readLine() test": + proc testReadLine(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var r1 = await rstream.readLine() + check r1 == "0000000000" + var r2 = await rstream.readLine() + check r2 == "1111111111" + var r3 = await rstream.readLine() + check r3 == "2222222222" + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadLine(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(StreamTransport) read() test": + proc testRead(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + await wstream.write("000000000011111111112222222222") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var buf1 = await rstream.read(10) + check cast[string](buf1) == "0000000000" + var buf2 = await rstream.read() + check cast[string](buf2) == "11111111112222222222" + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testRead(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(StreamTransport) consume() test": + proc testConsume(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000111111111122222222223333333333") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var res1 = await rstream.consume(10) + check: + res1 == 10 + var buf1 = await rstream.read(10) + check cast[string](buf1) == "1111111111" + var res2 = await rstream.consume(10) + check: + res2 == 10 + var buf2 = await rstream.read(10) + check cast[string](buf2) == "3333333333" + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testConsume(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(StreamTransport) leaks test": + check: + getTracker("async.stream.reader").isLeaked() == false + getTracker("async.stream.writer").isLeaked() == false + getTracker("stream.server").isLeaked() == false + getTracker("stream.transport").isLeaked() == false + + test "AsyncStream(AsyncStream) readExactly() test": + proc testReadExactly2(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s1 = "00000" + var s2 = "11111" + var s3 = "22222" + await wstream2.write("00000") + await wstream2.write(addr s1[0], len(s1)) + await wstream2.write("11111") + await wstream2.write(cast[seq[byte]](s2)) + await wstream2.write("22222") + await wstream2.write(addr s3[0], len(s3)) + + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var buffer = newSeq[byte](10) + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + await rstream2.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "0000000000" + await rstream2.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "1111111111" + await rstream2.readExactly(addr buffer[0], 10) + check cast[string](buffer) == "2222222222" + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadExactly2(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(AsyncStream) readUntil() test": + proc testReadUntil2(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s1 = "00000NNz" + var s2 = "11111NNz" + var s3 = "22222NNz" + await wstream2.write("00000") + await wstream2.write(addr s1[0], len(s1)) + await wstream2.write("11111") + await wstream2.write(s2) + await wstream2.write("22222") + await wstream2.write(cast[seq[byte]](s3)) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var buffer = newSeq[byte](13) + var sep = @[byte('N'), byte('N'), byte('z')] + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + + var r1 = await rstream2.readUntil(addr buffer[0], len(buffer), sep) + check: + r1 == 13 + cast[string](buffer) == "0000000000NNz" + var r2 = await rstream2.readUntil(addr buffer[0], len(buffer), sep) + check: + r2 == 13 + cast[string](buffer) == "1111111111NNz" + var r3 = await rstream2.readUntil(addr buffer[0], len(buffer), sep) + check: + r3 == 13 + cast[string](buffer) == "2222222222NNz" + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadUntil2(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(AsyncStream) readLine() test": + proc testReadLine2(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + await wstream2.write("00000") + await wstream2.write("00000\r\n") + await wstream2.write("11111") + await wstream2.write("11111\r\n") + await wstream2.write("22222") + await wstream2.write("22222\r\n") + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + var r1 = await rstream2.readLine() + check r1 == "0000000000" + var r2 = await rstream2.readLine() + check r2 == "1111111111" + var r3 = await rstream2.readLine() + check r3 == "2222222222" + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testReadLine2(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(AsyncStream) read() test": + proc testRead2(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s2 = "1111111111" + var s3 = "2222222222" + await wstream2.write("0000000000") + await wstream2.write(s2) + await wstream2.write(cast[seq[byte]](s3)) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + var buf1 = await rstream2.read(10) + check cast[string](buf1) == "0000000000" + var buf2 = await rstream2.read() + check cast[string](buf2) == "11111111112222222222" + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testRead2(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(AsyncStream) consume() test": + proc testConsume2(address: TransportAddress): Future[bool] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + const + S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')] + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + + var s1 = "00000" + var s2 = cast[seq[byte]]("11111") + var s3 = "22222" + + await wstream2.write("00000") + await wstream2.write(s1) + await wstream2.write("11111") + await wstream2.write(s2) + await wstream2.write("22222") + await wstream2.write(addr s3[0], len(s3)) + await wstream2.write("33333") + await wstream2.write(S4) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + + var res1 = await rstream2.consume(10) + check: + res1 == 10 + var buf1 = await rstream2.read(10) + check cast[string](buf1) == "1111111111" + var res2 = await rstream2.consume(10) + check: + res2 == 10 + var buf2 = await rstream2.read(10) + check cast[string](buf2) == "3333333333" + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = true + check waitFor(testConsume2(initTAddress("127.0.0.1:46001"))) == true + test "AsyncStream(AsyncStream) leaks test": + check: + getTracker("async.stream.reader").isLeaked() == false + getTracker("async.stream.writer").isLeaked() == false + getTracker("stream.server").isLeaked() == false + getTracker("stream.transport").isLeaked() == false + +suite "ChunkedStream test suite": + test "ChunkedStream test vectors": + const ChunkedVectors = [ + ["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n", + "Wikipedia in\r\n\r\nchunks."], + ["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n0\r\n\r\n", + "Wikipedia in\r\n\r\nchunks."], + ] + proc checkVector(address: TransportAddress, + inputstr: string): Future[string] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var data = inputstr + await wstream.write(data) + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + var res = await rstream2.read() + var ress = cast[string](res) + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + result = ress + + proc testVectors(address: TransportAddress): Future[bool] {.async.} = + var res = true + for i in 0..