diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index 3d6ad0f..9e05916 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -115,9 +115,18 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): let length = res - len(rstream.boundary) rstream.offset = rstream.offset + uint64(length) - rstream.state = AsyncStreamState.Finished + # There should be one step between transferring last bytes to the + # consumer and declaring stream EOF. Otherwise could not be + # consumed. await upload(addr rstream.buffer, addr buffer[0], length) + rstream.state = AsyncStreamState.Finished else: + rstream.offset = rstream.offset + uint64(res) + # There should be one step between transferring last bytes to the + # consumer and declaring stream EOF. Otherwise could not be + # consumed. + await upload(addr rstream.buffer, addr buffer[0], res) + if (res < toRead) and rstream.rsource.atEof(): case rstream.cmpop of BoundCmp.Equal: @@ -126,13 +135,17 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - rstream.offset = rstream.offset + uint64(res) - if rstream.boundSize.isSome() and + if rstream.state == AsyncStreamState.Running and + rstream.boundSize.isSome() and (rstream.offset == rstream.boundSize.get()): - # This is "fast-path" to avoid one more iteration until EOF rstream.state = AsyncStreamState.Finished - await upload(addr rstream.buffer, addr buffer[0], res) else: + rstream.offset = rstream.offset + uint64(res) + # There should be one step between transferring last bytes to the + # consumer and declaring stream EOF. Otherwise could not be + # consumed. + await upload(addr rstream.buffer, addr buffer[0], res) + if (res < toRead) and rstream.rsource.atEof(): case rstream.cmpop of BoundCmp.Equal: @@ -141,12 +154,10 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - rstream.offset = rstream.offset + uint64(res) - if rstream.boundSize.isSome() and + if rstream.state == AsyncStreamState.Running and + rstream.boundSize.isSome() and (rstream.offset == rstream.boundSize.get()): - # This is "fast-path" to avoid one more iteration until EOF rstream.state = AsyncStreamState.Finished - await upload(addr rstream.buffer, addr buffer[0], res) else: case rstream.cmpop of BoundCmp.Equal: @@ -154,7 +165,6 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = rstream.error = newBoundedStreamIncompleteError() of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - await rstream.buffer.transfer() except AsyncStreamError as exc: rstream.state = AsyncStreamState.Error @@ -167,9 +177,14 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = of AsyncStreamState.Running: discard of AsyncStreamState.Error, AsyncStreamState.Stopped: + # Send `Error` or `Stopped` state to the consumer without waiting. rstream.buffer.forget() break - of AsyncStreamState.Finished, AsyncStreamState.Closed: + of AsyncStreamState.Finished: + # Send `EOF` state to the consumer and wait until it will be received. + await rstream.buffer.transfer() + break + of AsyncStreamState.Closed: break proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =