Restore functionality of zero-sized bounded reader/writer streams. (#184)

* Restore functionality of zero-sized bounded reader/writer streams.
Adding tests for it.

* run build_nim.sh unconditionally

Co-authored-by: Ștefan Talpalaru <stefantalpalaru@yahoo.com>
This commit is contained in:
Eugene Kabanov 2021-05-07 18:52:44 +03:00 committed by GitHub
parent c15c985c1f
commit fed6b0ac92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 49 deletions

View File

@ -129,7 +129,6 @@ jobs:
key: 'nim-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}-2'
- name: Build Nim and associated tools
if: steps.nim-cache.outputs.cache-hit != 'true'
shell: bash
run: |
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh

View File

@ -42,7 +42,6 @@ type
const
BoundedBufferSize* = 4096
BoundSizeDefectMessage = "Bound size must be bigger than zero"
BoundarySizeDefectMessage = "Boundary must not be empty array"
template newBoundedStreamIncompleteError(): ref BoundedStreamError =
@ -108,18 +107,37 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
else:
int(min(rstream.boundSize.get() - rstream.offset, uint64(len(buffer))))
try:
let res = await readUntilBoundary(rstream.rsource, addr buffer[0],
toRead, rstream.boundary)
if res > 0:
if len(rstream.boundary) > 0:
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
let length = res - len(rstream.boundary)
rstream.offset = rstream.offset + uint64(length)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], length)
rstream.state = AsyncStreamState.Finished
if toRead == 0:
# When ``rstream.boundSize`` is set and we already readed
# ``rstream.boundSize`` bytes.
rstream.state = AsyncStreamState.Finished
else:
let res = await readUntilBoundary(rstream.rsource, addr buffer[0],
toRead, rstream.boundary)
if res > 0:
if len(rstream.boundary) > 0:
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
let length = res - len(rstream.boundary)
rstream.offset = rstream.offset + uint64(length)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], length)
rstream.state = AsyncStreamState.Finished
else:
rstream.offset = rstream.offset + uint64(res)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
else:
rstream.offset = rstream.offset + uint64(res)
# There should be one step between transferring last bytes to the
@ -134,37 +152,13 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
if rstream.state == AsyncStreamState.Running and
rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
rstream.state = AsyncStreamState.Finished
else:
rstream.offset = rstream.offset + uint64(res)
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
if rstream.state == AsyncStreamState.Running and
rstream.boundSize.isSome() and
(rstream.offset == rstream.boundSize.get()):
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
else:
case rstream.cmpop
of BoundCmp.Equal:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
of BoundCmp.LessOrEqual:
rstream.state = AsyncStreamState.Finished
except AsyncStreamError as exc:
rstream.state = AsyncStreamState.Error
@ -258,7 +252,6 @@ proc bytesLeft*(stream: BoundedStreamRW): uint64 =
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = some(boundSize)
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
@ -277,7 +270,6 @@ proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, boundary: openarray[byte],
comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundSize = some(boundSize)
child.boundary = @boundary
@ -288,7 +280,6 @@ proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, comparison = BoundCmp.Equal,
bufferSize = BoundedBufferSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = some(boundSize)
child.cmpop = comparison
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
@ -304,7 +295,6 @@ proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
boundSize: uint64, boundary: openarray[byte],
comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
child.boundSize = some(boundSize)
child.boundary = @boundary
@ -430,7 +420,6 @@ proc newBoundedStreamReader*(rsource: AsyncStreamReader,
proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter,
boundSize: uint64, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = boundSize
child.cmpop = comparison
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize,
@ -439,7 +428,6 @@ proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter,
proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter,
boundSize: uint64, comparison = BoundCmp.Equal,
queueSize = AsyncStreamDefaultQueueSize) =
doAssert(boundSize > 0'u64, BoundSizeDefectMessage)
child.boundSize = boundSize
child.cmpop = comparison
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize)

View File

@ -1202,6 +1202,50 @@ suite "BoundedStream test suite":
check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true
check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true
test "BoundedStream zero-sized streams test":
proc checkEmptyStreams(address: TransportAddress): Future[bool] {.async.} =
var writer1Res = false
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, 0'u64)
await wstream2.finish()
let res = wstream2.atEof()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
writer1Res = res
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var wstream3 = newAsyncStreamWriter(transp)
var rstream2 = newBoundedStreamReader(rstream, 0'u64)
var wstream4 = newBoundedStreamWriter(wstream3, 0'u64)
let readerRes = rstream2.atEof()
let writer2Res =
try:
await wstream4.write("data")
false
except BoundedStreamOverflowError:
true
except CatchableError:
false
await wstream4.closeWait()
await wstream3.closeWait()
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
return (writer1Res and writer2Res and readerRes)
let address = initTAddress("127.0.0.1:46001")
check waitFor(checkEmptyStreams(address)) == true
test "BoundedStream leaks test":
check: