This commit is contained in:
cheatfate 2020-02-12 22:54:05 +02:00
parent e3ced62d4b
commit 80351cb928
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 56 additions and 5 deletions

View File

@ -998,7 +998,7 @@ else:
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.buflen)
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
@ -1062,7 +1062,7 @@ else:
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.buflen)
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
@ -1612,7 +1612,7 @@ proc write*(transp: StreamTransport, pbytes: pointer,
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: nbytes)
buf: pbytes, buflen: nbytes, size: nbytes)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
@ -1630,7 +1630,8 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
let length = if msglen <= 0: len(msg) else: msglen
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: addr retFuture.gcholder[0], buflen: length)
buf: addr retFuture.gcholder[0], buflen: length,
size: length)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
@ -1649,7 +1650,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: addr retFuture.gcholder[0],
buflen: length)
buflen: length, size: length)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()

View File

@ -59,6 +59,12 @@ suite "Stream Transport test suite":
]
var prefixes = ["[IP] ", "[UNIX] "]
proc createBigMessage(size: int): seq[byte] =
var message = "MESSAGE"
result = newSeq[byte](size)
for i in 0 ..< len(result):
result[i] = byte(message[i mod len(message)])
proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} =
while not transp.atEof():
var data = await transp.readLine()
@ -746,6 +752,48 @@ suite "Stream Transport test suite":
server.stop()
server.close()
proc testWriteReturn(address: TransportAddress): Future[bool] {.async.} =
var bigMessageSize = 10 * 1024 * 1024 - 1
var finishMessage = "DONE"
var cdata = newSeqOfCap[byte](bigMessageSize)
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
cdata = await transp.read(bigMessageSize)
var size = await transp.write(finishMessage)
doAssert(size == len(finishMessage))
await transp.closeWait()
server.stop()
server.close()
var flag = false
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp: StreamTransport
try:
transp = await connect(address)
flag = true
except:
server.stop()
server.close()
await server.join()
if flag:
flag = false
try:
var msg = createBigMessage(bigMessageSize)
var size = await transp.write(msg)
var data = await transp.read()
doAssert(cdata == msg)
doAssert(len(data) == len(finishMessage))
doAssert(equalMem(addr data[0], addr finishMessage[0], len(data)))
flag = (size == bigMessageSize)
finally:
await transp.closeWait()
await server.join()
result = flag
for i in 0..<len(addresses):
test prefixes[i] & "close(transport) test":
check waitFor(testCloseTransport(addresses[i])) == 1
@ -795,6 +843,8 @@ suite "Stream Transport test suite":
check waitFor(testAnyAddress()) == true
else:
skip()
test prefixes[i] & "write() return value test (issue #73)":
check waitFor(testWriteReturn(addresses[i])) == true
test "Servers leak test":
check getTracker("stream.server").isLeaked() == false