Refactor and optimization of BoundedStream. (#180)

Documentation for BoundedStream.
Fix HttpServer bounding.
This commit is contained in:
Eugene Kabanov 2021-04-26 14:05:37 +03:00 committed by GitHub
parent 833d968782
commit 39f4060e07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 468 additions and 178 deletions

View File

@ -6,7 +6,8 @@
# Licensed under either of # Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import stew/results, httputils, strutils, uri import std/[strutils, uri]
import stew/results, httputils
import ../../asyncloop, ../../asyncsync import ../../asyncloop, ../../asyncsync
import ../../streams/[asyncstream, boundstream] import ../../streams/[asyncstream, boundstream]
export results, httputils, strutils export results, httputils, strutils
@ -15,6 +16,8 @@ const
HeadersMark* = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] HeadersMark* = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
PostMethods* = {MethodPost, MethodPatch, MethodPut, MethodDelete} PostMethods* = {MethodPost, MethodPatch, MethodPut, MethodDelete}
MaximumBodySizeError* = "Maximum size of request's body reached"
type type
HttpResult*[T] = Result[T, string] HttpResult*[T] = Result[T, string]
HttpResultCode*[T] = Result[T, HttpCode] HttpResultCode*[T] = Result[T, HttpCode]
@ -48,20 +51,29 @@ proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader =
proc closeWait*(bstream: HttpBodyReader) {.async.} = proc closeWait*(bstream: HttpBodyReader) {.async.} =
## Close and free resource allocated by body reader. ## Close and free resource allocated by body reader.
if len(bstream.streams) > 0:
var res = newSeq[Future[void]]() var res = newSeq[Future[void]]()
for item in bstream.streams.items(): # We closing streams in reversed order because stream at position [0], uses
res.add(item.closeWait()) # data from stream at position [1].
for index in countdown((len(bstream.streams) - 1), 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res) await allFutures(res)
await procCall(AsyncStreamReader(bstream).closeWait()) await procCall(closeWait(AsyncStreamReader(bstream)))
proc atBound*(bstream: HttpBodyReader): bool {. proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [Defect].} =
raises: [Defect].} = if len(bstream.streams) == 1:
## Returns ``true`` if lowest stream is at EOF. # If HttpBodyReader has only one stream it has ``BoundedStreamReader``, in
let lreader = bstream.streams[^1] # such case its impossible to get more bytes then expected amount.
doAssert(lreader of BoundedStreamReader) false
let breader = cast[BoundedStreamReader](lreader) else:
breader.atEof() and (breader.bytesLeft() == 0) # If HttpBodyReader has two or more streams, we check if
# ``BoundedStreamReader`` at EOF.
if bstream.streams[0].atEof():
for i in 1 ..< len(bstream.streams):
if not(bstream.streams[1].atEof()):
return true
false
else:
false
proc raiseHttpCriticalError*(msg: string, proc raiseHttpCriticalError*(msg: string,
code = Http400) {.noinline, noreturn.} = code = Http400) {.noinline, noreturn.} =

View File

@ -365,14 +365,14 @@ proc getBodyReader*(request: HttpRequestRef): HttpResult[HttpBodyReader] =
## leaks. ## leaks.
if HttpRequestFlags.BoundBody in request.requestFlags: if HttpRequestFlags.BoundBody in request.requestFlags:
let bstream = newBoundedStreamReader(request.connection.reader, let bstream = newBoundedStreamReader(request.connection.reader,
request.contentLength) uint64(request.contentLength))
ok(newHttpBodyReader(bstream)) ok(newHttpBodyReader(bstream))
elif HttpRequestFlags.UnboundBody in request.requestFlags: elif HttpRequestFlags.UnboundBody in request.requestFlags:
let maxBodySize = request.connection.server.maxRequestBodySize let maxBodySize = request.connection.server.maxRequestBodySize
let bstream = newBoundedStreamReader(request.connection.reader, maxBodySize, let cstream = newChunkedStreamReader(request.connection.reader)
let bstream = newBoundedStreamReader(cstream, uint64(maxBodySize),
comparison = BoundCmp.LessOrEqual) comparison = BoundCmp.LessOrEqual)
let cstream = newChunkedStreamReader(bstream) ok(newHttpBodyReader(bstream, cstream))
ok(newHttpBodyReader(cstream, bstream))
else: else:
err("Request do not have body available") err("Request do not have body available")
@ -399,11 +399,11 @@ proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} =
let reader = res.get() let reader = res.get()
try: try:
await request.handleExpect() await request.handleExpect()
return await reader.read() var res = await reader.read()
if reader.hasOverflow():
raiseHttpCriticalError(MaximumBodySizeError, Http413)
return res
except AsyncStreamError: except AsyncStreamError:
if reader.atBound():
raiseHttpCriticalError("Maximum size of body reached", Http413)
else:
raiseHttpCriticalError("Unable to read request's body") raiseHttpCriticalError("Unable to read request's body")
finally: finally:
await closeWait(res.get()) await closeWait(res.get())
@ -418,10 +418,9 @@ proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} =
try: try:
await request.handleExpect() await request.handleExpect()
discard await reader.consume() discard await reader.consume()
if reader.hasOverflow():
raiseHttpCriticalError(MaximumBodySizeError, Http413)
except AsyncStreamError: except AsyncStreamError:
if reader.atBound():
raiseHttpCriticalError("Maximum size of body reached", Http413)
else:
raiseHttpCriticalError("Unable to read request's body") raiseHttpCriticalError("Unable to read request's body")
finally: finally:
await closeWait(res.get()) await closeWait(res.get())

View File

@ -14,6 +14,9 @@ import ../../streams/[asyncstream, boundstream, chunkstream]
import httptable, httpcommon import httptable, httpcommon
export httptable, httpcommon, asyncstream export httptable, httpcommon, asyncstream
const
UnableToReadMultipartBody = "Unable to read multipart message body"
type type
MultiPartSource* {.pure.} = enum MultiPartSource* {.pure.} = enum
Stream, Buffer Stream, Buffer
@ -165,10 +168,10 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except AsyncStreamError: except AsyncStreamError:
if mpr.stream.atBound(): if mpr.stream.hasOverflow():
raiseHttpCriticalError("Maximum size of body reached", Http413) raiseHttpCriticalError(MaximumBodySizeError, Http413)
else: else:
raiseHttpCriticalError("Unable to read multipart body") raiseHttpCriticalError(UnableToReadMultipartBody)
# Reading part's headers # Reading part's headers
try: try:
@ -199,7 +202,7 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
kind: MultiPartSource.Stream, kind: MultiPartSource.Stream,
headers: HttpTable.init(), headers: HttpTable.init(),
breader: mpr.stream, breader: mpr.stream,
stream: newBoundedStreamReader(mpr.stream, -1, mpr.boundary), stream: newBoundedStreamReader(mpr.stream, mpr.boundary),
counter: mpr.counter counter: mpr.counter
) )
@ -214,14 +217,10 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except AsyncStreamError: except AsyncStreamError:
if mpr.stream.atBound(): if mpr.stream.hasOverflow():
raiseHttpCriticalError("Maximum size of body reached", Http413) raiseHttpCriticalError(MaximumBodySizeError, Http413)
else: else:
raiseHttpCriticalError("Unable to read multipart body") raiseHttpCriticalError(UnableToReadMultipartBody)
proc atBound*(mp: MultiPart): bool =
## Returns ``true`` if MultiPart's stream reached request body maximum size.
mp.breader.atBound()
proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} = proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} =
## Get multipart's ``mp`` value as sequence of bytes. ## Get multipart's ``mp`` value as sequence of bytes.
@ -231,10 +230,10 @@ proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} =
let res = await mp.stream.read() let res = await mp.stream.read()
return res return res
except AsyncStreamError: except AsyncStreamError:
if mp.breader.atBound(): if mp.breader.hasOverflow():
raiseHttpCriticalError("Maximum size of body reached", Http413) raiseHttpCriticalError(MaximumBodySizeError, Http413)
else: else:
raiseHttpCriticalError("Unable to read multipart body") raiseHttpCriticalError(UnableToReadMultipartBody)
of MultiPartSource.Buffer: of MultiPartSource.Buffer:
return mp.buffer return mp.buffer
@ -245,10 +244,10 @@ proc consumeBody*(mp: MultiPart) {.async.} =
try: try:
discard await mp.stream.consume() discard await mp.stream.consume()
except AsyncStreamError: except AsyncStreamError:
if mp.breader.atBound(): if mp.breader.hasOverflow():
raiseHttpCriticalError("Maximum size of body reached", Http413) raiseHttpCriticalError(MaximumBodySizeError, Http413)
else: else:
raiseHttpCriticalError("Unable to consume multipart body") raiseHttpCriticalError(UnableToReadMultipartBody)
of MultiPartSource.Buffer: of MultiPartSource.Buffer:
discard discard

View File

@ -238,31 +238,115 @@ template checkStreamClosed*(t: untyped) =
proc atEof*(rstream: AsyncStreamReader): bool = proc atEof*(rstream: AsyncStreamReader): bool =
## Returns ``true`` is reading stream is closed or finished and internal ## Returns ``true`` is reading stream is closed or finished and internal
## buffer do not have any bytes left. ## buffer do not have any bytes left.
if isNil(rstream.readerLoop):
if isNil(rstream.rsource):
rstream.tsource.atEof()
else:
rstream.rsource.atEof()
else:
rstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error} and rstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error} and
(rstream.buffer.dataLen() == 0) (rstream.buffer.dataLen() == 0)
proc atEof*(wstream: AsyncStreamWriter): bool = proc atEof*(wstream: AsyncStreamWriter): bool =
## Returns ``true`` is writing stream ``wstream`` closed or finished. ## Returns ``true`` is writing stream ``wstream`` closed or finished.
wstream.state in {AsyncStreamState.Stopped, Finished, Closed} if isNil(wstream.writerLoop):
if isNil(wstream.wsource):
wstream.tsource.atEof()
else:
wstream.wsource.atEof()
else:
wstream.state in {AsyncStreamState.Stopped, Finished, Closed, Error}
proc closed*(rw: AsyncStreamRW): bool {.inline.} = proc closed*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is closed. ## Returns ``true`` is reading/writing stream is closed.
(rw.state == AsyncStreamState.Closed) (reader.state == AsyncStreamState.Closed)
proc finished*(rw: AsyncStreamRW): bool {.inline.} = proc finished*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is finished (completed). ## Returns ``true`` is reading/writing stream is finished (completed).
(rw.state == AsyncStreamState.Finished) if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.finished()
else:
reader.rsource.finished()
else:
(reader.state == AsyncStreamState.Finished)
proc stopped*(rw: AsyncStreamRW): bool {.inline.} = proc stopped*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is stopped (interrupted). ## Returns ``true`` is reading/writing stream is stopped (interrupted).
(rw.state == AsyncStreamState.Stopped) if isNil(reader.readerLoop):
if isNil(reader.rsource):
false
else:
reader.rsource.stopped()
else:
(reader.state == AsyncStreamState.Stopped)
proc running*(rw: AsyncStreamRW): bool {.inline.} = proc running*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is still pending. ## Returns ``true`` is reading/writing stream is still pending.
(rw.state == AsyncStreamState.Running) if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.running()
else:
reader.rsource.running()
else:
(reader.state == AsyncStreamState.Running)
proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} proc failed*(reader: AsyncStreamReader): bool =
proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.failed()
else:
reader.rsource.failed()
else:
(reader.state == AsyncStreamState.Error)
proc closed*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is closed.
(writer.state == AsyncStreamState.Closed)
proc finished*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is finished (completed).
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.finished()
else:
writer.wsource.finished()
else:
(writer.state == AsyncStreamState.Finished)
proc stopped*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is stopped (interrupted).
if isNil(writer.writerLoop):
if isNil(writer.wsource):
false
else:
writer.wsource.stopped()
else:
(writer.state == AsyncStreamState.Stopped)
proc running*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is still pending.
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.running()
else:
writer.wsource.running()
else:
(writer.state == AsyncStreamState.Running)
proc failed*(writer: AsyncStreamWriter): bool =
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.failed()
else:
writer.wsource.failed()
else:
(writer.state == AsyncStreamState.Error)
proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.
gcsafe, raises: [Defect].}
proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.
gcsafe, raises: [Defect].}
proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} = proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} =
var res = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName)) var res = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName))

View File

@ -14,6 +14,7 @@
## ##
## For stream writing it means that you should write exactly bounded size ## For stream writing it means that you should write exactly bounded size
## of bytes. ## of bytes.
import std/options
import ../asyncloop, ../timer import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common import asyncstream, ../transports/stream, ../transports/common
export asyncstream, stream, timer, common export asyncstream, stream, timer, common
@ -23,14 +24,14 @@ type
Equal, LessOrEqual Equal, LessOrEqual
BoundedStreamReader* = ref object of AsyncStreamReader BoundedStreamReader* = ref object of AsyncStreamReader
boundSize: int boundSize: Option[uint64]
boundary: seq[byte] boundary: seq[byte]
offset: int offset: uint64
cmpop: BoundCmp cmpop: BoundCmp
BoundedStreamWriter* = ref object of AsyncStreamWriter BoundedStreamWriter* = ref object of AsyncStreamWriter
boundSize: int boundSize: uint64
offset: int offset: uint64
cmpop: BoundCmp cmpop: BoundCmp
BoundedStreamError* = object of AsyncStreamError BoundedStreamError* = object of AsyncStreamError
@ -41,12 +42,17 @@ type
const const
BoundedBufferSize* = 4096 BoundedBufferSize* = 4096
BoundSizeDefectMessage = "Bound size must be bigger than zero"
BoundarySizeDefectMessage = "Boundary must not be empty array"
proc newBoundedStreamIncompleteError*(): ref BoundedStreamError {.noinline.} = template newBoundedStreamIncompleteError(): ref BoundedStreamError =
newException(BoundedStreamIncompleteError, newException(BoundedStreamIncompleteError,
"Stream boundary is not reached yet") "Stream boundary is not reached yet")
proc readUntilBoundary*(rstream: AsyncStreamReader, pbytes: pointer, template newBoundedStreamOverflowError(): ref BoundedStreamOverflowError =
newException(BoundedStreamOverflowError, "Stream boundary exceeded")
proc readUntilBoundary(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int, sep: seq[byte]): Future[int] {.async.} = nbytes: int, sep: seq[byte]): Future[int] {.async.} =
doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(nbytes >= 0, "nbytes must be non-negative value") doAssert(nbytes >= 0, "nbytes must be non-negative value")
@ -96,16 +102,11 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
rstream.state = AsyncStreamState.Running rstream.state = AsyncStreamState.Running
var buffer = newSeq[byte](rstream.buffer.bufferLen()) var buffer = newSeq[byte](rstream.buffer.bufferLen())
while true: while true:
# r1 is `true` if `boundSize` was not set.
let r1 = rstream.boundSize < 0
# r2 is `true` if number of bytes read is less then `boundSize`.
let r2 = (rstream.boundSize > 0) and (rstream.offset < rstream.boundSize)
if r1 or r2:
let toRead = let toRead =
if rstream.boundSize < 0: if rstream.boundSize.isNone():
len(buffer) len(buffer)
else: else:
min(rstream.boundSize - rstream.offset, len(buffer)) int(min(rstream.boundSize.get() - rstream.offset, uint64(len(buffer))))
try: try:
let res = await readUntilBoundary(rstream.rsource, addr buffer[0], let res = await readUntilBoundary(rstream.rsource, addr buffer[0],
toRead, rstream.boundary) toRead, rstream.boundary)
@ -113,9 +114,9 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
if len(rstream.boundary) > 0: if len(rstream.boundary) > 0:
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
let length = res - len(rstream.boundary) let length = res - len(rstream.boundary)
rstream.offset = rstream.offset + length rstream.offset = rstream.offset + uint64(length)
rstream.state = AsyncStreamState.Finished
await upload(addr rstream.buffer, addr buffer[0], length) await upload(addr rstream.buffer, addr buffer[0], length)
rstream.state = AsyncStreamState.Finished
else: else:
if (res < toRead) and rstream.rsource.atEof(): if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop case rstream.cmpop
@ -124,59 +125,64 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
rstream.error = newBoundedStreamIncompleteError() rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual: of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished rstream.state = AsyncStreamState.Finished
rstream.offset = rstream.offset + res
await upload(addr rstream.buffer, addr buffer[0], res)
else:
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
rstream.offset = rstream.offset + res
await upload(addr rstream.buffer, addr buffer[0], res)
else:
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
except AsyncStreamReadError as exc: rstream.offset = rstream.offset + uint64(res)
if rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
# This is "fast-path" to avoid one more iteration until EOF
rstream.state = AsyncStreamState.Finished
await upload(addr rstream.buffer, addr buffer[0], res)
else:
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
rstream.offset = rstream.offset + uint64(res)
if rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
# This is "fast-path" to avoid one more iteration until EOF
rstream.state = AsyncStreamState.Finished
await upload(addr rstream.buffer, addr buffer[0], res)
else:
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer()
except AsyncStreamError as exc:
rstream.state = AsyncStreamState.Error rstream.state = AsyncStreamState.Error
rstream.error = exc rstream.error = exc
except CancelledError: except CancelledError:
rstream.state = AsyncStreamState.Stopped rstream.state = AsyncStreamState.Error
rstream.error = newAsyncStreamUseClosedError()
if rstream.state != AsyncStreamState.Running: case rstream.state
if rstream.state == AsyncStreamState.Finished: of AsyncStreamState.Running:
# This is state when BoundCmp.LessOrEqual and readExactly returned discard
# `AsyncStreamIncompleteError`. of AsyncStreamState.Error, AsyncStreamState.Stopped:
await rstream.buffer.transfer()
break
else:
rstream.state = AsyncStreamState.Finished
break
# We need to notify consumer about error/close, but we do not care about
# incoming data anymore.
rstream.buffer.forget() rstream.buffer.forget()
break
of AsyncStreamState.Finished, AsyncStreamState.Closed:
break
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
var error: ref AsyncStreamError
var wstream = BoundedStreamWriter(stream) var wstream = BoundedStreamWriter(stream)
wstream.state = AsyncStreamState.Running wstream.state = AsyncStreamState.Running
while true: while true:
var var item: WriteItem
item: WriteItem
error: ref AsyncStreamError
try: try:
item = await wstream.queue.get() item = await wstream.queue.get()
if item.size > 0: if item.size > 0:
if item.size <= (wstream.boundSize - wstream.offset): if uint64(item.size) <= (wstream.boundSize - wstream.offset):
# Writing chunk data. # Writing chunk data.
case item.kind case item.kind
of WriteType.Pointer: of WriteType.Pointer:
@ -185,14 +191,16 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
await wstream.wsource.write(addr item.dataSeq[0], item.size) await wstream.wsource.write(addr item.dataSeq[0], item.size)
of WriteType.String: of WriteType.String:
await wstream.wsource.write(addr item.dataStr[0], item.size) await wstream.wsource.write(addr item.dataStr[0], item.size)
wstream.offset = wstream.offset + item.size wstream.offset = wstream.offset + uint64(item.size)
item.future.complete() item.future.complete()
else: else:
wstream.state = AsyncStreamState.Error wstream.state = AsyncStreamState.Error
error = newException(BoundedStreamOverflowError, error = newBoundedStreamOverflowError()
"Stream boundary exceeded") else:
if wstream.offset == wstream.boundSize:
wstream.state = AsyncStreamState.Finished
item.future.complete()
else: else:
if wstream.offset != wstream.boundSize:
case wstream.cmpop case wstream.cmpop
of BoundCmp.Equal: of BoundCmp.Equal:
wstream.state = AsyncStreamState.Error wstream.state = AsyncStreamState.Error
@ -200,96 +208,269 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
of BoundCmp.LessOrEqual: of BoundCmp.LessOrEqual:
wstream.state = AsyncStreamState.Finished wstream.state = AsyncStreamState.Finished
item.future.complete() item.future.complete()
else:
wstream.state = AsyncStreamState.Finished
item.future.complete()
except CancelledError: except CancelledError:
wstream.state = AsyncStreamState.Stopped wstream.state = AsyncStreamState.Stopped
error = newAsyncStreamUseClosedError() error = newAsyncStreamUseClosedError()
except AsyncStreamWriteError as exc: except AsyncStreamError as exc:
wstream.state = AsyncStreamState.Error
error = exc
except AsyncStreamIncompleteError as exc:
wstream.state = AsyncStreamState.Error wstream.state = AsyncStreamState.Error
error = exc error = exc
if wstream.state != AsyncStreamState.Running: case wstream.state
if wstream.state == AsyncStreamState.Finished: of AsyncStreamState.Running:
error = newAsyncStreamUseClosedError() discard
else: of AsyncStreamState.Error, AsyncStreamState.Stopped:
if not(isNil(item.future)): if not(isNil(item.future)):
if not(item.future.finished()): if not(item.future.finished()):
item.future.fail(error) item.future.fail(error)
while not(wstream.queue.empty()):
let pitem = wstream.queue.popFirstNoWait()
if not(pitem.future.finished()):
pitem.future.fail(error)
break break
of AsyncStreamState.Finished, AsyncStreamState.Closed:
error = newAsyncStreamUseClosedError()
break
doAssert(not(isNil(error)))
while not(wstream.queue.empty()):
let item = wstream.queue.popFirstNoWait()
if not(item.future.finished()):
item.future.fail(error)
proc bytesLeft*(stream: BoundedStreamRW): uint64 = proc bytesLeft*(stream: BoundedStreamRW): uint64 =
## Returns number of bytes left in stream. ## Returns number of bytes left in stream.
uint64(stream.boundSize) - stream.bytesCount if stream.boundSize.isSome():
stream.boundSize.get() - stream.bytesCount
else:
0'u64
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, udata: ref T) = bufferSize = BoundedBufferSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = some(boundSize)
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
udata)
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
boundary: openarray[byte], comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, udata: ref T) =
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundary = @boundary
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
udata)
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundSize = some(boundSize)
child.boundary = @boundary
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize, init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
udata) udata)
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize) = bufferSize = BoundedBufferSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = some(boundSize)
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
boundary: openarray[byte], comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize) =
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundary = @boundary
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, boundary: openarray[byte],
comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundSize = some(boundSize)
child.boundary = @boundary
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize) init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
boundSize: int, boundSize: uint64,
boundary: openarray[byte] = [],
comparison = BoundCmp.Equal, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, bufferSize = BoundedBufferSize,
udata: ref T): BoundedStreamReader = udata: ref T): BoundedStreamReader =
doAssert(not(boundSize <= 0 and (len(boundary) == 0)), ## Create new stream reader which will be limited by size ``boundSize``. When
"At least one type of boundary should be set") ## number of bytes readed by consumer reaches ``boundSize``,
var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary, ## BoundedStreamReader will enter EOF state (no more bytes will be returned
cmpop: comparison) ## to the consumer).
res.init(rsource, bufferSize, udata) ##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundSize, comparison, bufferSize, udata)
res
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize,
udata: ref T): BoundedStreamReader =
## Create new stream reader which will be limited by binary boundary
## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader
## will enter EOF state (no more bytes will be returned to the consumer).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundary, comparison, bufferSize, udata)
res
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
boundSize: uint64,
boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize,
udata: ref T): BoundedStreamReader =
## Create new stream reader which will be limited by size ``boundSize`` or
## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number
## of bytes readed from source stream reader ``rsource`` reaches ``boundSize``
## BoundStreamReader will enter EOF state (no more bytes will be returned to
## the consumer).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundSize, boundary, comparison, bufferSize, udata)
res res
proc newBoundedStreamReader*(rsource: AsyncStreamReader, proc newBoundedStreamReader*(rsource: AsyncStreamReader,
boundSize: int, boundSize: uint64,
boundary: openarray[byte] = [],
comparison = BoundCmp.Equal, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, bufferSize = BoundedBufferSize,
): BoundedStreamReader = ): BoundedStreamReader =
doAssert(not(boundSize <= 0 and (len(boundary) == 0)), ## Create new stream reader which will be limited by size ``boundSize``. When
"At least one type of boundary should be set") ## number of bytes readed by consumer reaches ``boundSize``,
var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary, ## BoundedStreamReader will enter EOF state (no more bytes will be returned
cmpop: comparison) ## to the consumer).
res.init(rsource, bufferSize) ##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundSize, comparison, bufferSize)
res
proc newBoundedStreamReader*(rsource: AsyncStreamReader,
boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize,
): BoundedStreamReader =
## Create new stream reader which will be limited by binary boundary
## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader
## will enter EOF state (no more bytes will be returned to the consumer).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundary, comparison, bufferSize)
res
proc newBoundedStreamReader*(rsource: AsyncStreamReader,
boundSize: uint64,
boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize,
): BoundedStreamReader =
## Create new stream reader which will be limited by size ``boundSize`` or
## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number
## of bytes readed from source stream reader ``rsource`` reaches ``boundSize``
## BoundStreamReader will enter EOF state (no more bytes will be returned to
## the consumer).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
## from source stream reader ``rsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
## ``BoundedStreamIncompleteError`` exception.
var res = BoundedStreamReader()
res.init(rsource, boundSize, boundary, comparison, bufferSize)
res res
proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter, proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter,
boundSize: uint64, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize, udata: ref T) = queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = boundSize
child.cmpop = comparison
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize, init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize,
udata) udata)
proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter, proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter,
boundSize: uint64, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize) = queueSize = AsyncStreamDefaultQueueSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = boundSize
child.cmpop = comparison
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize) init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize)
proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter, proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter,
boundSize: int, boundSize: uint64,
comparison = BoundCmp.Equal, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize, queueSize = AsyncStreamDefaultQueueSize,
udata: ref T): BoundedStreamWriter = udata: ref T): BoundedStreamWriter =
doAssert(boundSize > 0, "Bound size must be bigger then zero") ## Create new stream writer which will be limited by size ``boundSize``. As
var res = BoundedStreamWriter(boundSize: boundSize, cmpop: comparison) ## soon as number of bytes written to the destination stream ``wsource``
res.init(wsource, queueSize, udata) ## reaches ``boundSize`` stream will enter EOF state (no more bytes will be
## sent to remote destination stream ``wsource``).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes
## written to destination stream ``wsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But
## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes
## without ``BoundedStreamIncompleteError`` exception.
##
## For both comparison operators any attempt to write more bytes than
## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError``
## exception.
var res = BoundedStreamWriter()
res.init(wsource, boundSize, comparison, queueSize, udata)
res res
proc newBoundedStreamWriter*(wsource: AsyncStreamWriter, proc newBoundedStreamWriter*(wsource: AsyncStreamWriter,
boundSize: int, boundSize: uint64,
comparison = BoundCmp.Equal, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize, queueSize = AsyncStreamDefaultQueueSize,
): BoundedStreamWriter = ): BoundedStreamWriter =
doAssert(boundSize > 0, "Bound size must be bigger then zero") ## Create new stream writer which will be limited by size ``boundSize``. As
var res = BoundedStreamWriter(boundSize: boundSize, cmpop: comparison) ## soon as number of bytes written to the destination stream ``wsource``
res.init(wsource, queueSize) ## reaches ``boundSize`` stream will enter EOF state (no more bytes will be
## sent to remote destination stream ``wsource``).
##
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes
## written to destination stream ``wsource`` is less than ``boundSize`` -
## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But
## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes
## without ``BoundedStreamIncompleteError`` exception.
##
## For both comparison operators any attempt to write more bytes than
## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError``
## exception.
var res = BoundedStreamWriter()
res.init(wsource, boundSize, comparison, queueSize)
res res

