Add sequence of bytes as boundary to BoundStream and tests.

This commit is contained in:
cheatfate 2021-01-24 23:40:52 +02:00 committed by zah
parent fb137112be
commit 2defc4b822
3 changed files with 241 additions and 47 deletions

View File

@ -206,7 +206,7 @@ proc newAsyncStreamIncorrectError*(m: string): ref AsyncStreamIncorrectError {.
inline.} = inline.} =
newException(AsyncStreamIncorrectError, m) newException(AsyncStreamIncorrectError, m)
template checkRunning(t: untyped) = template checkRunning*(t: untyped) =
if not(t.running()): if not(t.running()):
raise newAsyncStreamIncorrectError("Incorrect stream state") raise newAsyncStreamIncorrectError("Incorrect stream state")
@ -320,7 +320,7 @@ template readLoop(body: untyped): untyped =
if done: if done:
break break
else: else:
await rstream.buffer.wait() await rstream.buffer.wait()
proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int) {.async.} = nbytes: int) {.async.} =

View File

@ -10,23 +10,23 @@
## This module implements bounded stream reading and writing. ## This module implements bounded stream reading and writing.
## ##
## For stream reading it means that you should read exactly bounded size of ## For stream reading it means that you should read exactly bounded size of
## bytes. ## bytes or you should read all bytes until specific boundary.
## ##
## For stream writing it means that you should write exactly bounded size ## For stream writing it means that you should write exactly bounded size
## of bytes, and if you wrote not enough bytes error will appear on stream ## of bytes.
## close.
import ../asyncloop, ../timer import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common import asyncstream, ../transports/stream, ../transports/common
export asyncstream, stream, timer, common export asyncstream, stream, timer, common
type type
BoundedStreamReader* = ref object of AsyncStreamReader BoundedStreamReader* = ref object of AsyncStreamReader
boundSize: uint64 boundSize: int
offset: uint64 boundary: seq[byte]
offset: int
BoundedStreamWriter* = ref object of AsyncStreamWriter BoundedStreamWriter* = ref object of AsyncStreamWriter
boundSize: uint64 boundSize: int
offset: uint64 offset: int
BoundedStreamError* = object of AsyncStreamError BoundedStreamError* = object of AsyncStreamError
BoundedStreamIncompleteError* = object of BoundedStreamError BoundedStreamIncompleteError* = object of BoundedStreamError
@ -43,38 +43,125 @@ template newBoundedStreamIncompleteError*(): ref BoundedStreamError =
template newBoundedStreamOverflowError*(): ref BoundedStreamError = template newBoundedStreamOverflowError*(): ref BoundedStreamError =
newException(BoundedStreamOverflowError, "Stream boundary exceeded") newException(BoundedStreamOverflowError, "Stream boundary exceeded")
proc readUntilBoundary*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int, sep: seq[byte]): Future[int] {.async.} =
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(len(sep) > 0, "separator must not be empty")
doAssert(nbytes >= 0, "nbytes must be non-negative value")
checkRunning(rstream)
var k = 0
var state = 0
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var error: ref AsyncStreamIncompleteError
proc predicate(data: openarray[byte]): tuple[consumed: int, done: bool] =
if len(data) == 0:
error = newAsyncStreamIncompleteError()
(0, true)
else:
var index = 0
while index < len(data):
if k >= nbytes:
return (index, true)
let ch = data[index]
inc(index)
pbuffer[k] = ch
inc(k)
if sep[state] == ch:
inc(state)
if state == len(sep):
break
else:
state = 0
(index, state == len(sep) or (k == nbytes))
await rstream.readMessage(predicate)
if not isNil(error):
raise error
else:
return k
func endsWith(s, suffix: openarray[byte]): bool =
var i = 0
var j = len(s) - len(suffix)
while i + j >= 0 and i + j < len(s):
if s[i + j] != suffix[i]: return false
inc(i)
if i >= len(suffix): return true
proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
var rstream = cast[BoundedStreamReader](stream) var rstream = cast[BoundedStreamReader](stream)
rstream.state = AsyncStreamState.Running rstream.state = AsyncStreamState.Running
var buffer = newSeq[byte](rstream.buffer.bufferLen())
while true: while true:
if rstream.offset < rstream.boundSize: if len(rstream.boundary) == 0:
let toRead = int(min(rstream.boundSize - rstream.offset, # Only size boundary set
uint64(rstream.buffer.bufferLen()))) if rstream.offset < rstream.boundSize:
try: let toRead = min(rstream.boundSize - rstream.offset,
await rstream.rsource.readExactly(rstream.buffer.getBuffer(), toRead) rstream.buffer.bufferLen())
rstream.offset = rstream.offset + uint64(toRead) try:
rstream.buffer.update(toRead) await rstream.rsource.readExactly(rstream.buffer.getBuffer(), toRead)
await rstream.buffer.transfer() rstream.offset = rstream.offset + toRead
except AsyncStreamIncompleteError: rstream.buffer.update(toRead)
rstream.state = AsyncStreamState.Error await rstream.buffer.transfer()
rstream.error = newBoundedStreamIncompleteError() except AsyncStreamIncompleteError:
except AsyncStreamReadError as exc: rstream.state = AsyncStreamState.Error
rstream.state = AsyncStreamState.Error rstream.error = newBoundedStreamIncompleteError()
rstream.error = exc except AsyncStreamReadError as exc:
except CancelledError: rstream.state = AsyncStreamState.Error
rstream.state = AsyncStreamState.Stopped rstream.error = exc
except CancelledError:
rstream.state = AsyncStreamState.Stopped
if rstream.state != AsyncStreamState.Running: if rstream.state != AsyncStreamState.Running:
break
else:
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer()
break break
else: else:
rstream.state = AsyncStreamState.Finished # Sequence boundary set
await rstream.buffer.transfer() if ((rstream.boundSize >= 0) and (rstream.offset < rstream.boundSize)) or
break (rstream.boundSize < 0):
let toRead =
if rstream.boundSize < 0:
len(buffer)
else:
min(rstream.boundSize - rstream.offset, len(buffer))
try:
let res = await readUntilBoundary(rstream.rsource, addr buffer[0],
toRead, rstream.boundary)
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
let length = res - len(rstream.boundary)
rstream.offset = rstream.offset + length
await upload(addr rstream.buffer, addr buffer[0], length)
rstream.state = AsyncStreamState.Finished
else:
rstream.offset = rstream.offset + res
await upload(addr rstream.buffer, addr buffer[0], res)
except AsyncStreamIncompleteError:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
except AsyncStreamReadError as exc:
rstream.state = AsyncStreamState.Error
rstream.error = exc
except CancelledError:
rstream.state = AsyncStreamState.Stopped
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}: if rstream.state != AsyncStreamState.Running:
# We need to notify consumer about error/close, but we do not care about break
# incoming data anymore. else:
rstream.buffer.forget() rstream.state = AsyncStreamState.Finished
break
# Without this additional wait, procedures such as `read()` could got stuck
# in `await.buffer.wait()` because procedures are unable to detect EOF while
# inside readLoop body.
await stepsAsync(1)
# We need to notify consumer about error/close, but we do not care about
# incoming data anymore.
rstream.buffer.forget()
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
var wstream = cast[BoundedStreamWriter](stream) var wstream = cast[BoundedStreamWriter](stream)
@ -88,7 +175,7 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
try: try:
item = await wstream.queue.get() item = await wstream.queue.get()
if item.size > 0: if item.size > 0:
if uint64(item.size) <= (wstream.boundSize - wstream.offset): if item.size <= (wstream.boundSize - wstream.offset):
# Writing chunk data. # Writing chunk data.
case item.kind case item.kind
of WriteType.Pointer: of WriteType.Pointer:
@ -97,7 +184,7 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
await wstream.wsource.write(addr item.data2[0], item.size) await wstream.wsource.write(addr item.data2[0], item.size)
of WriteType.String: of WriteType.String:
await wstream.wsource.write(addr item.data3[0], item.size) await wstream.wsource.write(addr item.data3[0], item.size)
wstream.offset = wstream.offset + uint64(item.size) wstream.offset = wstream.offset + item.size
item.future.complete() item.future.complete()
else: else:
wstream.state = AsyncStreamState.Error wstream.state = AsyncStreamState.Error
@ -146,19 +233,24 @@ proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
init(cast[AsyncStreamReader](child), rsource, boundedReadLoop, bufferSize) init(cast[AsyncStreamReader](child), rsource, boundedReadLoop, bufferSize)
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader, proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
boundSize: uint64, boundSize: int,
boundary: openarray[byte] = [],
bufferSize = BoundedBufferSize, bufferSize = BoundedBufferSize,
udata: ref T): BoundedStreamReader = udata: ref T): BoundedStreamReader =
var res = BoundedStreamReader(boundSize: boundSize) doAssert(boundSize >= 0 or len(boundary) > 0,
"At least one type of boundary should be set")
var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary)
res.init(rsource, bufferSize, udata) res.init(rsource, bufferSize, udata)
res res
proc newBoundedStreamReader*(rsource: AsyncStreamReader, proc newBoundedStreamReader*(rsource: AsyncStreamReader,
boundSize: uint64, boundSize: int,
boundary: openarray[byte] = [],
bufferSize = BoundedBufferSize, bufferSize = BoundedBufferSize,
): BoundedStreamReader = ): BoundedStreamReader =
doAssert(boundSize >= 0) doAssert(boundSize >= 0 or len(boundary) > 0,
var res = BoundedStreamReader(boundSize: boundSize) "At least one type of boundary should be set")
var res = BoundedStreamReader(boundSize: boundSize, boundary: @boundary)
res.init(rsource, bufferSize) res.init(rsource, bufferSize)
res res
@ -172,7 +264,7 @@ proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter,
init(cast[AsyncStreamWriter](child), wsource, boundedWriteLoop, queueSize) init(cast[AsyncStreamWriter](child), wsource, boundedWriteLoop, queueSize)
proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter, proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter,
boundSize: uint64, boundSize: int,
queueSize = AsyncStreamDefaultQueueSize, queueSize = AsyncStreamDefaultQueueSize,
udata: ref T): BoundedStreamWriter = udata: ref T): BoundedStreamWriter =
var res = BoundedStreamWriter(boundSize: boundSize) var res = BoundedStreamWriter(boundSize: boundSize)
@ -180,7 +272,7 @@ proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter,
res res
proc newBoundedStreamWriter*(wsource: AsyncStreamWriter, proc newBoundedStreamWriter*(wsource: AsyncStreamWriter,
boundSize: uint64, boundSize: int,
queueSize = AsyncStreamDefaultQueueSize, queueSize = AsyncStreamDefaultQueueSize,
): BoundedStreamWriter = ): BoundedStreamWriter =
var res = BoundedStreamWriter(boundSize: boundSize) var res = BoundedStreamWriter(boundSize: boundSize)

View File

@ -677,10 +677,93 @@ suite "BoundedStream test suite":
for i in 0 ..< len(result): for i in 0 ..< len(result):
result[i] = byte(message[i mod len(message)]) result[i] = byte(message[i mod len(message)])
for item in [100'u64, 60000'u64]:
for item in [100, 60000]:
proc boundaryTest(address: TransportAddress, test: int, size: int,
boundary: seq[byte]): Future[bool] {.async.} =
var message = createBigMessage(size)
var clientRes = false
proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
if test == 0:
await wstream.write(message)
await wstream.write(boundary)
await wstream.finish()
await wstream.closeWait()
clientRes = true
elif test == 1:
await wstream.write(message)
await wstream.write(boundary)
await wstream.write(message)
await wstream.finish()
await wstream.closeWait()
clientRes = true
elif test == 2:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.write(@[0x2D'u8, 0x2D'u8])
await wstream.finish()
await wstream.closeWait()
clientRes = true
elif test == 3:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.finish()
await wstream.closeWait()
clientRes = true
await transp.closeWait()
server.stop()
server.close()
var res = false
var server = createStreamServer(address, processClient,
flags = {ReuseAddr})
server.start()
var conn = await connect(address)
var rstream = newAsyncStreamReader(conn)
if test == 0:
var rbstream = newBoundedStreamReader(rstream, -1, boundary)
let response = await rbstream.read()
if response == message:
res = true
await rbstream.closeWait()
elif test == 1:
var rbstream = newBoundedStreamReader(rstream, -1, boundary)
let response1 = await rbstream.read()
await rbstream.closeWait()
let response2 = await rstream.read()
if (response1 == message) and (response2 == message):
res = true
elif test == 2:
var expectMessage = message
expectMessage[^2] = 0x2D'u8
expectMessage[^1] = 0x2D'u8
var rbstream = newBoundedStreamReader(rstream, size, boundary)
let response = await rbstream.read()
await rbstream.closeWait()
if (len(response) == size) and response == expectMessage:
res = true
elif test == 3:
var rbstream = newBoundedStreamReader(rstream, -1, boundary)
try:
let response {.used.} = await rbstream.read()
except BoundedStreamIncompleteError:
res = true
await rbstream.closeWait()
await rstream.closeWait()
await conn.closeWait()
await server.join()
return (res and clientRes)
proc boundedTest(address: TransportAddress, test: int, proc boundedTest(address: TransportAddress, test: int,
size: uint64): Future[bool] {.async.} = size: int): Future[bool] {.async.} =
var clientRes = false var clientRes = false
var res = false var res = false
@ -749,9 +832,28 @@ suite "BoundedStream test suite":
return (res and clientRes) return (res and clientRes)
let address = initTAddress("127.0.0.1:48030") let address = initTAddress("127.0.0.1:48030")
test "BoundedStream reading/writing test [" & $item & "]": test "BoundedStream(size) reading/writing test [" & $item & "]":
check waitFor(boundedTest(address, 0, item)) == true check waitFor(boundedTest(address, 0, item)) == true
test "BoundedStream overflow test [" & $item & "]": test "BoundedStream(size) overflow test [" & $item & "]":
check waitFor(boundedTest(address, 1, item)) == true check waitFor(boundedTest(address, 1, item)) == true
test "BoundedStream incomplete test [" & $item & "]": test "BoundedStream(size) incomplete test [" & $item & "]":
check waitFor(boundedTest(address, 2, item)) == true check waitFor(boundedTest(address, 2, item)) == true
test "BoundedStream(boundary) reading test [" & $item & "]":
check waitFor(boundaryTest(address, 0, item,
@[0x2D'u8, 0x2D'u8, 0x2D'u8]))
test "BoundedStream(boundary) double message test [" & $item & "]":
check waitFor(boundaryTest(address, 1, item,
@[0x2D'u8, 0x2D'u8, 0x2D'u8]))
test "BoundedStream(size+boundary) reading size-bound test [" & $item & "]":
check waitFor(boundaryTest(address, 2, item,
@[0x2D'u8, 0x2D'u8, 0x2D'u8]))
test "BoundedStream(boundary) reading incomplete test [" & $item & "]":
check waitFor(boundaryTest(address, 3, item,
@[0x2D'u8, 0x2D'u8, 0x2D'u8]))
test "BoundedStream leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
getTracker("async.stream.writer").isLeaked() == false
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false