Remove fastpath execution and finally fix issue.

This commit is contained in:
cheatfate 2021-04-27 00:07:49 +03:00
parent 39f4060e07
commit 645d62db78
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
1 changed files with 26 additions and 11 deletions

View File

@ -115,9 +115,18 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
let length = res - len(rstream.boundary)
rstream.offset = rstream.offset + uint64(length)
rstream.state = AsyncStreamState.Finished
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], length)
rstream.state = AsyncStreamState.Finished
else:
rstream.offset = rstream.offset + uint64(res)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
@ -126,13 +135,17 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
rstream.offset = rstream.offset + uint64(res)
if rstream.boundSize.isSome() and
if rstream.state == AsyncStreamState.Running and
rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
# This is "fast-path" to avoid one more iteration until EOF
rstream.state = AsyncStreamState.Finished
await upload(addr rstream.buffer, addr buffer[0], res)
else:
rstream.offset = rstream.offset + uint64(res)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
@ -141,12 +154,10 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
rstream.offset = rstream.offset + uint64(res)
if rstream.boundSize.isSome() and
if rstream.state == AsyncStreamState.Running and
rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
# This is "fast-path" to avoid one more iteration until EOF
rstream.state = AsyncStreamState.Finished
await upload(addr rstream.buffer, addr buffer[0], res)
else:
case rstream.cmpop
of BoundCmp.Equal:
@ -154,7 +165,6 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer()
except AsyncStreamError as exc:
rstream.state = AsyncStreamState.Error
@ -167,9 +177,14 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
of AsyncStreamState.Running:
discard
of AsyncStreamState.Error, AsyncStreamState.Stopped:
# Send `Error` or `Stopped` state to the consumer without waiting.
rstream.buffer.forget()
break
of AsyncStreamState.Finished, AsyncStreamState.Closed:
of AsyncStreamState.Finished:
# Send `EOF` state to the consumer and wait until it will be received.
await rstream.buffer.transfer()
break
of AsyncStreamState.Closed:
break
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =