From fed6b0ac924bd832b9ab1cb47a5042bc2587cd7f Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 7 May 2021 18:52:44 +0300 Subject: [PATCH] Restore functionality of zero-sized bounded reader/writer streams. (#184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Restore functionality of zero-sized bounded reader/writer streams. Adding tests for it. * run build_nim.sh unconditionally Co-authored-by: Ștefan Talpalaru --- .github/workflows/ci.yml | 1 - chronos/streams/boundstream.nim | 84 ++++++++++++++------------------- tests/testasyncstream.nim | 44 +++++++++++++++++ 3 files changed, 80 insertions(+), 49 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 40358ef2..07699a98 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -129,7 +129,6 @@ jobs: key: 'nim-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}-2' - name: Build Nim and associated tools - if: steps.nim-cache.outputs.cache-hit != 'true' shell: bash run: | curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index 9e059161..b63d21ed 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -42,7 +42,6 @@ type const BoundedBufferSize* = 4096 - BoundSizeDefectMessage = "Bound size must be bigger than zero" BoundarySizeDefectMessage = "Boundary must not be empty array" template newBoundedStreamIncompleteError(): ref BoundedStreamError = @@ -108,18 +107,37 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = else: int(min(rstream.boundSize.get() - rstream.offset, uint64(len(buffer)))) try: - let res = await readUntilBoundary(rstream.rsource, addr buffer[0], - toRead, rstream.boundary) - if res > 0: - if len(rstream.boundary) > 0: - if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): - let length = res - len(rstream.boundary) - rstream.offset = rstream.offset + uint64(length) - # There should be one step between transferring last bytes to the - # consumer and declaring stream EOF. Otherwise could not be - # consumed. - await upload(addr rstream.buffer, addr buffer[0], length) - rstream.state = AsyncStreamState.Finished + if toRead == 0: + # When ``rstream.boundSize`` is set and we already readed + # ``rstream.boundSize`` bytes. + rstream.state = AsyncStreamState.Finished + else: + let res = await readUntilBoundary(rstream.rsource, addr buffer[0], + toRead, rstream.boundary) + if res > 0: + if len(rstream.boundary) > 0: + if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): + let length = res - len(rstream.boundary) + rstream.offset = rstream.offset + uint64(length) + # There should be one step between transferring last bytes to the + # consumer and declaring stream EOF. Otherwise could not be + # consumed. + await upload(addr rstream.buffer, addr buffer[0], length) + rstream.state = AsyncStreamState.Finished + else: + rstream.offset = rstream.offset + uint64(res) + # There should be one step between transferring last bytes to the + # consumer and declaring stream EOF. Otherwise could not be + # consumed. + await upload(addr rstream.buffer, addr buffer[0], res) + + if (res < toRead) and rstream.rsource.atEof(): + case rstream.cmpop + of BoundCmp.Equal: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + of BoundCmp.LessOrEqual: + rstream.state = AsyncStreamState.Finished else: rstream.offset = rstream.offset + uint64(res) # There should be one step between transferring last bytes to the @@ -134,37 +152,13 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = rstream.error = newBoundedStreamIncompleteError() of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - - if rstream.state == AsyncStreamState.Running and - rstream.boundSize.isSome() and - (rstream.offset == rstream.boundSize.get()): - rstream.state = AsyncStreamState.Finished else: - rstream.offset = rstream.offset + uint64(res) - # There should be one step between transferring last bytes to the - # consumer and declaring stream EOF. Otherwise could not be - # consumed. - await upload(addr rstream.buffer, addr buffer[0], res) - - if (res < toRead) and rstream.rsource.atEof(): - case rstream.cmpop - of BoundCmp.Equal: - rstream.state = AsyncStreamState.Error - rstream.error = newBoundedStreamIncompleteError() - of BoundCmp.LessOrEqual: - rstream.state = AsyncStreamState.Finished - - if rstream.state == AsyncStreamState.Running and - rstream.boundSize.isSome() and - (rstream.offset == rstream.boundSize.get()): + case rstream.cmpop + of BoundCmp.Equal: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + of BoundCmp.LessOrEqual: rstream.state = AsyncStreamState.Finished - else: - case rstream.cmpop - of BoundCmp.Equal: - rstream.state = AsyncStreamState.Error - rstream.error = newBoundedStreamIncompleteError() - of BoundCmp.LessOrEqual: - rstream.state = AsyncStreamState.Finished except AsyncStreamError as exc: rstream.state = AsyncStreamState.Error @@ -258,7 +252,6 @@ proc bytesLeft*(stream: BoundedStreamRW): uint64 = proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize, udata: ref T) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) child.boundSize = some(boundSize) child.cmpop = comparison init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize, @@ -277,7 +270,6 @@ proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, boundSize: uint64, boundary: openarray[byte], comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize, udata: ref T) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) doAssert(len(boundary) > 0, BoundarySizeDefectMessage) child.boundSize = some(boundSize) child.boundary = @boundary @@ -288,7 +280,6 @@ proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader, proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, boundSize: uint64, comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) child.boundSize = some(boundSize) child.cmpop = comparison init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize) @@ -304,7 +295,6 @@ proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, boundSize: uint64, boundary: openarray[byte], comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) doAssert(len(boundary) > 0, BoundarySizeDefectMessage) child.boundSize = some(boundSize) child.boundary = @boundary @@ -430,7 +420,6 @@ proc newBoundedStreamReader*(rsource: AsyncStreamReader, proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter, boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize, udata: ref T) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) child.boundSize = boundSize child.cmpop = comparison init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize, @@ -439,7 +428,6 @@ proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter, proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter, boundSize: uint64, comparison = BoundCmp.Equal, queueSize = AsyncStreamDefaultQueueSize) = - doAssert(boundSize > 0'u64, BoundSizeDefectMessage) child.boundSize = boundSize child.cmpop = comparison init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize) diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 208aa774..61214b6a 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -1202,6 +1202,50 @@ suite "BoundedStream test suite": check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true + test "BoundedStream zero-sized streams test": + proc checkEmptyStreams(address: TransportAddress): Future[bool] {.async.} = + var writer1Res = false + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newBoundedStreamWriter(wstream, 0'u64) + await wstream2.finish() + let res = wstream2.atEof() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + writer1Res = res + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var wstream3 = newAsyncStreamWriter(transp) + var rstream2 = newBoundedStreamReader(rstream, 0'u64) + var wstream4 = newBoundedStreamWriter(wstream3, 0'u64) + + let readerRes = rstream2.atEof() + let writer2Res = + try: + await wstream4.write("data") + false + except BoundedStreamOverflowError: + true + except CatchableError: + false + + await wstream4.closeWait() + await wstream3.closeWait() + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + return (writer1Res and writer2Res and readerRes) + + let address = initTAddress("127.0.0.1:46001") + check waitFor(checkEmptyStreams(address)) == true test "BoundedStream leaks test": check: