From c0472bd349e55ebe04f555f6b5b48dc8ce810836 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sat, 30 Jan 2021 20:15:34 +0200 Subject: [PATCH] Fix streams to check only for Closed state. --- chronos/streams/asyncstream.nim | 32 ++++++++++++++++---------------- chronos/streams/boundstream.nim | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index d27920ea..1a63a4e7 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -206,9 +206,9 @@ proc newAsyncStreamIncorrectError*(m: string): ref AsyncStreamIncorrectError {. inline.} = newException(AsyncStreamIncorrectError, m) -template checkRunning*(t: untyped) = - if not(t.running()): - raise newAsyncStreamIncorrectError("Incorrect stream state") +template checkStreamClosed*(t: untyped) = + if t.state == AsyncStreamState.Closed: + raise newAsyncStreamUseClosedError() proc atEof*(rstream: AsyncStreamReader): bool = ## Returns ``true`` is reading stream is closed or finished and internal @@ -332,7 +332,7 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes >= 0, "nbytes must be non-negative integer") - checkRunning(rstream) + checkStreamClosed(rstream) if nbytes == 0: return @@ -370,7 +370,7 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, ## internal buffer, otherwise it will wait until some bytes will be available. doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes > 0, "nbytes must be positive value") - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -411,7 +411,7 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(len(sep) > 0, "separator must not be empty") doAssert(nbytes >= 0, "nbytes must be non-negative value") - checkRunning(rstream) + checkStreamClosed(rstream) if nbytes == 0: raise newAsyncStreamLimitError() @@ -467,7 +467,7 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, ## ## If ``limit`` more then 0, then result string will be limited to ``limit`` ## bytes. - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -513,7 +513,7 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} = ## Read all bytes from read-only stream ``rstream``. ## ## This procedure allocates buffer seq[byte] and return it as result. - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -543,7 +543,7 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} = ## ``rstream``. ## ## This procedure allocates buffer seq[byte] and return it as result. - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -573,7 +573,7 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} = ## Consume (discard) all bytes from read-only stream ``rstream``. ## ## Return number of bytes actually consumed (discarded). - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -602,7 +602,7 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} = ## ``rstream``. ## ## Return number of bytes actually consumed (discarded). - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -645,7 +645,7 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. ## ``predicate`` callback will receive (zero-length) openarray, if stream ## is at EOF. doAssert(not(isNil(pred)), "`predicate` callback should not be `nil`") - checkRunning(rstream) + checkStreamClosed(rstream) if isNil(rstream.rsource): try: @@ -675,7 +675,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, ## writer stream ``wstream``. ## ## ``nbytes` must be more then zero. - checkRunning(wstream) + checkStreamClosed(wstream) if nbytes <= 0: raise newAsyncStreamIncorrectError("Zero length message") @@ -722,7 +722,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], ## If ``msglen < 0`` whole sequence ``sbytes`` will be writen to stream. ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## stream. - checkRunning(wstream) + checkStreamClosed(wstream) let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes)) if length <= 0: raise newAsyncStreamIncorrectError("Zero length message") @@ -770,7 +770,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, ## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream. ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## stream. - checkRunning(wstream) + checkStreamClosed(wstream) let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes)) if length <= 0: raise newAsyncStreamIncorrectError("Zero length message") @@ -811,7 +811,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, proc finish*(wstream: AsyncStreamWriter) {.async.} = ## Finish write stream ``wstream``. - checkRunning(wstream) + checkStreamClosed(wstream) if not isNil(wstream.wsource): if isNil(wstream.writerLoop): diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index 67894c7c..1792347b 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -48,7 +48,7 @@ proc readUntilBoundary*(rstream: AsyncStreamReader, pbytes: pointer, doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(len(sep) > 0, "separator must not be empty") doAssert(nbytes >= 0, "nbytes must be non-negative value") - checkRunning(rstream) + checkStreamClosed(rstream) var k = 0 var state = 0