diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 05b48cee..5cffeb6f 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -151,16 +151,22 @@ proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} = proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte, nbytes: int): Future[void] {.async.} = + ## You can upload any amount of bytes to the buffer. If size of internal + ## buffer is not enough to fit all the data at once, data will be uploaded + ## via chunks of size up to internal buffer size. var length = nbytes + var srcBuffer = cast[ptr UncheckedArray[byte]](pbytes) + var srcOffset = 0 while length > 0: let size = min(length, sb[].bufferLen()) if size == 0: # Internal buffer is full, we need to transfer data to consumer. await sb[].transfer() - continue else: - copyMem(addr sb[].buffer[sb.offset], pbytes, size) + # Copy data from `pbytes` to internal buffer. + copyMem(addr sb[].buffer[sb.offset], addr srcBuffer[srcOffset], size) sb[].offset = sb[].offset + size + srcOffset = srcOffset + size length = length - size # We notify consumers that new data is available. sb[].forget() diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 39abbe45..adc781f6 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -72,6 +72,12 @@ N8r5CwGcIX/XPC3lKazzbZ8baA== -----END CERTIFICATE----- """ +proc createBigMessage(message: string, size: int): seq[byte] = + var res = newSeq[byte](size) + for i in 0 ..< len(res): + res[i] = byte(ord(message[i mod len(message)])) + res + suite "AsyncStream test suite": test "AsyncStream(StreamTransport) readExactly() test": proc testReadExactly(address: TransportAddress): Future[bool] {.async.} = @@ -485,12 +491,6 @@ suite "AsyncStream test suite": getTracker("stream.transport").isLeaked() == false suite "ChunkedStream test suite": - proc createBigMessage(message: string, size: int): string = - var res = newString(size) - for i in 0 ..< len(res): - res[i] = chr(ord(message[i mod len(message)])) - res - test "ChunkedStream test vectors": const ChunkedVectors = [ ["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n", @@ -650,12 +650,11 @@ suite "ChunkedStream test suite": test "ChunkedStream too big chunk header test": proc checkTooBigChunkHeader(address: TransportAddress, - inputstr: string): Future[bool] {.async.} = + inputstr: seq[byte]): Future[bool] {.async.} = proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = var wstream = newAsyncStreamWriter(transp) - var data = inputstr - await wstream.write(data) + await wstream.write(inputstr) await wstream.finish() await wstream.closeWait() await transp.closeWait() @@ -669,7 +668,7 @@ suite "ChunkedStream test suite": var rstream2 = newChunkedStreamReader(rstream) let res = try: - var data {.used.} = await rstream2.read() + var datares {.used.} = await rstream2.read() false except ChunkedStreamProtocolError: true @@ -689,7 +688,8 @@ suite "ChunkedStream test suite": test "ChunkedStream read/write test": proc checkVector(address: TransportAddress, - inputstr: string, chsize: int): Future[string] {.async.} = + inputstr: seq[byte], + chunkSize: int): Future[seq[byte]] {.async.} = proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = var wstream = newAsyncStreamWriter(transp) @@ -699,7 +699,7 @@ suite "ChunkedStream test suite": while true: if len(data) == offset: break - let toWrite = min(chsize, len(data) - offset) + let toWrite = min(chunkSize, len(data) - offset) await wstream2.write(addr data[offset], toWrite) offset = offset + toWrite await wstream2.finish() @@ -715,12 +715,11 @@ suite "ChunkedStream test suite": var rstream = newAsyncStreamReader(transp) var rstream2 = newChunkedStreamReader(rstream) var res = await rstream2.read() - var ress = cast[string](res) await rstream2.closeWait() await rstream.closeWait() await transp.closeWait() await server.join() - result = ress + return res proc testBigData(address: TransportAddress, datasize: int, chunksize: int): Future[bool] {.async.} = @@ -733,6 +732,60 @@ suite "ChunkedStream test suite": check waitFor(testBigData(address, 262400, 4096)) == true check waitFor(testBigData(address, 767309, 4457)) == true + test "ChunkedStream read small chunks test": + proc checkVector(address: TransportAddress, + inputstr: seq[byte], + writeChunkSize: int, + readChunkSize: int): Future[seq[byte]] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var data = inputstr + var offset = 0 + while true: + if len(data) == offset: + break + let toWrite = min(writeChunkSize, len(data) - offset) + await wstream2.write(addr data[offset], toWrite) + offset = offset + toWrite + await wstream2.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newChunkedStreamReader(rstream) + var res: seq[byte] + while not(rstream2.atEof()): + var chunk = await rstream2.read(readChunkSize) + res.add(chunk) + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + return res + + proc testSmallChunk(address: TransportAddress, + datasize: int, + writeChunkSize: int, + readChunkSize: int): Future[bool] {.async.} = + var data = createBigMessage("REQUESTSTREAMMESSAGE", datasize) + var check = await checkVector(address, data, writeChunkSize, + readChunkSize) + return (data == check) + + let address = initTAddress("127.0.0.1:46001") + check waitFor(testSmallChunk(address, 4457, 128, 1)) == true + check waitFor(testSmallChunk(address, 65600, 1024, 17)) == true + check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true + check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true + test "ChunkedStream leaks test": check: getTracker("async.stream.reader").isLeaked() == false @@ -835,20 +888,13 @@ suite "BoundedStream test suite": BoundaryRead, BoundaryDouble, BoundarySize, BoundaryIncomplete, BoundaryEmpty - proc createBigMessage(size: int): seq[byte] = - var message = "ABCDEFGHIJKLMNOP" - var res = newSeq[byte](size) - for i in 0 ..< len(result): - res[i] = byte(message[i mod len(message)]) - res - for itemComp in [BoundCmp.Equal, BoundCmp.LessOrEqual]: for itemSize in [100, 60000]: proc boundaryTest(address: TransportAddress, btest: BoundaryBytesTest, size: int, boundary: seq[byte], cmp: BoundCmp): Future[bool] {.async.} = - var message = createBigMessage(size) + var message = createBigMessage("ABCDEFGHIJKLMNOP", size) var clientRes = false proc processClient(server: StreamServer, @@ -946,7 +992,8 @@ suite "BoundedStream test suite": var clientRes = false var res = false - let messagePart = createBigMessage(int(itemSize) div 10) + let messagePart = createBigMessage("ABCDEFGHIJKLMNOP", + int(itemSize) div 10) var message: seq[byte] for i in 0 ..< 10: message.add(messagePart) @@ -1098,6 +1145,62 @@ suite "BoundedStream test suite": check waitFor(boundaryTest(address, BoundaryEmpty, itemSize, @[0x2D'u8, 0x2D'u8, 0x2D'u8], itemComp)) + test "BoundedStream read small chunks test": + proc checkVector(address: TransportAddress, + inputstr: seq[byte], + writeChunkSize: int, + readChunkSize: int): Future[seq[byte]] {.async.} = + proc serveClient(server: StreamServer, + transp: StreamTransport) {.async.} = + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newBoundedStreamWriter(wstream, len(inputstr)) + var data = inputstr + var offset = 0 + while true: + if len(data) == offset: + break + let toWrite = min(writeChunkSize, len(data) - offset) + await wstream2.write(addr data[offset], toWrite) + offset = offset + toWrite + await wstream2.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + + var server = createStreamServer(address, serveClient, {ReuseAddr}) + server.start() + var transp = await connect(address) + var rstream = newAsyncStreamReader(transp) + var rstream2 = newBoundedStreamReader(rstream, 1048576, + comparison = BoundCmp.LessOrEqual) + var res: seq[byte] + while not(rstream2.atEof()): + var chunk = await rstream2.read(readChunkSize) + res.add(chunk) + await rstream2.closeWait() + await rstream.closeWait() + await transp.closeWait() + await server.join() + return res + + proc testSmallChunk(address: TransportAddress, + datasize: int, + writeChunkSize: int, + readChunkSize: int): Future[bool] {.async.} = + var data = createBigMessage("0123456789ABCDEFGHI", datasize) + var check = await checkVector(address, data, writeChunkSize, + readChunkSize) + return (data == check) + + let address = initTAddress("127.0.0.1:46001") + check waitFor(testSmallChunk(address, 4457, 128, 1)) == true + check waitFor(testSmallChunk(address, 65600, 1024, 17)) == true + check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true + check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true + + test "BoundedStream leaks test": check: getTracker("async.stream.reader").isLeaked() == false