From 2defc4b822d8e66e44440e6dfd74c15e5429c3c9 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 24 Jan 2021 23:40:52 +0200 Subject: [PATCH] Add sequence of bytes as boundary to BoundStream and tests. --- chronos/streams/asyncstream.nim | 4 +- chronos/streams/boundstream.nim | 172 ++++++++++++++++++++++++-------- tests/testasyncstream.nim | 112 ++++++++++++++++++++- 3 files changed, 241 insertions(+), 47 deletions(-) diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 5d883a93..d27920ea 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -206,7 +206,7 @@ proc newAsyncStreamIncorrectError*(m: string): ref AsyncStreamIncorrectError {. inline.} = newException(AsyncStreamIncorrectError, m) -template checkRunning(t: untyped) = +template checkRunning*(t: untyped) = if not(t.running()): raise newAsyncStreamIncorrectError("Incorrect stream state") @@ -320,7 +320,7 @@ template readLoop(body: untyped): untyped = if done: break else: - await rstream.buffer.wait() + await rstream.buffer.wait() proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int) {.async.} = diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index 542c187d..67894c7c 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -10,23 +10,23 @@ ## This module implements bounded stream reading and writing. ## ## For stream reading it means that you should read exactly bounded size of -## bytes. +## bytes or you should read all bytes until specific boundary. ## ## For stream writing it means that you should write exactly bounded size -## of bytes, and if you wrote not enough bytes error will appear on stream -## close. +## of bytes. 
import ../asyncloop, ../timer import asyncstream, ../transports/stream, ../transports/common export asyncstream, stream, timer, common type BoundedStreamReader* = ref object of AsyncStreamReader - boundSize: uint64 - offset: uint64 + boundSize: int + boundary: seq[byte] + offset: int BoundedStreamWriter* = ref object of AsyncStreamWriter - boundSize: uint64 - offset: uint64 + boundSize: int + offset: int BoundedStreamError* = object of AsyncStreamError BoundedStreamIncompleteError* = object of BoundedStreamError @@ -43,38 +43,125 @@ template newBoundedStreamIncompleteError*(): ref BoundedStreamError = template newBoundedStreamOverflowError*(): ref BoundedStreamError = newException(BoundedStreamOverflowError, "Stream boundary exceeded") +proc readUntilBoundary*(rstream: AsyncStreamReader, pbytes: pointer, + nbytes: int, sep: seq[byte]): Future[int] {.async.} = + doAssert(not(isNil(pbytes)), "pbytes must not be nil") + doAssert(len(sep) > 0, "separator must not be empty") + doAssert(nbytes >= 0, "nbytes must be non-negative value") + checkRunning(rstream) + + var k = 0 + var state = 0 + var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + var error: ref AsyncStreamIncompleteError + + proc predicate(data: openarray[byte]): tuple[consumed: int, done: bool] = + if len(data) == 0: + error = newAsyncStreamIncompleteError() + (0, true) + else: + var index = 0 + while index < len(data): + if k >= nbytes: + return (index, true) + let ch = data[index] + inc(index) + pbuffer[k] = ch + inc(k) + if sep[state] == ch: + inc(state) + if state == len(sep): + break + else: + state = 0 + (index, state == len(sep) or (k == nbytes)) + + await rstream.readMessage(predicate) + if not isNil(error): + raise error + else: + return k + +func endsWith(s, suffix: openarray[byte]): bool = + var i = 0 + var j = len(s) - len(suffix) + while i + j >= 0 and i + j < len(s): + if s[i + j] != suffix[i]: return false + inc(i) + if i >= len(suffix): return true + proc boundedReadLoop(stream: 
AsyncStreamReader) {.async.} = var rstream = cast[BoundedStreamReader](stream) rstream.state = AsyncStreamState.Running + var buffer = newSeq[byte](rstream.buffer.bufferLen()) while true: - if rstream.offset < rstream.boundSize: - let toRead = int(min(rstream.boundSize - rstream.offset, - uint64(rstream.buffer.bufferLen()))) - try: - await rstream.rsource.readExactly(rstream.buffer.getBuffer(), toRead) - rstream.offset = rstream.offset + uint64(toRead) - rstream.buffer.update(toRead) - await rstream.buffer.transfer() - except AsyncStreamIncompleteError: - rstream.state = AsyncStreamState.Error - rstream.error = newBoundedStreamIncompleteError() - except AsyncStreamReadError as exc: - rstream.state = AsyncStreamState.Error - rstream.error = exc - except CancelledError: - rstream.state = AsyncStreamState.Stopped + if len(rstream.boundary) == 0: + # Only size boundary set + if rstream.offset < rstream.boundSize: + let toRead = min(rstream.boundSize - rstream.offset, + rstream.buffer.bufferLen()) + try: + await rstream.rsource.readExactly(rstream.buffer.getBuffer(), toRead) + rstream.offset = rstream.offset + toRead + rstream.buffer.update(toRead) + await rstream.buffer.transfer() + except AsyncStreamIncompleteError: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + except AsyncStreamReadError as exc: + rstream.state = AsyncStreamState.Error + rstream.error = exc + except CancelledError: + rstream.state = AsyncStreamState.Stopped - if rstream.state != AsyncStreamState.Running: + if rstream.state != AsyncStreamState.Running: + break + else: + rstream.state = AsyncStreamState.Finished + await rstream.buffer.transfer() break else: - rstream.state = AsyncStreamState.Finished - await rstream.buffer.transfer() - break + # Sequence boundary set + if ((rstream.boundSize >= 0) and (rstream.offset < rstream.boundSize)) or + (rstream.boundSize < 0): + let toRead = + if rstream.boundSize < 0: + len(buffer) + else: + 
min(rstream.boundSize - rstream.offset, len(buffer)) + try: + let res = await readUntilBoundary(rstream.rsource, addr buffer[0], + toRead, rstream.boundary) + if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary): + let length = res - len(rstream.boundary) + rstream.offset = rstream.offset + length + await upload(addr rstream.buffer, addr buffer[0], length) + rstream.state = AsyncStreamState.Finished + else: + rstream.offset = rstream.offset + res + await upload(addr rstream.buffer, addr buffer[0], res) + except AsyncStreamIncompleteError: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() + except AsyncStreamReadError as exc: + rstream.state = AsyncStreamState.Error + rstream.error = exc + except CancelledError: + rstream.state = AsyncStreamState.Stopped - if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}: - # We need to notify consumer about error/close, but we do not care about - # incoming data anymore. - rstream.buffer.forget() + if rstream.state != AsyncStreamState.Running: + break + else: + rstream.state = AsyncStreamState.Finished + break + + # Without this additional wait, procedures such as `read()` could get stuck + # in `await.buffer.wait()` because procedures are unable to detect EOF while + # inside readLoop body. + await stepsAsync(1) + # We need to notify consumer about error/close, but we do not care about + # incoming data anymore. + rstream.buffer.forget() proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = var wstream = cast[BoundedStreamWriter](stream) @@ -88,7 +175,7 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = try: item = await wstream.queue.get() if item.size > 0: - if uint64(item.size) <= (wstream.boundSize - wstream.offset): + if item.size <= (wstream.boundSize - wstream.offset): # Writing chunk data. 
case item.kind of WriteType.Pointer: @@ -97,7 +184,7 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = await wstream.wsource.write(addr item.data2[0], item.size) of WriteType.String: await wstream.wsource.write(addr item.data3[0], item.size) - wstream.offset = wstream.offset + uint64(item.size) + wstream.offset = wstream.offset + item.size item.future.complete() else: wstream.state = AsyncStreamState.Error @@ -146,19 +233,24 @@ proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader, init(cast[AsyncStreamReader](child), rsource, boundedReadLoop, bufferSize) proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, - boundSize: uint64, + boundSize: int, + boundary: openarray[byte] = [], bufferSize = BoundedBufferSize, udata: ref T): BoundedStreamReader = - var res = BoundedStreamReader(boundSize: boundSize) + doAssert(boundSize >= 0 or len(boundary) > 0, + "At least one type of boundary should be set") + var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary) res.init(rsource, bufferSize, udata) res proc newBoundedStreamReader*(rsource: AsyncStreamReader, - boundSize: uint64, + boundSize: int, + boundary: openarray[byte] = [], bufferSize = BoundedBufferSize, ): BoundedStreamReader = - doAssert(boundSize >= 0) - var res = BoundedStreamReader(boundSize: boundSize) + doAssert(boundSize >= 0 or len(boundary) > 0, + "At least one type of boundary should be set") + var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary) res.init(rsource, bufferSize) res @@ -172,7 +264,7 @@ proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter, init(cast[AsyncStreamWriter](child), wsource, boundedWriteLoop, queueSize) proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter, - boundSize: uint64, + boundSize: int, queueSize = AsyncStreamDefaultQueueSize, udata: ref T): BoundedStreamWriter = var res = BoundedStreamWriter(boundSize: boundSize) @@ -180,7 +272,7 @@ proc newBoundedStreamWriter*[T](wsource: 
AsyncStreamWriter, res proc newBoundedStreamWriter*(wsource: AsyncStreamWriter, - boundSize: uint64, + boundSize: int, queueSize = AsyncStreamDefaultQueueSize, ): BoundedStreamWriter = var res = BoundedStreamWriter(boundSize: boundSize) diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 7ee1c973..39041ac7 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -677,10 +677,93 @@ suite "BoundedStream test suite": for i in 0 ..< len(result): result[i] = byte(message[i mod len(message)]) - for item in [100'u64, 60000'u64]: + + for item in [100, 60000]: + + proc boundaryTest(address: TransportAddress, test: int, size: int, + boundary: seq[byte]): Future[bool] {.async.} = + var message = createBigMessage(size) + var clientRes = false + + proc processClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + if test == 0: + await wstream.write(message) + await wstream.write(boundary) + await wstream.finish() + await wstream.closeWait() + clientRes = true + elif test == 1: + await wstream.write(message) + await wstream.write(boundary) + await wstream.write(message) + await wstream.finish() + await wstream.closeWait() + clientRes = true + elif test == 2: + var ncmessage = message + ncmessage.setLen(len(message) - 2) + await wstream.write(ncmessage) + await wstream.write(@[0x2D'u8, 0x2D'u8]) + await wstream.finish() + await wstream.closeWait() + clientRes = true + elif test == 3: + var ncmessage = message + ncmessage.setLen(len(message) - 2) + await wstream.write(ncmessage) + await wstream.finish() + await wstream.closeWait() + clientRes = true + + await transp.closeWait() + server.stop() + server.close() + + var res = false + var server = createStreamServer(address, processClient, + flags = {ReuseAddr}) + server.start() + var conn = await connect(address) + var rstream = newAsyncStreamReader(conn) + if test == 0: + var rbstream = newBoundedStreamReader(rstream, -1, boundary) + 
let response = await rbstream.read() + if response == message: + res = true + await rbstream.closeWait() + elif test == 1: + var rbstream = newBoundedStreamReader(rstream, -1, boundary) + let response1 = await rbstream.read() + await rbstream.closeWait() + let response2 = await rstream.read() + if (response1 == message) and (response2 == message): + res = true + elif test == 2: + var expectMessage = message + expectMessage[^2] = 0x2D'u8 + expectMessage[^1] = 0x2D'u8 + var rbstream = newBoundedStreamReader(rstream, size, boundary) + let response = await rbstream.read() + await rbstream.closeWait() + if (len(response) == size) and response == expectMessage: + res = true + elif test == 3: + var rbstream = newBoundedStreamReader(rstream, -1, boundary) + try: + let response {.used.} = await rbstream.read() + except BoundedStreamIncompleteError: + res = true + await rbstream.closeWait() + + await rstream.closeWait() + await conn.closeWait() + await server.join() + return (res and clientRes) proc boundedTest(address: TransportAddress, test: int, - size: uint64): Future[bool] {.async.} = + size: int): Future[bool] {.async.} = var clientRes = false var res = false @@ -749,9 +832,28 @@ suite "BoundedStream test suite": return (res and clientRes) let address = initTAddress("127.0.0.1:48030") - test "BoundedStream reading/writing test [" & $item & "]": + test "BoundedStream(size) reading/writing test [" & $item & "]": check waitFor(boundedTest(address, 0, item)) == true - test "BoundedStream overflow test [" & $item & "]": + test "BoundedStream(size) overflow test [" & $item & "]": check waitFor(boundedTest(address, 1, item)) == true - test "BoundedStream incomplete test [" & $item & "]": + test "BoundedStream(size) incomplete test [" & $item & "]": check waitFor(boundedTest(address, 2, item)) == true + test "BoundedStream(boundary) reading test [" & $item & "]": + check waitFor(boundaryTest(address, 0, item, + @[0x2D'u8, 0x2D'u8, 0x2D'u8])) + test "BoundedStream(boundary) 
double message test [" & $item & "]": + check waitFor(boundaryTest(address, 1, item, + @[0x2D'u8, 0x2D'u8, 0x2D'u8])) + test "BoundedStream(size+boundary) reading size-bound test [" & $item & "]": + check waitFor(boundaryTest(address, 2, item, + @[0x2D'u8, 0x2D'u8, 0x2D'u8])) + test "BoundedStream(boundary) reading incomplete test [" & $item & "]": + check waitFor(boundaryTest(address, 3, item, + @[0x2D'u8, 0x2D'u8, 0x2D'u8])) + + test "BoundedStream leaks test": + check: + getTracker("async.stream.reader").isLeaked() == false + getTracker("async.stream.writer").isLeaked() == false + getTracker("stream.server").isLeaked() == false + getTracker("stream.transport").isLeaked() == false \ No newline at end of file