Fix upload() issue and adding tests. (#179)

This commit is contained in:
Eugene Kabanov 2021-04-24 20:32:21 +03:00 committed by GitHub
parent 7389cfc60b
commit 833d968782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 25 deletions

View File

@ -151,16 +151,22 @@ proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} =
proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte,
nbytes: int): Future[void] {.async.} =
## You can upload any amount of bytes to the buffer. If size of internal
## buffer is not enough to fit all the data at once, data will be uploaded
## via chunks of size up to internal buffer size.
var length = nbytes
var srcBuffer = cast[ptr UncheckedArray[byte]](pbytes)
var srcOffset = 0
while length > 0:
let size = min(length, sb[].bufferLen())
if size == 0:
# Internal buffer is full, we need to transfer data to consumer.
await sb[].transfer()
continue
else:
copyMem(addr sb[].buffer[sb.offset], pbytes, size)
# Copy data from `pbytes` to internal buffer.
copyMem(addr sb[].buffer[sb.offset], addr srcBuffer[srcOffset], size)
sb[].offset = sb[].offset + size
srcOffset = srcOffset + size
length = length - size
# We notify consumers that new data is available.
sb[].forget()

View File

@ -72,6 +72,12 @@ N8r5CwGcIX/XPC3lKazzbZ8baA==
-----END CERTIFICATE-----
"""
proc createBigMessage(message: string, size: int): seq[byte] =
var res = newSeq[byte](size)
for i in 0 ..< len(res):
res[i] = byte(ord(message[i mod len(message)]))
res
suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) readExactly() test":
proc testReadExactly(address: TransportAddress): Future[bool] {.async.} =
@ -485,12 +491,6 @@ suite "AsyncStream test suite":
getTracker("stream.transport").isLeaked() == false
suite "ChunkedStream test suite":
proc createBigMessage(message: string, size: int): string =
var res = newString(size)
for i in 0 ..< len(res):
res[i] = chr(ord(message[i mod len(message)]))
res
test "ChunkedStream test vectors":
const ChunkedVectors = [
["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n",
@ -650,12 +650,11 @@ suite "ChunkedStream test suite":
test "ChunkedStream too big chunk header test":
proc checkTooBigChunkHeader(address: TransportAddress,
inputstr: string): Future[bool] {.async.} =
inputstr: seq[byte]): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.write(inputstr)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
@ -669,7 +668,7 @@ suite "ChunkedStream test suite":
var rstream2 = newChunkedStreamReader(rstream)
let res =
try:
var data {.used.} = await rstream2.read()
var datares {.used.} = await rstream2.read()
false
except ChunkedStreamProtocolError:
true
@ -689,7 +688,8 @@ suite "ChunkedStream test suite":
test "ChunkedStream read/write test":
proc checkVector(address: TransportAddress,
inputstr: string, chsize: int): Future[string] {.async.} =
inputstr: seq[byte],
chunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
@ -699,7 +699,7 @@ suite "ChunkedStream test suite":
while true:
if len(data) == offset:
break
let toWrite = min(chsize, len(data) - offset)
let toWrite = min(chunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
@ -715,12 +715,11 @@ suite "ChunkedStream test suite":
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var res = await rstream2.read()
var ress = cast[string](res)
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = ress
return res
proc testBigData(address: TransportAddress,
datasize: int, chunksize: int): Future[bool] {.async.} =
@ -733,6 +732,60 @@ suite "ChunkedStream test suite":
check waitFor(testBigData(address, 262400, 4096)) == true
check waitFor(testBigData(address, 767309, 4457)) == true
test "ChunkedStream read small chunks test":
proc checkVector(address: TransportAddress,
inputstr: seq[byte],
writeChunkSize: int,
readChunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var res: seq[byte]
while not(rstream2.atEof()):
var chunk = await rstream2.read(readChunkSize)
res.add(chunk)
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
return res
proc testSmallChunk(address: TransportAddress,
datasize: int,
writeChunkSize: int,
readChunkSize: int): Future[bool] {.async.} =
var data = createBigMessage("REQUESTSTREAMMESSAGE", datasize)
var check = await checkVector(address, data, writeChunkSize,
readChunkSize)
return (data == check)
let address = initTAddress("127.0.0.1:46001")
check waitFor(testSmallChunk(address, 4457, 128, 1)) == true
check waitFor(testSmallChunk(address, 65600, 1024, 17)) == true
check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true
check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true
test "ChunkedStream leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
@ -835,20 +888,13 @@ suite "BoundedStream test suite":
BoundaryRead, BoundaryDouble, BoundarySize, BoundaryIncomplete,
BoundaryEmpty
proc createBigMessage(size: int): seq[byte] =
var message = "ABCDEFGHIJKLMNOP"
var res = newSeq[byte](size)
for i in 0 ..< len(result):
res[i] = byte(message[i mod len(message)])
res
for itemComp in [BoundCmp.Equal, BoundCmp.LessOrEqual]:
for itemSize in [100, 60000]:
proc boundaryTest(address: TransportAddress, btest: BoundaryBytesTest,
size: int, boundary: seq[byte],
cmp: BoundCmp): Future[bool] {.async.} =
var message = createBigMessage(size)
var message = createBigMessage("ABCDEFGHIJKLMNOP", size)
var clientRes = false
proc processClient(server: StreamServer,
@ -946,7 +992,8 @@ suite "BoundedStream test suite":
var clientRes = false
var res = false
let messagePart = createBigMessage(int(itemSize) div 10)
let messagePart = createBigMessage("ABCDEFGHIJKLMNOP",
int(itemSize) div 10)
var message: seq[byte]
for i in 0 ..< 10:
message.add(messagePart)
@ -1098,6 +1145,62 @@ suite "BoundedStream test suite":
check waitFor(boundaryTest(address, BoundaryEmpty, itemSize,
@[0x2D'u8, 0x2D'u8, 0x2D'u8], itemComp))
test "BoundedStream read small chunks test":
proc checkVector(address: TransportAddress,
inputstr: seq[byte],
writeChunkSize: int,
readChunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, len(inputstr))
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newBoundedStreamReader(rstream, 1048576,
comparison = BoundCmp.LessOrEqual)
var res: seq[byte]
while not(rstream2.atEof()):
var chunk = await rstream2.read(readChunkSize)
res.add(chunk)
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
return res
proc testSmallChunk(address: TransportAddress,
datasize: int,
writeChunkSize: int,
readChunkSize: int): Future[bool] {.async.} =
var data = createBigMessage("0123456789ABCDEFGHI", datasize)
var check = await checkVector(address, data, writeChunkSize,
readChunkSize)
return (data == check)
let address = initTAddress("127.0.0.1:46001")
check waitFor(testSmallChunk(address, 4457, 128, 1)) == true
check waitFor(testSmallChunk(address, 65600, 1024, 17)) == true
check waitFor(testSmallChunk(address, 262400, 4096, 61)) == true
check waitFor(testSmallChunk(address, 767309, 4457, 173)) == true
test "BoundedStream leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false