diff --git a/chronos/apps/http/httpcommon.nim b/chronos/apps/http/httpcommon.nim index 598deca..ac4b39f 100644 --- a/chronos/apps/http/httpcommon.nim +++ b/chronos/apps/http/httpcommon.nim @@ -6,7 +6,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import stew/results, httputils, strutils, uri +import std/[strutils, uri] +import stew/results, httputils import ../../asyncloop, ../../asyncsync import ../../streams/[asyncstream, boundstream] export results, httputils, strutils @@ -15,6 +16,8 @@ const HeadersMark* = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] PostMethods* = {MethodPost, MethodPatch, MethodPut, MethodDelete} + MaximumBodySizeError* = "Maximum size of request's body reached" + type HttpResult*[T] = Result[T, string] HttpResultCode*[T] = Result[T, HttpCode] @@ -48,20 +51,29 @@ proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader = proc closeWait*(bstream: HttpBodyReader) {.async.} = ## Close and free resource allocated by body reader. - if len(bstream.streams) > 0: - var res = newSeq[Future[void]]() - for item in bstream.streams.items(): - res.add(item.closeWait()) - await allFutures(res) - await procCall(AsyncStreamReader(bstream).closeWait()) + var res = newSeq[Future[void]]() + # We closing streams in reversed order because stream at position [0], uses + # data from stream at position [1]. + for index in countdown((len(bstream.streams) - 1), 0): + res.add(bstream.streams[index].closeWait()) + await allFutures(res) + await procCall(closeWait(AsyncStreamReader(bstream))) -proc atBound*(bstream: HttpBodyReader): bool {. - raises: [Defect].} = - ## Returns ``true`` if lowest stream is at EOF. - let lreader = bstream.streams[^1] - doAssert(lreader of BoundedStreamReader) - let breader = cast[BoundedStreamReader](lreader) - breader.atEof() and (breader.bytesLeft() == 0) +proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [Defect].} = + if len(bstream.streams) == 1: + # If HttpBodyReader has only one stream it has ``BoundedStreamReader``, in + # such case its impossible to get more bytes then expected amount. + false + else: + # If HttpBodyReader has two or more streams, we check if + # ``BoundedStreamReader`` at EOF. + if bstream.streams[0].atEof(): + for i in 1 ..< len(bstream.streams): + if not(bstream.streams[1].atEof()): + return true + false + else: + false proc raiseHttpCriticalError*(msg: string, code = Http400) {.noinline, noreturn.} = diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index 00f6031..cc4e5da 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -365,14 +365,14 @@ proc getBodyReader*(request: HttpRequestRef): HttpResult[HttpBodyReader] = ## leaks. if HttpRequestFlags.BoundBody in request.requestFlags: let bstream = newBoundedStreamReader(request.connection.reader, - request.contentLength) + uint64(request.contentLength)) ok(newHttpBodyReader(bstream)) elif HttpRequestFlags.UnboundBody in request.requestFlags: let maxBodySize = request.connection.server.maxRequestBodySize - let bstream = newBoundedStreamReader(request.connection.reader, maxBodySize, + let cstream = newChunkedStreamReader(request.connection.reader) + let bstream = newBoundedStreamReader(cstream, uint64(maxBodySize), comparison = BoundCmp.LessOrEqual) - let cstream = newChunkedStreamReader(bstream) - ok(newHttpBodyReader(cstream, bstream)) + ok(newHttpBodyReader(bstream, cstream)) else: err("Request do not have body available") @@ -399,12 +399,12 @@ proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} = let reader = res.get() try: await request.handleExpect() - return await reader.read() + var res = await reader.read() + if reader.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) + return res except AsyncStreamError: - if reader.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) - else: - raiseHttpCriticalError("Unable to read request's body") + raiseHttpCriticalError("Unable to read request's body") finally: await closeWait(res.get()) @@ -418,11 +418,10 @@ proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} = try: await request.handleExpect() discard await reader.consume() + if reader.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) except AsyncStreamError: - if reader.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) - else: - raiseHttpCriticalError("Unable to read request's body") + raiseHttpCriticalError("Unable to read request's body") finally: await closeWait(res.get()) diff --git a/chronos/apps/http/multipart.nim b/chronos/apps/http/multipart.nim index 275df74..a5d2fc1 100644 --- a/chronos/apps/http/multipart.nim +++ b/chronos/apps/http/multipart.nim @@ -14,6 +14,9 @@ import ../../streams/[asyncstream, boundstream, chunkstream] import httptable, httpcommon export httptable, httpcommon, asyncstream +const + UnableToReadMultipartBody = "Unable to read multipart message body" + type MultiPartSource* {.pure.} = enum Stream, Buffer @@ -165,10 +168,10 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} = except CancelledError as exc: raise exc except AsyncStreamError: - if mpr.stream.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) + if mpr.stream.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) else: - raiseHttpCriticalError("Unable to read multipart body") + raiseHttpCriticalError(UnableToReadMultipartBody) # Reading part's headers try: @@ -199,7 +202,7 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} = kind: MultiPartSource.Stream, headers: HttpTable.init(), breader: mpr.stream, - stream: newBoundedStreamReader(mpr.stream, -1, mpr.boundary), + stream: newBoundedStreamReader(mpr.stream, mpr.boundary), counter: mpr.counter ) @@ -214,14 +217,10 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} = except CancelledError as exc: raise exc except AsyncStreamError: - if mpr.stream.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) + if mpr.stream.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) else: - raiseHttpCriticalError("Unable to read multipart body") - -proc atBound*(mp: MultiPart): bool = - ## Returns ``true`` if MultiPart's stream reached request body maximum size. - mp.breader.atBound() + raiseHttpCriticalError(UnableToReadMultipartBody) proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} = ## Get multipart's ``mp`` value as sequence of bytes. @@ -231,10 +230,10 @@ proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} = let res = await mp.stream.read() return res except AsyncStreamError: - if mp.breader.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) + if mp.breader.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) else: - raiseHttpCriticalError("Unable to read multipart body") + raiseHttpCriticalError(UnableToReadMultipartBody) of MultiPartSource.Buffer: return mp.buffer @@ -245,10 +244,10 @@ proc consumeBody*(mp: MultiPart) {.async.} = try: discard await mp.stream.consume() except AsyncStreamError: - if mp.breader.atBound(): - raiseHttpCriticalError("Maximum size of body reached", Http413) + if mp.breader.hasOverflow(): + raiseHttpCriticalError(MaximumBodySizeError, Http413) else: - raiseHttpCriticalError("Unable to consume multipart body") + raiseHttpCriticalError(UnableToReadMultipartBody) of MultiPartSource.Buffer: discard diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 5cffeb6..04b7ed4 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -238,31 +238,115 @@ template checkStreamClosed*(t: untyped) = proc atEof*(rstream: AsyncStreamReader): bool = ## Returns ``true`` is reading stream is closed or finished and internal ## buffer do not have any bytes left. - rstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error} and - (rstream.buffer.dataLen() == 0) + if isNil(rstream.readerLoop): + if isNil(rstream.rsource): + rstream.tsource.atEof() + else: + rstream.rsource.atEof() + else: + rstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error} and + (rstream.buffer.dataLen() == 0) proc atEof*(wstream: AsyncStreamWriter): bool = ## Returns ``true`` is writing stream ``wstream`` closed or finished. - wstream.state in {AsyncStreamState.Stopped, Finished, Closed} + if isNil(wstream.writerLoop): + if isNil(wstream.wsource): + wstream.tsource.atEof() + else: + wstream.wsource.atEof() + else: + wstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error} -proc closed*(rw: AsyncStreamRW): bool {.inline.} = +proc closed*(reader: AsyncStreamReader): bool = ## Returns ``true`` is reading/writing stream is closed. - (rw.state == AsyncStreamState.Closed) + (reader.state == AsyncStreamState.Closed) -proc finished*(rw: AsyncStreamRW): bool {.inline.} = +proc finished*(reader: AsyncStreamReader): bool = ## Returns ``true`` is reading/writing stream is finished (completed). - (rw.state == AsyncStreamState.Finished) + if isNil(reader.readerLoop): + if isNil(reader.rsource): + reader.tsource.finished() + else: + reader.rsource.finished() + else: + (reader.state == AsyncStreamState.Finished) -proc stopped*(rw: AsyncStreamRW): bool {.inline.} = +proc stopped*(reader: AsyncStreamReader): bool = ## Returns ``true`` is reading/writing stream is stopped (interrupted). - (rw.state == AsyncStreamState.Stopped) + if isNil(reader.readerLoop): + if isNil(reader.rsource): + false + else: + reader.rsource.stopped() + else: + (reader.state == AsyncStreamState.Stopped) -proc running*(rw: AsyncStreamRW): bool {.inline.} = +proc running*(reader: AsyncStreamReader): bool = ## Returns ``true`` is reading/writing stream is still pending. - (rw.state == AsyncStreamState.Running) + if isNil(reader.readerLoop): + if isNil(reader.rsource): + reader.tsource.running() + else: + reader.rsource.running() + else: + (reader.state == AsyncStreamState.Running) -proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} -proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} +proc failed*(reader: AsyncStreamReader): bool = + if isNil(reader.readerLoop): + if isNil(reader.rsource): + reader.tsource.failed() + else: + reader.rsource.failed() + else: + (reader.state == AsyncStreamState.Error) + +proc closed*(writer: AsyncStreamWriter): bool = + ## Returns ``true`` is reading/writing stream is closed. + (writer.state == AsyncStreamState.Closed) + +proc finished*(writer: AsyncStreamWriter): bool = + ## Returns ``true`` is reading/writing stream is finished (completed). + if isNil(writer.writerLoop): + if isNil(writer.wsource): + writer.tsource.finished() + else: + writer.wsource.finished() + else: + (writer.state == AsyncStreamState.Finished) + +proc stopped*(writer: AsyncStreamWriter): bool = + ## Returns ``true`` is reading/writing stream is stopped (interrupted). + if isNil(writer.writerLoop): + if isNil(writer.wsource): + false + else: + writer.wsource.stopped() + else: + (writer.state == AsyncStreamState.Stopped) + +proc running*(writer: AsyncStreamWriter): bool = + ## Returns ``true`` is reading/writing stream is still pending. + if isNil(writer.writerLoop): + if isNil(writer.wsource): + writer.tsource.running() + else: + writer.wsource.running() + else: + (writer.state == AsyncStreamState.Running) + +proc failed*(writer: AsyncStreamWriter): bool = + if isNil(writer.writerLoop): + if isNil(writer.wsource): + writer.tsource.failed() + else: + writer.wsource.failed() + else: + (writer.state == AsyncStreamState.Error) + +proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {. + gcsafe, raises: [Defect].} +proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {. + gcsafe, raises: [Defect].} proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} = var res = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName)) diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index afe9166..3d6ad0f 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -14,6 +14,7 @@ ## ## For stream writing it means that you should write exactly bounded size ## of bytes. +import std/options import ../asyncloop, ../timer import asyncstream, ../transports/stream, ../transports/common export asyncstream, stream, timer, common @@ -23,14 +24,14 @@ type Equal, LessOrEqual BoundedStreamReader* = ref object of AsyncStreamReader - boundSize: int + boundSize: Option[uint64] boundary: seq[byte] - offset: int + offset: uint64 cmpop: BoundCmp BoundedStreamWriter* = ref object of AsyncStreamWriter - boundSize: int - offset: int + boundSize: uint64 + offset: uint64 cmpop: BoundCmp BoundedStreamError* = object of AsyncStreamError @@ -41,13 +42,18 @@ type const BoundedBufferSize* = 4096 + BoundSizeDefectMessage = "Bound size must be bigger than zero" + BoundarySizeDefectMessage = "Boundary must not be empty array" -proc newBoundedStreamIncompleteError*(): ref BoundedStreamError {.noinline.} = +template newBoundedStreamIncompleteError(): ref BoundedStreamError = newException(BoundedStreamIncompleteError, "Stream boundary is not reached yet") -proc readUntilBoundary*(rstream: AsyncStreamReader, pbytes: pointer, - nbytes: int, sep: seq[byte]): Future[int] {.async.} = +template newBoundedStreamOverflowError(): ref BoundedStreamOverflowError = + newException(BoundedStreamOverflowError, "Stream boundary exceeded") + +proc readUntilBoundary(rstream: AsyncStreamReader, pbytes: pointer, + nbytes: int, sep: seq[byte]): Future[int] {.async.} = doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes >= 0, "nbytes must be non-negative value") checkStreamClosed(rstream) @@ -96,36 +102,21 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = rstream.state = AsyncStreamState.Running var buffer = newSeq[byte](rstream.buffer.bufferLen()) while true: - # r1 is `true` if `boundSize` was not set. - let r1 = rstream.boundSize < 0 - # r2 is `true` if number of bytes read is less then `boundSize`. - let r2 = (rstream.boundSize > 0) and (rstream.offset < rstream.boundSize) - if r1 or r2: - let toRead = - if rstream.boundSize < 0: - len(buffer) - else: - min(rstream.boundSize - rstream.offset, len(buffer)) - try: - let res = await readUntilBoundary(rstream.rsource, addr buffer[0], - toRead, rstream.boundary) - if res > 0: - if len(rstream.boundary) > 0: - if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): - let length = res - len(rstream.boundary) - rstream.offset = rstream.offset + length - await upload(addr rstream.buffer, addr buffer[0], length) - rstream.state = AsyncStreamState.Finished - else: - if (res < toRead) and rstream.rsource.atEof(): - case rstream.cmpop - of BoundCmp.Equal: - rstream.state = AsyncStreamState.Error - rstream.error = newBoundedStreamIncompleteError() - of BoundCmp.LessOrEqual: - rstream.state = AsyncStreamState.Finished - rstream.offset = rstream.offset + res - await upload(addr rstream.buffer, addr buffer[0], res) + let toRead = + if rstream.boundSize.isNone(): + len(buffer) + else: + int(min(rstream.boundSize.get() - rstream.offset, uint64(len(buffer)))) + try: + let res = await readUntilBoundary(rstream.rsource, addr buffer[0], + toRead, rstream.boundary) + if res > 0: + if len(rstream.boundary) > 0: + if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): + let length = res - len(rstream.boundary) + rstream.offset = rstream.offset + uint64(length) + rstream.state = AsyncStreamState.Finished + await upload(addr rstream.buffer, addr buffer[0], length) else: if (res < toRead) and rstream.rsource.atEof(): case rstream.cmpop @@ -134,49 +125,64 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = rstream.error = newBoundedStreamIncompleteError() of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - rstream.offset = rstream.offset + res + + rstream.offset = rstream.offset + uint64(res) + if rstream.boundSize.isSome() and + (rstream.offset == rstream.boundSize.get()): + # This is "fast-path" to avoid one more iteration until EOF + rstream.state = AsyncStreamState.Finished await upload(addr rstream.buffer, addr buffer[0], res) else: - case rstream.cmpop - of BoundCmp.Equal: - rstream.state = AsyncStreamState.Error - rstream.error = newBoundedStreamIncompleteError() - of BoundCmp.LessOrEqual: + if (res < toRead) and rstream.rsource.atEof(): + case rstream.cmpop + of BoundCmp.Equal: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + of BoundCmp.LessOrEqual: + rstream.state = AsyncStreamState.Finished + + rstream.offset = rstream.offset + uint64(res) + if rstream.boundSize.isSome() and + (rstream.offset == rstream.boundSize.get()): + # This is "fast-path" to avoid one more iteration until EOF rstream.state = AsyncStreamState.Finished - - except AsyncStreamReadError as exc: - rstream.state = AsyncStreamState.Error - rstream.error = exc - except CancelledError: - rstream.state = AsyncStreamState.Stopped - - if rstream.state != AsyncStreamState.Running: - if rstream.state == AsyncStreamState.Finished: - # This is state when BoundCmp.LessOrEqual and readExactly returned - # `AsyncStreamIncompleteError`. + await upload(addr rstream.buffer, addr buffer[0], res) + else: + case rstream.cmpop + of BoundCmp.Equal: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + of BoundCmp.LessOrEqual: + rstream.state = AsyncStreamState.Finished await rstream.buffer.transfer() - break - else: - rstream.state = AsyncStreamState.Finished + + except AsyncStreamError as exc: + rstream.state = AsyncStreamState.Error + rstream.error = exc + except CancelledError: + rstream.state = AsyncStreamState.Error + rstream.error = newAsyncStreamUseClosedError() + + case rstream.state + of AsyncStreamState.Running: + discard + of AsyncStreamState.Error, AsyncStreamState.Stopped: + rstream.buffer.forget() + break + of AsyncStreamState.Finished, AsyncStreamState.Closed: break - # We need to notify consumer about error/close, but we do not care about - # incoming data anymore. - rstream.buffer.forget() - proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = + var error: ref AsyncStreamError var wstream = BoundedStreamWriter(stream) wstream.state = AsyncStreamState.Running while true: - var - item: WriteItem - error: ref AsyncStreamError - + var item: WriteItem try: item = await wstream.queue.get() if item.size > 0: - if item.size <= (wstream.boundSize - wstream.offset): + if uint64(item.size) <= (wstream.boundSize - wstream.offset): # Writing chunk data. case item.kind of WriteType.Pointer: @@ -185,14 +191,16 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = await wstream.wsource.write(addr item.dataSeq[0], item.size) of WriteType.String: await wstream.wsource.write(addr item.dataStr[0], item.size) - wstream.offset = wstream.offset + item.size + wstream.offset = wstream.offset + uint64(item.size) item.future.complete() else: wstream.state = AsyncStreamState.Error - error = newException(BoundedStreamOverflowError, - "Stream boundary exceeded") + error = newBoundedStreamOverflowError() else: - if wstream.offset != wstream.boundSize: + if wstream.offset == wstream.boundSize: + wstream.state = AsyncStreamState.Finished + item.future.complete() + else: case wstream.cmpop of BoundCmp.Equal: wstream.state = AsyncStreamState.Error @@ -200,96 +208,269 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = of BoundCmp.LessOrEqual: wstream.state = AsyncStreamState.Finished item.future.complete() - else: - wstream.state = AsyncStreamState.Finished - item.future.complete() except CancelledError: wstream.state = AsyncStreamState.Stopped error = newAsyncStreamUseClosedError() - except AsyncStreamWriteError as exc: - wstream.state = AsyncStreamState.Error - error = exc - except AsyncStreamIncompleteError as exc: + except AsyncStreamError as exc: wstream.state = AsyncStreamState.Error error = exc - if wstream.state != AsyncStreamState.Running: - if wstream.state == AsyncStreamState.Finished: - error = newAsyncStreamUseClosedError() - else: - if not(isNil(item.future)): - if not(item.future.finished()): - item.future.fail(error) - while not(wstream.queue.empty()): - let pitem = wstream.queue.popFirstNoWait() - if not(pitem.future.finished()): - pitem.future.fail(error) + case wstream.state + of AsyncStreamState.Running: + discard + of AsyncStreamState.Error, AsyncStreamState.Stopped: + if not(isNil(item.future)): + if not(item.future.finished()): + item.future.fail(error) break + of AsyncStreamState.Finished, AsyncStreamState.Closed: + error = newAsyncStreamUseClosedError() + break + + doAssert(not(isNil(error))) + while not(wstream.queue.empty()): + let item = wstream.queue.popFirstNoWait() + if not(item.future.finished()): + item.future.fail(error) proc bytesLeft*(stream: BoundedStreamRW): uint64 = ## Returns number of bytes left in stream. - uint64(stream.boundSize) - stream.bytesCount + if stream.boundSize.isSome(): + stream.boundSize.get() - stream.bytesCount + else: + 0'u64 proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, + boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize, udata: ref T) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + child.boundSize = some(boundSize) + child.cmpop = comparison + init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize, + udata) + +proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, + boundary: openarray[byte], comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, udata: ref T) = + doAssert(len(boundary) > 0, BoundarySizeDefectMessage) + child.boundary = @boundary + child.cmpop = comparison + init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize, + udata) + +proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, + boundSize: uint64, boundary: openarray[byte], + comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, udata: ref T) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + doAssert(len(boundary) > 0, BoundarySizeDefectMessage) + child.boundSize = some(boundSize) + child.boundary = @boundary + child.cmpop = comparison init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize, udata) proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, + boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + child.boundSize = some(boundSize) + child.cmpop = comparison + init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize) + +proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, + boundary: openarray[byte], comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize) = + doAssert(len(boundary) > 0, BoundarySizeDefectMessage) + child.boundary = @boundary + child.cmpop = comparison + init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize) + +proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, + boundSize: uint64, boundary: openarray[byte], + comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + doAssert(len(boundary) > 0, BoundarySizeDefectMessage) + child.boundSize = some(boundSize) + child.boundary = @boundary + child.cmpop = comparison init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize) proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, - boundSize: int, - boundary: openarray[byte] = [], + boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize, udata: ref T): BoundedStreamReader = - doAssert(not(boundSize <= 0 and (len(boundary) == 0)), - "At least one type of boundary should be set") - var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary, - cmpop: comparison) - res.init(rsource, bufferSize, udata) + ## Create new stream reader which will be limited by size ``boundSize``. When + ## number of bytes readed by consumer reaches ``boundSize``, + ## BoundedStreamReader will enter EOF state (no more bytes will be returned + ## to the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundSize, comparison, bufferSize, udata) + res + +proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, + boundary: openarray[byte], + comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, + udata: ref T): BoundedStreamReader = + ## Create new stream reader which will be limited by binary boundary + ## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader + ## will enter EOF state (no more bytes will be returned to the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundary, comparison, bufferSize, udata) + res + +proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, + boundSize: uint64, + boundary: openarray[byte], + comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, + udata: ref T): BoundedStreamReader = + ## Create new stream reader which will be limited by size ``boundSize`` or + ## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number + ## of bytes readed from source stream reader ``rsource`` reaches ``boundSize`` + ## BoundStreamReader will enter EOF state (no more bytes will be returned to + ## the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundSize, boundary, comparison, bufferSize, udata) res proc newBoundedStreamReader*(rsource: AsyncStreamReader, - boundSize: int, - boundary: openarray[byte] = [], + boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize, - ): BoundedStreamReader = - doAssert(not(boundSize <= 0 and (len(boundary) == 0)), - "At least one type of boundary should be set") - var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary, - cmpop: comparison) - res.init(rsource, bufferSize) + ): BoundedStreamReader = + ## Create new stream reader which will be limited by size ``boundSize``. When + ## number of bytes readed by consumer reaches ``boundSize``, + ## BoundedStreamReader will enter EOF state (no more bytes will be returned + ## to the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundSize, comparison, bufferSize) + res + +proc newBoundedStreamReader*(rsource: AsyncStreamReader, + boundary: openarray[byte], + comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, + ): BoundedStreamReader = + ## Create new stream reader which will be limited by binary boundary + ## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader + ## will enter EOF state (no more bytes will be returned to the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundary, comparison, bufferSize) + res + +proc newBoundedStreamReader*(rsource: AsyncStreamReader, + boundSize: uint64, + boundary: openarray[byte], + comparison = BoundCmp.Equal, + bufferSize = BoundedBufferSize, + ): BoundedStreamReader = + ## Create new stream reader which will be limited by size ``boundSize`` or + ## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number + ## of bytes readed from source stream reader ``rsource`` reaches ``boundSize`` + ## BoundStreamReader will enter EOF state (no more bytes will be returned to + ## the consumer). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed + ## from source stream reader ``rsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised. But comparison operator + ## ``BoundCmp.LessOrEqual`` allows to consume less bytes without + ## ``BoundedStreamIncompleteError`` exception. + var res = BoundedStreamReader() + res.init(rsource, boundSize, boundary, comparison, bufferSize) res proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter, + boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize, udata: ref T) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + child.boundSize = boundSize + child.cmpop = comparison init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize, udata) proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter, + boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize) = + doAssert(boundSize > 0'u64, BoundSizeDefectMessage) + child.boundSize = boundSize + child.cmpop = comparison init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize) proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter, - boundSize: int, + boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize, udata: ref T): BoundedStreamWriter = - doAssert(boundSize > 0, "Bound size must be bigger then zero") - var res = BoundedStreamWriter(boundSize: boundSize, cmpop: comparison) - res.init(wsource, queueSize, udata) + ## Create new stream writer which will be limited by size ``boundSize``. As + ## soon as number of bytes written to the destination stream ``wsource`` + ## reaches ``boundSize`` stream will enter EOF state (no more bytes will be + ## sent to remote destination stream ``wsource``). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes + ## written to destination stream ``wsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But + ## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes + ## without ``BoundedStreamIncompleteError`` exception. + ## + ## For both comparison operators any attempt to write more bytes than + ## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError`` + ## exception. + var res = BoundedStreamWriter() + res.init(wsource, boundSize, comparison, queueSize, udata) res proc newBoundedStreamWriter*(wsource: AsyncStreamWriter, - boundSize: int, + boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize, ): BoundedStreamWriter = - doAssert(boundSize > 0, "Bound size must be bigger then zero") - var res = BoundedStreamWriter(boundSize: boundSize, cmpop: comparison) - res.init(wsource, queueSize) + ## Create new stream writer which will be limited by size ``boundSize``. As + ## soon as number of bytes written to the destination stream ``wsource`` + ## reaches ``boundSize`` stream will enter EOF state (no more bytes will be + ## sent to remote destination stream ``wsource``). + ## + ## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes + ## written to destination stream ``wsource`` is less than ``boundSize`` - + ## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But + ## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes + ## without ``BoundedStreamIncompleteError`` exception. + ## + ## For both comparison operators any attempt to write more bytes than + ## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError`` + ## exception. + var res = BoundedStreamWriter() + res.init(wsource, boundSize, comparison, queueSize) res diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 9c5f7bb..baf00c7 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -2458,7 +2458,20 @@ proc closeWait*(transp: StreamTransport): Future[void] = proc closed*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if transport in closed state. - result = ({ReadClosed, WriteClosed} * transp.state != {}) + ({ReadClosed, WriteClosed} * transp.state != {}) + +proc finished*(transp: StreamTransport): bool {.inline.} = + ## Returns ``true`` if transport in finished (EOF) state. + ({ReadEof, WriteEof} * transp.state != {}) + +proc failed*(transp: StreamTransport): bool {.inline.} = + ## Returns ``true`` if transport in error state. + ({ReadError, WriteError} * transp.state != {}) + +proc running*(transp: StreamTransport): bool {.inline.} = + ## Returns ``true`` if transport is still pending. + ({ReadClosed, ReadEof, ReadError, + WriteClosed, WriteEof, WriteError} * transp.state == {}) proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil, bufferSize = DefaultStreamBufferSize): StreamTransport {. diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index adc781f..208aa77 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -947,13 +947,13 @@ suite "BoundedStream test suite": var rstream = newAsyncStreamReader(conn) case btest of BoundaryRead: - var rbstream = newBoundedStreamReader(rstream, -1, boundary) + var rbstream = newBoundedStreamReader(rstream, boundary) let response = await rbstream.read() if response == message: res = true await rbstream.closeWait() of BoundaryDouble: - var rbstream = newBoundedStreamReader(rstream, -1, boundary) + var rbstream = newBoundedStreamReader(rstream, boundary) let response1 = await rbstream.read() await rbstream.closeWait() let response2 = await rstream.read() @@ -963,20 +963,20 @@ suite "BoundedStream test suite": var expectMessage = message expectMessage[^2] = 0x2D'u8 expectMessage[^1] = 0x2D'u8 - var rbstream = newBoundedStreamReader(rstream, size, boundary) + var rbstream = newBoundedStreamReader(rstream, uint64(size), boundary) let response = await rbstream.read() await rbstream.closeWait() if (len(response) == size) and response == expectMessage: res = true of BoundaryIncomplete: - var rbstream = newBoundedStreamReader(rstream, -1, boundary) + var rbstream = newBoundedStreamReader(rstream, boundary) try: let response {.used.} = await rbstream.read() except BoundedStreamIncompleteError: res = true await rbstream.closeWait() of BoundaryEmpty: - var rbstream = newBoundedStreamReader(rstream, -1, boundary) + var rbstream = newBoundedStreamReader(rstream, boundary) let response = await rbstream.read() await rbstream.closeWait() if len(response) == 0: @@ -1001,7 +1001,8 @@ suite "BoundedStream test suite": proc processClient(server: StreamServer, transp: StreamTransport) {.async.} = var wstream = newAsyncStreamWriter(transp) - var wbstream = newBoundedStreamWriter(wstream, size, comparison = cmp) + var wbstream = newBoundedStreamWriter(wstream, uint64(size), + comparison = cmp) case stest of SizeReadWrite: for i in 0 ..< 10: @@ -1058,7 +1059,8 @@ suite "BoundedStream test suite": server.start() var conn = await connect(address) var rstream = newAsyncStreamReader(conn) - var rbstream = newBoundedStreamReader(rstream, size, comparison = cmp) + var rbstream = newBoundedStreamReader(rstream, uint64(size), + comparison = cmp) case stest of SizeReadWrite: let response = await rbstream.read() @@ -1153,7 +1155,7 @@ suite "BoundedStream test suite": proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = var wstream = newAsyncStreamWriter(transp) - var wstream2 = newBoundedStreamWriter(wstream, len(inputstr)) + var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr))) var data = inputstr var offset = 0 while true: