# Chronos Test Suite # (c) Copyright 2018-Present # Status Research & Development GmbH # # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) import strutils, unittest, os import ../chronos when defined(nimHasUsed): {.used.} when defined(windows): import winlean else: import posix suite "Stream Transport test suite": const ConstantMessage = "SOMEDATA" BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" FilesTestName = "tests/teststream.nim" BigMessageCount = 100 ClientsCount = 5 MessagesCount = 10 MessageSize = 20 FilesCount = 10 m1 = "readLine() multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m2 = "readExactly() multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m3 = "readUntil() multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m4 = "writeFile() multiple clients (" & $FilesCount & " files)" m5 = "write(string)/read(int) multiple clients (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m6 = "write(seq[byte])/consume(int)/read(int) multiple clients (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m7 = "readLine() buffer overflow test" m8 = "readUntil() buffer overflow test" m11 = "readExactly() unexpected disconnect test" m12 = "readUntil() unexpected disconnect test" m13 = "readLine() unexpected disconnect empty string test" m14 = "Closing socket while operation pending test (issue #8)" m15 = "Connection refused test" m16 = "readOnce() read until atEof() test" m17 = "0.0.0.0/::0 (INADDR_ANY) test" when defined(windows): var addresses = [ initTAddress("127.0.0.1:33335"), initTAddress(r"/LOCAL\testpipe") ] else: var addresses = [ initTAddress("127.0.0.1:33335"), initTAddress(r"/tmp/testpipe") ] var prefixes = ["[IP] ", "[UNIX] "] proc createBigMessage(size: int): seq[byte] = var message = "MESSAGE" result = newSeq[byte](size) for i in 0 ..< len(result): result[i] = byte(message[i mod len(message)]) proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} = while not transp.atEof(): var data = await transp.readLine() if len(data) == 0: doAssert(transp.atEof()) break doAssert(data.startsWith("REQUEST")) var numstr = data[7..^1] var num = parseInt(numstr) var ans = "ANSWER" & $num & "\r\n" var res = await transp.write(cast[pointer](addr ans[0]), len(ans)) doAssert(res == len(ans)) transp.close() await transp.join() proc serveClient2(server: StreamServer, transp: StreamTransport) {.async.} = var buffer: array[20, char] var check = "REQUEST" while not transp.atEof(): zeroMem(addr buffer[0], MessageSize) try: await transp.readExactly(addr buffer[0], MessageSize) except TransportIncompleteError: break doAssert(equalMem(addr buffer[0], addr check[0], len(check))) var numstr = "" var i = 7 while i < MessageSize and (buffer[i] in {'0'..'9'}): numstr.add(buffer[i]) inc(i) var num = parseInt(numstr) var ans = "ANSWER" & $num zeroMem(addr buffer[0], MessageSize) copyMem(addr buffer[0], addr ans[0], len(ans)) var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize) doAssert(res == MessageSize) transp.close() await transp.join() proc serveClient3(server: StreamServer, transp: StreamTransport) {.async.} = var buffer: array[20, char] var check = "REQUEST" var suffixStr = "SUFFIX" var suffix = newSeq[byte](6) copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr)) var counter = MessagesCount while counter > 0: zeroMem(addr buffer[0], MessageSize) var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) doAssert(equalMem(addr buffer[0], addr check[0], len(check))) var numstr = "" var i = 7 while i < MessageSize and (buffer[i] in {'0'..'9'}): numstr.add(buffer[i]) inc(i) var num = parseInt(numstr) doAssert(len(numstr) < 8) var ans = "ANSWER" & $num & "SUFFIX" zeroMem(addr buffer[0], MessageSize) copyMem(addr buffer[0], addr ans[0], len(ans)) res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) doAssert(res == len(ans)) dec(counter) transp.close() await transp.join() proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} = var pathname = await transp.readLine() var size = await transp.readLine() var sizeNum = parseInt(size) doAssert(sizeNum >= 0) var rbuffer = newSeq[byte](sizeNum) await transp.readExactly(addr rbuffer[0], sizeNum) var lbuffer = readFile(pathname) doAssert(len(lbuffer) == sizeNum) doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum)) var answer = "OK\r\n" var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) doAssert(res == len(answer)) transp.close() await transp.join() proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} = var answer = "DONE\r\n" var expect = "" var line = await transp.readLine() doAssert(len(line) == BigMessageCount * len(BigMessagePattern)) for i in 0.. 0) name = name & "\r\n" var res = await transp.write(cast[pointer](addr name[0]), len(name)) doAssert(res == len(name)) ssize = $size & "\r\n" res = await transp.write(cast[pointer](addr ssize[0]), len(ssize)) doAssert(res == len(ssize)) var checksize = await transp.writeFile(handle, 0'u, size) doAssert(checksize == size) close(fhandle) var ans = await transp.readLine() doAssert(ans == "OK") result = 1 transp.close() await transp.join() proc swarmWorker7(address: TransportAddress): Future[int] {.async.} = var transp = await connect(address) var data = BigMessagePattern var crlf = "\r\n" for i in 0.. buffer.len: buffer.setLen(prevLen + readLength) let bytesRead = await transp.readOnce(addr buffer[prevLen], readLength) inc(prevLen, bytesRead) buffer.setLen(prevLen) doAssert(buffer == BigMessagePattern) result = 1 transp.close() await transp.join() proc test16(address: TransportAddress): Future[int] {.async.} = var server = createStreamServer(address, serveClient16, {ReuseAddr}) server.start() result = await swarmWorker16(address) server.stop() server.close() await server.join() proc testCloseTransport(address: TransportAddress): Future[int] {.async.} = proc client(server: StreamServer, transp: StreamTransport) {.async.} = discard var server = createStreamServer(address, client, {ReuseAddr}) server.start() server.stop server.close() try: await wait(server.join(), 10.seconds) result = 1 except: discard proc testWriteConnReset(address: TransportAddress): Future[int] {.async.} = var syncFut = newFuture[void]() proc client(server: StreamServer, transp: StreamTransport) {.async.} = await transp.closeWait() syncFut.complete() var n = 10 var server = createStreamServer(address, client, {ReuseAddr}) server.start() var msg = "HELLO" var ntransp = await connect(address) await syncFut while true: var res = await ntransp.write(msg) if res == 0: result = 1 break else: dec(n) if n == 0: break server.stop() await ntransp.closeWait() await server.closeWait() proc testAnyAddress(): Future[bool] {.async.} = var serverRemote, serverLocal: TransportAddress var connRemote, connLocal: TransportAddress proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = serverRemote = transp.remoteAddress() serverLocal = transp.localAddress() await transp.closeWait() server.stop() server.close() var ta = initTAddress("0.0.0.0:0") var server = createStreamServer(ta, serveClient, {ReuseAddr}) var la = server.localAddress() server.start() var connFut = connect(la) if await withTimeout(connFut, 5.seconds): var conn = connFut.read() connRemote = conn.remoteAddress() connLocal = conn.localAddress() await server.join() await conn.closeWait() result = (connRemote == serverLocal) and (connLocal == serverRemote) else: server.stop() server.close() proc testWriteReturn(address: TransportAddress): Future[bool] {.async.} = var bigMessageSize = 10 * 1024 * 1024 - 1 var finishMessage = "DONE" var cdata = newSeqOfCap[byte](bigMessageSize) proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} = cdata = await transp.read(bigMessageSize) var size = await transp.write(finishMessage) doAssert(size == len(finishMessage)) await transp.closeWait() server.stop() server.close() var flag = false var server = createStreamServer(address, serveClient, {ReuseAddr}) server.start() var transp: StreamTransport try: transp = await connect(address) flag = true except: server.stop() server.close() await server.join() if flag: flag = false try: var msg = createBigMessage(bigMessageSize) var size = await transp.write(msg) var data = await transp.read() doAssert(cdata == msg) doAssert(len(data) == len(finishMessage)) doAssert(equalMem(addr data[0], addr finishMessage[0], len(data))) flag = (size == bigMessageSize) finally: await transp.closeWait() await server.join() result = flag for i in 0..