View File

@ -2458,7 +2458,20 @@ proc closeWait*(transp: StreamTransport): Future[void] =
proc closed*(transp: StreamTransport): bool {.inline.} = proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state. ## Returns ``true`` if transport in closed state.
result = ({ReadClosed, WriteClosed} * transp.state != {}) ({ReadClosed, WriteClosed} * transp.state != {})
proc finished*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in finished (EOF) state.
({ReadEof, WriteEof} * transp.state != {})
proc failed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in error state.
({ReadError, WriteError} * transp.state != {})
proc running*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport is still pending.
({ReadClosed, ReadEof, ReadError,
WriteClosed, WriteEof, WriteError} * transp.state == {})
proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil, proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil,
bufferSize = DefaultStreamBufferSize): StreamTransport {. bufferSize = DefaultStreamBufferSize): StreamTransport {.

View File

@ -947,13 +947,13 @@ suite "BoundedStream test suite":
var rstream = newAsyncStreamReader(conn) var rstream = newAsyncStreamReader(conn)
case btest case btest
of BoundaryRead: of BoundaryRead:
var rbstream = newBoundedStreamReader(rstream, -1, boundary) var rbstream = newBoundedStreamReader(rstream, boundary)
let response = await rbstream.read() let response = await rbstream.read()
if response == message: if response == message:
res = true res = true
await rbstream.closeWait() await rbstream.closeWait()
of BoundaryDouble: of BoundaryDouble:
var rbstream = newBoundedStreamReader(rstream, -1, boundary) var rbstream = newBoundedStreamReader(rstream, boundary)
let response1 = await rbstream.read() let response1 = await rbstream.read()
await rbstream.closeWait() await rbstream.closeWait()
let response2 = await rstream.read() let response2 = await rstream.read()
@ -963,20 +963,20 @@ suite "BoundedStream test suite":
var expectMessage = message var expectMessage = message
expectMessage[^2] = 0x2D'u8 expectMessage[^2] = 0x2D'u8
expectMessage[^1] = 0x2D'u8 expectMessage[^1] = 0x2D'u8
var rbstream = newBoundedStreamReader(rstream, size, boundary) var rbstream = newBoundedStreamReader(rstream, uint64(size), boundary)
let response = await rbstream.read() let response = await rbstream.read()
await rbstream.closeWait() await rbstream.closeWait()
if (len(response) == size) and response == expectMessage: if (len(response) == size) and response == expectMessage:
res = true res = true
of BoundaryIncomplete: of BoundaryIncomplete:
var rbstream = newBoundedStreamReader(rstream, -1, boundary) var rbstream = newBoundedStreamReader(rstream, boundary)
try: try:
let response {.used.} = await rbstream.read() let response {.used.} = await rbstream.read()
except BoundedStreamIncompleteError: except BoundedStreamIncompleteError:
res = true res = true
await rbstream.closeWait() await rbstream.closeWait()
of BoundaryEmpty: of BoundaryEmpty:
var rbstream = newBoundedStreamReader(rstream, -1, boundary) var rbstream = newBoundedStreamReader(rstream, boundary)
let response = await rbstream.read() let response = await rbstream.read()
await rbstream.closeWait() await rbstream.closeWait()
if len(response) == 0: if len(response) == 0:
@ -1001,7 +1001,8 @@ suite "BoundedStream test suite":
proc processClient(server: StreamServer, proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} = transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp) var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, size, comparison = cmp) var wbstream = newBoundedStreamWriter(wstream, uint64(size),
comparison = cmp)
case stest case stest
of SizeReadWrite: of SizeReadWrite:
for i in 0 ..< 10: for i in 0 ..< 10:
@ -1058,7 +1059,8 @@ suite "BoundedStream test suite":
server.start() server.start()
var conn = await connect(address) var conn = await connect(address)
var rstream = newAsyncStreamReader(conn) var rstream = newAsyncStreamReader(conn)
var rbstream = newBoundedStreamReader(rstream, size, comparison = cmp) var rbstream = newBoundedStreamReader(rstream, uint64(size),
comparison = cmp)
case stest case stest
of SizeReadWrite: of SizeReadWrite:
let response = await rbstream.read() let response = await rbstream.read()
@ -1153,7 +1155,7 @@ suite "BoundedStream test suite":
proc serveClient(server: StreamServer, proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} = transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp) var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, len(inputstr)) var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr)))
var data = inputstr var data = inputstr
var offset = 0 var offset = 0
while true: while true: