diff --git a/chronos.nim b/chronos.nim index 3294631..53e39e6 100644 --- a/chronos.nim +++ b/chronos.nim @@ -5,7 +5,6 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import chronos/[asyncloop, asyncfutures2, asyncsync, handles, transport, - timer] + timer] export asyncloop, asyncfutures2, asyncsync, handles, transport, timer diff --git a/chronos.nimble b/chronos.nimble index 83cab10..1bec900 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.2.3" +version = "2.2.4" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" @@ -10,43 +10,14 @@ skipDirs = @["tests"] requires "nim > 0.18.0" task test, "Run all tests": - - var testFiles = @[ - "testsync", - "testsoon", - "testtime", - "testfut", - "testsignal", - "testaddress", - "testdatagram", - "teststream", - "testserver", - "testbugs", + var commands = [ + "nim c -r -d:useSysAssert -d:useGcAssert tests/testall", + "nim c -r tests/testall", + "nim c -r -d:release tests/testall" ] - - var testCommands = @[ - "nim c -r -d:useSysAssert -d:useGcAssert", - "nim c -r", - "nim c -r -d:release" - ] - - var timerCommands = @[ - " -d:asyncTimer=system", - " -d:asyncTimer=mono" - ] - - for tfile in testFiles: - if tfile == "testtime": - for cmd in testCommands: - for def in timerCommands: - var commandLine = cmd & def & " tests/" & tfile - echo "\n" & commandLine - exec commandLine - rmFile("tests/" & tfile.toExe()) - else: - for cmd in testCommands: - var commandLine = cmd & " tests/" & tfile - echo "\n" & commandLine - exec commandLine - rmFile("tests/" & tfile.toExe()) - + echo "\n" & commands[0] + exec commands[0] + echo "\n" & commands[1] + exec commands[1] + echo "\n" & commands[2] + exec commands[2] diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 06a61cb..978b08a 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -40,6 +40,14 @@ type Future*[T] = ref object of FutureBase ## Typed future. value: T ## Stored value + FutureStr*[T] = ref object of Future[T] + ## Future to hold GC strings + gcholder*: string + + FutureSeq*[A, B] = ref object of Future[A] + ## Future to hold GC seqs + gcholder*: seq[B] + FutureVar*[T] = distinct Future[T] FutureError* = object of Exception @@ -92,6 +100,22 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) +proc newFutureSeq*[A, B](fromProc = "unspecified"): FutureSeq[A, B] = + ## Create a new future which can hold/preserve GC string until future will + ## not be completed. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + +proc newFutureStr*[A](fromProc = "unspecified"): FutureStr[A] = + ## Create a new future which can hold/preserve GC seq[T] until future will + ## not be completed. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 2e0fda7..412f162 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -24,7 +24,7 @@ type ServerFlags* = enum ## Server's flags ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe, - NoPipeFlash + NoPipeFlash, Broadcast AddressFamily* {.pure.} = enum None, IPv4, IPv6, Unix @@ -55,14 +55,6 @@ type Running, # Server running Closed # Server closed - FutureGCString*[T] = ref object of Future[T] - ## Future to hold GC strings - gcholder*: string - - FutureGCSeq*[A, B] = ref object of Future[A] - ## Future to hold GC seqs - gcholder*: seq[B] - when defined(windows): type SocketServer* = ref object of RootRef @@ -510,6 +502,7 @@ when defined(windows): ERROR_BROKEN_PIPE* = 109 ERROR_PIPE_NOT_CONNECTED* = 233 ERROR_NO_DATA* = 232 + ERROR_CONNECTION_ABORTED* = 1236 proc cancelIo*(hFile: HANDLE): WINBOOL {.stdcall, dynlib: "kernel32", importc: "CancelIo".} diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index cb056f7..7e55fe9 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -141,8 +141,12 @@ when defined(windows): transp.buflen = bytesCount asyncCheck transp.function(transp, raddr) elif int(err) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt + # CancelIO() interrupt or closeSocket() call. transp.state.incl(ReadPaused) + if ReadClosed in transp.state: + # If `ReadClosed` present, then close(transport) was called. + transp.future.complete() + GC_unref(transp) break else: transp.setReadError(err) @@ -179,6 +183,12 @@ when defined(windows): transp.setReadError(err) transp.buflen = 0 asyncCheck transp.function(transp, raddr) + else: + # Transport closure happens in callback, and we not started new + # WSARecvFrom session. + if ReadClosed in transp.state: + if not transp.future.finished: + transp.future.complete() break proc resumeRead(transp: DatagramTransport) {.inline.} = @@ -450,11 +460,8 @@ proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. when defined(windows): if {ReadClosed, WriteClosed} * transp.state == {}: - discard cancelIo(Handle(transp.fd)) - closeSocket(transp.fd) transp.state.incl({WriteClosed, ReadClosed}) - transp.future.complete() - GC_unref(transp) + closeSocket(transp.fd) else: proc continuation(udata: pointer) = transp.future.complete() @@ -539,7 +546,7 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback, proc join*(transp: DatagramTransport): Future[void] = ## Wait until the transport ``transp`` will be closed. - var retFuture = newFuture[void]("datagramtransport.join") + var retFuture = newFuture[void]("datagram.transport.join") proc continuation(udata: pointer) = retFuture.complete() if not transp.future.finished: transp.future.addCallback(continuation) @@ -556,7 +563,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer, nbytes: int): Future[void] = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address which was bounded on transport. - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("datagram.transport.send(pointer)") transp.checkClosed(retFuture) if transp.remote.port == Port(0): retFuture.fail(newException(TransportError, "Remote peer not set!")) @@ -571,7 +578,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer, proc send*(transp: DatagramTransport, msg: string, msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = FutureGCString[void]() + var retFuture = newFutureStr[void]("datagram.transport.send(string)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) @@ -590,7 +597,7 @@ proc send*[T](transp: DatagramTransport, msg: seq[T], msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = FutureGCSeq[void, T]() + var retFuture = newFutureSeq[void, T]("datagram.transport.send(seq)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) @@ -609,7 +616,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, pbytes: pointer, nbytes: int): Future[void] = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address ``remote``. - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)") transp.checkClosed(retFuture) let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, writer: retFuture, address: remote) @@ -622,7 +629,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, msg: string, msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = FutureGCString[void]() + var retFuture = newFutureStr[void]("datagram.transport.sendTo(string)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) @@ -642,7 +649,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, msg: seq[T], msglen = -1): Future[void] = ## Send sequence ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = FutureGCSeq[void, T]() + var retFuture = newFutureSeq[void, T]("datagram.transport.sendTo(seq)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 5165301..4cd288d 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -343,13 +343,6 @@ when defined(windows): if ReadPending in transp.state: ## Continuation transp.state.excl(ReadPending) - if ReadClosed in transp.state: - transp.state.incl({ReadPaused}) - if not isNil(transp.reader): - if not transp.reader.finished: - transp.reader.complete() - transp.reader = nil - break let err = transp.rovl.data.errCode if err == OSErrorCode(-1): let bytesCount = transp.rovl.data.bytesCount @@ -364,14 +357,23 @@ when defined(windows): transp.roffset = transp.offset if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) - elif int(err) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt + elif int(err) in {ERROR_OPERATION_ABORTED, ERROR_CONNECTION_ABORTED, + ERROR_BROKEN_PIPE, ERROR_NETNAME_DELETED}: + # CancelIO() interrupt or closeSocket() call. transp.state.incl(ReadPaused) + if ReadClosed in transp.state: + if not isNil(transp.reader): + if not transp.reader.finished: + transp.reader.complete() + transp.reader = nil + # If `ReadClosed` present, then close(transport) was called. + transp.future.complete() + GC_unref(transp) elif transp.kind == TransportKind.Socket and (int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}): transp.state.incl({ReadEof, ReadPaused}) elif transp.kind == TransportKind.Pipe and - (int(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}): + (int(err) in {ERROR_PIPE_NOT_CONNECTED}): transp.state.incl({ReadEof, ReadPaused}) else: transp.setReadError(err) @@ -446,6 +448,11 @@ when defined(windows): if not isNil(transp.reader): transp.reader.complete() transp.reader = nil + # Transport close happens in callback, and we not started new + # WSARecvFrom session. + if ReadClosed in transp.state: + if not transp.future.finished: + transp.future.complete() ## Finish Loop break @@ -602,64 +609,75 @@ when defined(windows): if server.apending: ## Continuation server.apending = false - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: + if ovl.data.errCode == OSErrorCode(-1): + var ntransp: StreamTransport + var flags = {WinServerPipe} + if NoPipeFlash in server.flags: + flags.incl(WinNoPipeFlash) + if not isNil(server.init): + var transp = server.init(server, server.sock) + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + transp, flags) + else: + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + nil, flags) + asyncCheck server.function(server, ntransp) + elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt or close call. + if server.status == ServerStatus.Closed: + server.loopFuture.complete() + if not isNil(server.udata) and GCUserData in server.flags: + GC_unref(cast[ref int](server.udata)) + GC_unref(server) break else: - if ovl.data.errCode == OSErrorCode(-1): - var ntransp: StreamTransport - var flags = {WinServerPipe} - if NoPipeFlash in server.flags: - flags.incl(WinNoPipeFlash) - if not isNil(server.init): - var transp = server.init(server, server.sock) - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - transp, flags) - else: - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - nil, flags) - asyncCheck server.function(server, ntransp) - elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt - break - else: - doAssert disconnectNamedPipe(Handle(server.sock)) == 1 - doAssert closeHandle(HANDLE(server.sock)) == 1 - raiseTransportOsError(osLastError()) + doAssert disconnectNamedPipe(Handle(server.sock)) == 1 + doAssert closeHandle(HANDLE(server.sock)) == 1 + raiseTransportOsError(osLastError()) else: ## Initiation - server.apending = true - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - ## Server was already stopped/closed exiting + if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}: + server.apending = true + var pipeSuffix = $cast[cstring](addr server.local.address_un) + var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1]) + var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED + if FirstPipe notin server.flags: + openMode = openMode or FILE_FLAG_FIRST_PIPE_INSTANCE + server.flags.incl(FirstPipe) + let pipeMode = int32(PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT) + let pipeHandle = createNamedPipe(pipeName, openMode, pipeMode, + PIPE_UNLIMITED_INSTANCES, + DWORD(server.bufferSize), + DWORD(server.bufferSize), + DWORD(0), nil) + if pipeHandle == INVALID_HANDLE_VALUE: + raiseTransportOsError(osLastError()) + server.sock = AsyncFD(pipeHandle) + server.aovl.data.fd = AsyncFD(pipeHandle) + register(server.sock) + let res = connectNamedPipe(pipeHandle, + cast[POVERLAPPED](addr server.aovl)) + if res == 0: + let err = osLastError() + if int32(err) == ERROR_OPERATION_ABORTED: + server.apending = false + break + elif int32(err) == ERROR_IO_PENDING: + discard + elif int32(err) == ERROR_PIPE_CONNECTED: + discard + else: + raiseTransportOsError(err) break - - var pipeSuffix = $cast[cstring](addr server.local.address_un) - var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1]) - var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED - if FirstPipe notin server.flags: - openMode = openMode or FILE_FLAG_FIRST_PIPE_INSTANCE - server.flags.incl(FirstPipe) - let pipeMode = int32(PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT) - let pipeHandle = createNamedPipe(pipeName, openMode, pipeMode, - PIPE_UNLIMITED_INSTANCES, - DWORD(server.bufferSize), - DWORD(server.bufferSize), - DWORD(0), nil) - if pipeHandle == INVALID_HANDLE_VALUE: - raiseTransportOsError(osLastError()) - server.sock = AsyncFD(pipeHandle) - server.aovl.data.fd = AsyncFD(pipeHandle) - register(server.sock) - let res = connectNamedPipe(pipeHandle, - cast[POVERLAPPED](addr server.aovl)) - if res == 0: - let err = osLastError() - if int32(err) == ERROR_IO_PENDING: - discard - elif int32(err) == ERROR_PIPE_CONNECTED: - discard - else: - raiseTransportOsError(err) - break + else: + # Server close happens in callback, and we are not started new + # connectNamedPipe session. + if server.status == ServerStatus.Closed: + if not server.loopFuture.finished: + server.loopFuture.complete() + if not isNil(server.udata) and GCUserData in server.flags: + GC_unref(cast[ref int](server.udata)) + GC_unref(server) proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} = var ovl = cast[PtrCustomOverlapped](udata) @@ -670,70 +688,75 @@ when defined(windows): if server.apending: ## Continuation server.apending = false - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - ## Server was already stopped/closed exiting - server.asock.closeSocket() + if ovl.data.errCode == OSErrorCode(-1): + if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET), + cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock, + SockLen(sizeof(SocketHandle))) != 0'i32: + let err = OSErrorCode(wsaGetLastError()) + server.asock.closeSocket() + raiseTransportOsError(err) + else: + var ntransp: StreamTransport + if not isNil(server.init): + let transp = server.init(server, server.asock) + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, + transp) + else: + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, nil) + asyncCheck server.function(server, ntransp) + + elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt or close. + if server.status == ServerStatus.Closed: + server.loopFuture.complete() + if not isNil(server.udata) and GCUserData in server.flags: + GC_unref(cast[ref int](server.udata)) + GC_unref(server) break else: - if ovl.data.errCode == OSErrorCode(-1): - if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET), - cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock, - SockLen(sizeof(SocketHandle))) != 0'i32: - let err = OSErrorCode(wsaGetLastError()) - server.asock.closeSocket() - raiseTransportOsError(err) - else: - var ntransp: StreamTransport - if not isNil(server.init): - let transp = server.init(server, server.asock) - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, - transp) - else: - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, nil) - asyncCheck server.function(server, ntransp) - - elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt - server.asock.closeSocket() - break - else: - server.asock.closeSocket() - raiseTransportOsError(ovl.data.errCode) + server.asock.closeSocket() + raiseTransportOsError(ovl.data.errCode) else: ## Initiation - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - ## Server was already stopped/closed exiting + if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}: + server.apending = true + server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, + Protocol.IPPROTO_TCP) + if server.asock == asyncInvalidSocket: + raiseTransportOsError(OSErrorCode(wsaGetLastError())) + + var dwBytesReceived = DWORD(0) + let dwReceiveDataLength = DWORD(0) + let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + + let res = loop.acceptEx(SocketHandle(server.sock), + SocketHandle(server.asock), + addr server.abuffer[0], + dwReceiveDataLength, dwLocalAddressLength, + dwRemoteAddressLength, addr dwBytesReceived, + cast[POVERLAPPED](addr server.aovl)) + if not res: + let err = osLastError() + if int32(err) == ERROR_OPERATION_ABORTED: + server.apending = false + break + elif int32(err) == ERROR_IO_PENDING: + discard + else: + raiseTransportOsError(err) break - - server.apending = true - server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, - Protocol.IPPROTO_TCP) - if server.asock == asyncInvalidSocket: - raiseTransportOsError(OSErrorCode(wsaGetLastError())) - - var dwBytesReceived = DWORD(0) - let dwReceiveDataLength = DWORD(0) - let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) - let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) - - let res = loop.acceptEx(SocketHandle(server.sock), - SocketHandle(server.asock), - addr server.abuffer[0], - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength, addr dwBytesReceived, - cast[POVERLAPPED](addr server.aovl)) - if not res: - let err = osLastError() - if int32(err) == ERROR_OPERATION_ABORTED: - server.apending = false - break - elif int32(err) == ERROR_IO_PENDING: - discard - else: - raiseTransportOsError(err) - break + else: + # Server close happens in callback, and we are not started new + # AcceptEx session. + if server.status == ServerStatus.Closed: + if not server.loopFuture.finished: + server.loopFuture.complete() + if not isNil(server.udata) and GCUserData in server.flags: + GC_unref(cast[ref int](server.udata)) + GC_unref(server) proc resumeRead(transp: StreamTransport) {.inline.} = transp.state.excl(ReadPaused) @@ -881,7 +904,7 @@ else: slen: SockLen sock: AsyncFD proto: Protocol - var retFuture = newFuture[StreamTransport]("transport.connect") + var retFuture = newFuture[StreamTransport]("stream.transport.connect") address.toSAddr(saddr, slen) proto = Protocol.IPPROTO_TCP if address.family == AddressFamily.Unix: @@ -985,7 +1008,7 @@ proc stop*(server: StreamServer) = proc join*(server: StreamServer): Future[void] = ## Waits until ``server`` is not closed. - var retFuture = newFuture[void]("stream.server.join") + var retFuture = newFuture[void]("stream.transport.server.join") proc continuation(udata: pointer) = retFuture.complete() if not server.loopFuture.finished: server.loopFuture.addCallback(continuation) @@ -998,21 +1021,22 @@ proc close*(server: StreamServer) = ## ## Please note that release of resources is not completed immediately, to be ## sure all resources got released please use ``await server.join()``. - proc continuation(udata: pointer) = - server.loopFuture.complete() - if not isNil(server.udata) and GCUserData in server.flags: - GC_unref(cast[ref int](server.udata)) - GC_unref(server) + when not defined(windows): + proc continuation(udata: pointer) = + server.loopFuture.complete() + if not isNil(server.udata) and GCUserData in server.flags: + GC_unref(cast[ref int](server.udata)) + GC_unref(server) if server.status == ServerStatus.Stopped: server.status = ServerStatus.Closed when defined(windows): if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}: - server.sock.closeSocket(continuation) + server.sock.closeSocket() elif server.local.family in {AddressFamily.Unix}: if NoPipeFlash notin server.flags: discard flushFileBuffers(Handle(server.sock)) doAssert disconnectNamedPipe(Handle(server.sock)) == 1 - closeHandle(server.sock, continuation) + closeHandle(server.sock) else: server.sock.closeSocket(continuation) @@ -1157,7 +1181,7 @@ proc createStreamServer*(host: TransportAddress, result.init = init result.bufferSize = bufferSize result.status = Starting - result.loopFuture = newFuture[void]("stream.server") + result.loopFuture = newFuture[void]("stream.transport.server") result.udata = udata result.local = host @@ -1197,7 +1221,7 @@ proc write*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] = ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport ## ``transp``. - var retFuture = newFuture[int]() + var retFuture = newFuture[int]("stream.transport.write(pointer)") transp.checkClosed(retFuture) var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: nbytes) @@ -1208,7 +1232,7 @@ proc write*(transp: StreamTransport, pbytes: pointer, proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = ## Write data from string ``msg`` using transport ``transp``. - var retFuture = FutureGCString[int]() + var retFuture = newFutureStr[int]("stream.transport.write(string)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) @@ -1225,7 +1249,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] = ## Write sequence ``msg`` using transport ``transp``. - var retFuture = FutureGCSeq[int, T]() + var retFuture = newFutureSeq[int, T]("stream.transport.write(seq)") transp.checkClosed(retFuture) if not isLiteral(msg): shallowCopy(retFuture.gcholder, msg) @@ -1250,7 +1274,7 @@ proc writeFile*(transp: StreamTransport, handle: int, when defined(windows): if transp.kind != TransportKind.Socket: raise newException(TransportNoSupport, "writeFile() is not supported!") - var retFuture = newFuture[int]("transport.writeFile") + var retFuture = newFuture[int]("stream.transport.writeFile") transp.checkClosed(retFuture) var vector = StreamVector(kind: DataFile, writer: retFuture, buf: cast[pointer](size), offset: offset, @@ -1309,6 +1333,7 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer, ## internal buffer, otherwise it will wait until some bytes will be received. checkClosed(transp) checkPending(transp) + while true: if transp.offset == 0: if (ReadError in transp.state): @@ -1490,6 +1515,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = ## Return number of bytes actually consumed checkClosed(transp) checkPending(transp) + result = 0 while true: if (ReadError in transp.state): @@ -1522,7 +1548,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = proc join*(transp: StreamTransport): Future[void] = ## Wait until ``transp`` will not be closed. - var retFuture = newFuture[void]("streamtransport.join") + var retFuture = newFuture[void]("stream.transport.join") proc continuation(udata: pointer) = retFuture.complete() if not transp.future.finished: transp.future.addCallback(continuation) @@ -1542,7 +1568,6 @@ proc close*(transp: StreamTransport) = if {ReadClosed, WriteClosed} * transp.state == {}: transp.state.incl({WriteClosed, ReadClosed}) when defined(windows): - discard cancelIo(Handle(transp.fd)) if transp.kind == TransportKind.Pipe: if WinServerPipe in transp.flags: if WinNoPipeFlash notin transp.flags: @@ -1551,9 +1576,23 @@ proc close*(transp: StreamTransport) = else: if WinNoPipeFlash notin transp.flags: discard flushFileBuffers(Handle(transp.fd)) - closeHandle(transp.fd, continuation) + if ReadPaused in transp.state: + # If readStreamLoop() is not running we need to finish in + # continuation step. + closeHandle(transp.fd, continuation) + else: + # If readStreamLoop() is running, it will be properly finished inside + # of readStreamLoop(). + closeHandle(transp.fd) elif transp.kind == TransportKind.Socket: - closeSocket(transp.fd, continuation) + if ReadPaused in transp.state: + # If readStreamLoop() is not running we need to finish in + # continuation step. + closeSocket(transp.fd, continuation) + else: + # If readStreamLoop() is running, it will be properly finished inside + # of readStreamLoop(). + closeSocket(transp.fd) else: closeSocket(transp.fd, continuation) diff --git a/tests/testaddress.nim b/tests/testaddress.nim index c92b66e..7855a21 100644 --- a/tests/testaddress.nim +++ b/tests/testaddress.nim @@ -5,192 +5,190 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import strutils, unittest import ../chronos -when isMainModule: - suite "TransportAddress test suite": - test "initTAddress(string)": - check $initTAddress("0.0.0.0:1") == "0.0.0.0:1" - check $initTAddress("255.255.255.255:65535") == "255.255.255.255:65535" - check $initTAddress("[::]:1") == "[::]:1" - check $initTAddress("[FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:65535") == - "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" +suite "TransportAddress test suite": + test "initTAddress(string)": + check $initTAddress("0.0.0.0:1") == "0.0.0.0:1" + check $initTAddress("255.255.255.255:65535") == "255.255.255.255:65535" + check $initTAddress("[::]:1") == "[::]:1" + check $initTAddress("[FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:65535") == + "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" - test "initTAddress(string, Port)": - check $initTAddress("0.0.0.0", Port(0)) == "0.0.0.0:0" - check $initTAddress("255.255.255.255", Port(65535)) == - "255.255.255.255:65535" - check $initTAddress("::", Port(0)) == "[::]:0" - check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", - Port(65535)) == - "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" + test "initTAddress(string, Port)": + check $initTAddress("0.0.0.0", Port(0)) == "0.0.0.0:0" + check $initTAddress("255.255.255.255", Port(65535)) == + "255.255.255.255:65535" + check $initTAddress("::", Port(0)) == "[::]:0" + check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", + Port(65535)) == + "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" - test "initTAddress(string, int)": - check $initTAddress("0.0.0.0", 1) == "0.0.0.0:1" - check $initTAddress("255.255.255.255", 65535) == - "255.255.255.255:65535" - check $initTAddress("::", 0) == "[::]:0" - check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 65535) == - "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" + test "initTAddress(string, int)": + check $initTAddress("0.0.0.0", 1) == "0.0.0.0:1" + check $initTAddress("255.255.255.255", 65535) == + "255.255.255.255:65535" + check $initTAddress("::", 0) == "[::]:0" + check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 65535) == + "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535" - test "resolveTAddress(string, IPv4)": - var numeric = ["0.0.0.0:1", "255.0.0.255:54321", "128.128.128.128:12345", - "255.255.255.255:65535"] - var hostnames = ["www.google.com:443", "www.github.com:443"] + test "resolveTAddress(string, IPv4)": + var numeric = ["0.0.0.0:1", "255.0.0.255:54321", "128.128.128.128:12345", + "255.255.255.255:65535"] + var hostnames = ["www.google.com:443", "www.github.com:443"] - for item in numeric: - var taseq = resolveTAddress(item) - check len(taseq) == 1 - check $taseq[0] == item + for item in numeric: + var taseq = resolveTAddress(item) + check len(taseq) == 1 + check $taseq[0] == item - for item in hostnames: - var taseq = resolveTAddress(item) - check len(taseq) >= 1 + for item in hostnames: + var taseq = resolveTAddress(item) + check len(taseq) >= 1 - # test "resolveTAddress(string, IPv6)": - # var numeric = [ - # "[::]:1", - # "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535", - # "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345", - # "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345", - # "[a:b:c:d:e:f::]:12345", - # "[2222:3333:4444:5555:6666:7777:8888:9999]:56789" - # ] - # var hostnames = ["localhost:443"] + # test "resolveTAddress(string, IPv6)": + # var numeric = [ + # "[::]:1", + # "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535", + # "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345", + # "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345", + # "[a:b:c:d:e:f::]:12345", + # "[2222:3333:4444:5555:6666:7777:8888:9999]:56789" + # ] + # var hostnames = ["localhost:443"] - # for item in numeric: - # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) - # check len(taseq) == 1 - # check $taseq[0] == item + # for item in numeric: + # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) + # check len(taseq) == 1 + # check $taseq[0] == item - # for item in hostnames: - # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) - # check len(taseq) >= 1 + # for item in hostnames: + # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) + # check len(taseq) >= 1 - test "resolveTAddress(string, Port, IPv4)": - var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128", - "255.255.255.255"] - var hostnames = ["www.google.com", "www.github.com", "localhost"] + test "resolveTAddress(string, Port, IPv4)": + var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128", + "255.255.255.255"] + var hostnames = ["www.google.com", "www.github.com", "localhost"] - for item in numeric: - var taseq = resolveTAddress(item, Port(443)) - check len(taseq) == 1 - check $taseq[0] == item & ":443" + for item in numeric: + var taseq = resolveTAddress(item, Port(443)) + check len(taseq) == 1 + check $taseq[0] == item & ":443" - for item in hostnames: - var taseq = resolveTAddress(item, Port(443)) - check len(taseq) >= 1 + for item in hostnames: + var taseq = resolveTAddress(item, Port(443)) + check len(taseq) >= 1 - # test "resolveTAddress(string, Port, IPv6)": - # var numeric = [ - # "::", - # "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", - # "aaaa:bbbb:cccc:dddd:eeee:ffff::1111", - # "aaaa:bbbb:cccc:dddd:eeee:ffff::", - # "a:b:c:d:e:f::", - # "2222:3333:4444:5555:6666:7777:8888:9999" - # ] - # var hostnames = ["localhost"] - # for item in numeric: - # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) - # check len(taseq) == 1 - # check $taseq[0] == "[" & item & "]:443" + # test "resolveTAddress(string, Port, IPv6)": + # var numeric = [ + # "::", + # "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", + # "aaaa:bbbb:cccc:dddd:eeee:ffff::1111", + # "aaaa:bbbb:cccc:dddd:eeee:ffff::", + # "a:b:c:d:e:f::", + # "2222:3333:4444:5555:6666:7777:8888:9999" + # ] + # var hostnames = ["localhost"] + # for item in numeric: + # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) + # check len(taseq) == 1 + # check $taseq[0] == "[" & item & "]:443" - # for item in hostnames: - # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) - # check len(taseq) >= 1 + # for item in hostnames: + # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) + # check len(taseq) >= 1 - test "Faulty initTAddress(string)": - var tests = [ - "z:1", - "256.256.256.256:65534", - "127.0.0.1:65536" - ] - var errcounter = 0 - for item in tests: - try: - var ta = initTAddress(item) - except TransportAddressError: - inc(errcounter) - check errcounter == len(tests) - - test "Faulty initTAddress(string, Port)": - var tests = [ - ":::", - "999.999.999.999", - "gggg:aaaa:bbbb:gggg:aaaa:bbbb:gggg:aaaa", - "hostname" - ] - var errcounter = 0 - for item in tests: - try: - var ta = initTAddress(item, Port(443)) - except TransportAddressError: - inc(errcounter) - check errcounter == len(tests) - - test "Faulty initTAddress(string, Port)": - var errcounter = 0 + test "Faulty initTAddress(string)": + var tests = [ + "z:1", + "256.256.256.256:65534", + "127.0.0.1:65536" + ] + var errcounter = 0 + for item in tests: try: - var ta = initTAddress("127.0.0.1", 100000) + var ta = initTAddress(item) except TransportAddressError: inc(errcounter) - check errcounter == 1 + check errcounter == len(tests) - test "Faulty resolveTAddress(string, IPv4) for IPv6 address": - var numeric = [ - "[::]:1", - "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535", - "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345", - "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345", - "[a:b:c:d:e:f::]:12345", - "[2222:3333:4444:5555:6666:7777:8888:9999]:56789" - ] - var errcounter = 0 - for item in numeric: - try: - var taseq = resolveTAddress(item) - except TransportAddressError: - inc(errcounter) - check errcounter == len(numeric) + test "Faulty initTAddress(string, Port)": + var tests = [ + ":::", + "999.999.999.999", + "gggg:aaaa:bbbb:gggg:aaaa:bbbb:gggg:aaaa", + "hostname" + ] + var errcounter = 0 + for item in tests: + try: + var ta = initTAddress(item, Port(443)) + except TransportAddressError: + inc(errcounter) + check errcounter == len(tests) - test "Faulty resolveTAddress(string, Port, IPv4) for IPv6 address": - var numeric = [ - "::", - "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", - "aaaa:bbbb:cccc:dddd:eeee:ffff::1111", - "aaaa:bbbb:cccc:dddd:eeee:ffff::", - "a:b:c:d:e:f::", - "2222:3333:4444:5555:6666:7777:8888:9999" - ] - var errcounter = 0 - for item in numeric: - try: - var taseq = resolveTAddress(item, Port(443)) - except TransportAddressError: - inc(errcounter) - check errcounter == len(numeric) + test "Faulty initTAddress(string, Port)": + var errcounter = 0 + try: + var ta = initTAddress("127.0.0.1", 100000) + except TransportAddressError: + inc(errcounter) + check errcounter == 1 - # test "Faulty resolveTAddress(string, IPv6) for IPv4 address": - # var numeric = ["0.0.0.0:0", "255.0.0.255:54321", "128.128.128.128:12345", - # "255.255.255.255:65535"] - # var errcounter = 0 - # for item in numeric: - # try: - # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) - # except TransportAddressError: - # inc(errcounter) - # check errcounter == len(numeric) + test "Faulty resolveTAddress(string, IPv4) for IPv6 address": + var numeric = [ + "[::]:1", + "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535", + "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345", + "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345", + "[a:b:c:d:e:f::]:12345", + "[2222:3333:4444:5555:6666:7777:8888:9999]:56789" + ] + var errcounter = 0 + for item in numeric: + try: + var taseq = resolveTAddress(item) + except TransportAddressError: + inc(errcounter) + check errcounter == len(numeric) - # test "Faulty resolveTAddress(string, Port, IPv6) for IPv4 address": - # var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128", - # "255.255.255.255"] - # var errcounter = 0 - # for item in numeric: - # try: - # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) - # except TransportAddressError: - # inc(errcounter) - # check errcounter == len(numeric) + test "Faulty resolveTAddress(string, Port, IPv4) for IPv6 address": + var numeric = [ + "::", + "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", + "aaaa:bbbb:cccc:dddd:eeee:ffff::1111", + "aaaa:bbbb:cccc:dddd:eeee:ffff::", + "a:b:c:d:e:f::", + "2222:3333:4444:5555:6666:7777:8888:9999" + ] + var errcounter = 0 + for item in numeric: + try: + var taseq = resolveTAddress(item, Port(443)) + except TransportAddressError: + inc(errcounter) + check errcounter == len(numeric) + + # test "Faulty resolveTAddress(string, IPv6) for IPv4 address": + # var numeric = ["0.0.0.0:0", "255.0.0.255:54321", "128.128.128.128:12345", + # "255.255.255.255:65535"] + # var errcounter = 0 + # for item in numeric: + # try: + # var taseq = resolveTAddress(item, IpAddressFamily.IPv6) + # except TransportAddressError: + # inc(errcounter) + # check errcounter == len(numeric) + + # test "Faulty resolveTAddress(string, Port, IPv6) for IPv4 address": + # var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128", + # "255.255.255.255"] + # var errcounter = 0 + # for item in numeric: + # try: + # var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6) + # except TransportAddressError: + # inc(errcounter) + # check errcounter == len(numeric) diff --git a/tests/testall.nim b/tests/testall.nim new file mode 100644 index 0000000..0b45caf --- /dev/null +++ b/tests/testall.nim @@ -0,0 +1,9 @@ +# Chronos Test Suite +# (c) Copyright 2018-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import testsync, testsoon, testtime, testfut, testsignal, testaddress, + testdatagram, teststream, testserver, testbugs diff --git a/tests/testbugs.nim b/tests/testbugs.nim index 58fc129..3a78f8b 100644 --- a/tests/testbugs.nim +++ b/tests/testbugs.nim @@ -5,43 +5,41 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import unittest import ../chronos -const HELLO_PORT = 45679 -const TEST_MSG = "testmsg" -const MSG_LEN = TEST_MSG.len() +suite "Asynchronous issues test suite": + const HELLO_PORT = 45679 + const TEST_MSG = "testmsg" + const MSG_LEN = TEST_MSG.len() -type - CustomData = ref object - test: string + type + CustomData = ref object + test: string -proc udp4DataAvailable(transp: DatagramTransport, - remote: TransportAddress): Future[void] {.async, gcsafe.} = - var udata = getUserData[CustomData](transp) - var expect = TEST_MSG - var data: seq[byte] - var datalen: int - transp.peekMessage(data, datalen) - if udata.test == "CHECK" and datalen == MSG_LEN and - equalMem(addr data[0], addr expect[0], datalen): - udata.test = "OK" - transp.close() + proc udp4DataAvailable(transp: DatagramTransport, + remote: TransportAddress): Future[void] {.async, gcsafe.} = + var udata = getUserData[CustomData](transp) + var expect = TEST_MSG + var data: seq[byte] + var datalen: int + transp.peekMessage(data, datalen) + if udata.test == "CHECK" and datalen == MSG_LEN and + equalMem(addr data[0], addr expect[0], datalen): + udata.test = "OK" + transp.close() -proc issue6(): Future[bool] {.async.} = - var myself = initTAddress("127.0.0.1:" & $HELLO_PORT) - var data = CustomData() - data.test = "CHECK" - var dsock4 = newDatagramTransport(udp4DataAvailable, udata = data, - local = myself) - await dsock4.sendTo(myself, TEST_MSG, MSG_LEN) - await dsock4.join() - if data.test == "OK": - result = true + proc issue6(): Future[bool] {.async.} = + var myself = initTAddress("127.0.0.1:" & $HELLO_PORT) + var data = CustomData() + data.test = "CHECK" + var dsock4 = newDatagramTransport(udp4DataAvailable, udata = data, + local = myself) + await dsock4.sendTo(myself, TEST_MSG, MSG_LEN) + await dsock4.join() + if data.test == "OK": + result = true -when isMainModule: - suite "Asynchronous issues test suite": - test "Issue #6": - var res = waitFor(issue6()) - check res == true + test "Issue #6": + var res = waitFor(issue6()) + check res == true diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 955ddec..5a25ef0 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -5,452 +5,15 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import strutils, net, unittest import ../chronos -const - TestsCount = 2000 - ClientsCount = 20 - MessagesCount = 20 - -proc client1(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - await transp.sendTo(raddr, addr ans[0], len(ans)) - else: - var err = "ERROR" - await transp.sendTo(raddr, addr err[0], len(err)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client2(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var ta = initTAddress("127.0.0.1:33336") - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(ta, addr req[0], len(req)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client3(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - await transp.send(addr req[0], len(req)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client4(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == MessagesCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - await transp.send(addr req[0], len(req)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client5(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == MessagesCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(raddr, addr req[0], len(req)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client6(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - await transp.sendTo(raddr, ans) - else: - var err = "ERROR" - await transp.sendTo(raddr, err) - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client7(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(raddr, req) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client8(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - await transp.send(req) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client9(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - var ansseq = newSeq[byte](len(ans)) - copyMem(addr ansseq[0], addr ans[0], len(ans)) - await transp.sendTo(raddr, ansseq) - else: - var err = "ERROR" - var errseq = newSeq[byte](len(err)) - copyMem(addr errseq[0], addr err[0], len(err)) - await transp.sendTo(raddr, errseq) - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client10(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - var reqseq = newSeq[byte](len(req)) - copyMem(addr reqseq[0], addr req[0], len(req)) - await transp.sendTo(raddr, reqseq) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc client11(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes: seq[byte] - var nbytes: int - transp.peekMessage(pbytes, nbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() - else: - var req = "REQUEST" & $counterPtr[] - var reqseq = newSeq[byte](len(req)) - copyMem(addr reqseq[0], addr req[0], len(req)) - await transp.send(reqseq) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() - -proc testPointerSendTo(): Future[int] {.async.} = - ## sendTo(pointer) test - var ta = initTAddress("127.0.0.1:33336") - var counter = 0 - var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client2, udata = addr counter) - var data = "REQUEST0" - await dgram2.sendTo(ta, addr data[0], len(data)) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -proc testPointerSend(): Future[int] {.async.} = - ## send(pointer) test - var ta = initTAddress("127.0.0.1:33337") - var counter = 0 - var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client3, udata = addr counter, remote = ta) - var data = "REQUEST0" - await dgram2.send(addr data[0], len(data)) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -proc testStringSendTo(): Future[int] {.async.} = - ## sendTo(string) test - var ta = initTAddress("127.0.0.1:33338") - var counter = 0 - var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client7, udata = addr counter) - var data = "REQUEST0" - await dgram2.sendTo(ta, data) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -proc testStringSend(): Future[int] {.async.} = - ## send(string) test - var ta = initTAddress("127.0.0.1:33339") - var counter = 0 - var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta) - var data = "REQUEST0" - await dgram2.send(data) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -proc testSeqSendTo(): Future[int] {.async.} = - ## sendTo(string) test - var ta = initTAddress("127.0.0.1:33340") - var counter = 0 - var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client10, udata = addr counter) - var data = "REQUEST0" - var dataseq = newSeq[byte](len(data)) - copyMem(addr dataseq[0], addr data[0], len(data)) - await dgram2.sendTo(ta, dataseq) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -proc testSeqSend(): Future[int] {.async.} = - ## send(string) test - var ta = initTAddress("127.0.0.1:33341") - var counter = 0 - var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) - var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta) - var data = "REQUEST0" - var dataseq = newSeq[byte](len(data)) - copyMem(addr dataseq[0], addr data[0], len(data)) - await dgram2.send(data) - await dgram2.join() - dgram1.close() - await dgram1.join() - result = counter - -# - -proc waitAll(futs: seq[Future[void]]): Future[void] = - var counter = len(futs) - var retFuture = newFuture[void]("waitAll") - proc cb(udata: pointer) = - dec(counter) - if counter == 0: - retFuture.complete() - for fut in futs: - fut.addCallback(cb) - return retFuture - -proc test3(bounded: bool): Future[int] {.async.} = - var ta: TransportAddress - if bounded: - ta = initTAddress("127.0.0.1:33240") - else: - ta = initTAddress("127.0.0.1:33241") - var counter = 0 - var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) - var clients = newSeq[Future[void]](ClientsCount) - var grams = newSeq[DatagramTransport](ClientsCount) - var counters = newSeq[int](ClientsCount) - for i in 0.. 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + await transp.sendTo(raddr, addr ans[0], len(ans)) + else: + var err = "ERROR" + await transp.sendTo(raddr, addr err[0], len(err)) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client2(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var ta = initTAddress("127.0.0.1:33336") + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(ta, addr req[0], len(req)) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client3(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(addr req[0], len(req)) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client4(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == MessagesCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(addr req[0], len(req)) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client5(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == MessagesCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(raddr, addr req[0], len(req)) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client6(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + await transp.sendTo(raddr, ans) + else: + var err = "ERROR" + await transp.sendTo(raddr, err) + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client7(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(raddr, req) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client8(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(req) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client9(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + var ansseq = newSeq[byte](len(ans)) + copyMem(addr ansseq[0], addr ans[0], len(ans)) + await transp.sendTo(raddr, ansseq) + else: + var err = "ERROR" + var errseq = newSeq[byte](len(err)) + copyMem(addr errseq[0], addr err[0], len(err)) + await transp.sendTo(raddr, errseq) + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client10(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.sendTo(raddr, reqseq) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc client11(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.send(reqseq) + else: + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + + proc testPointerSendTo(): Future[int] {.async.} = + ## sendTo(pointer) test + var ta = initTAddress("127.0.0.1:33336") + var counter = 0 + var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client2, udata = addr counter) + var data = "REQUEST0" + await dgram2.sendTo(ta, addr data[0], len(data)) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + proc testPointerSend(): Future[int] {.async.} = + ## send(pointer) test + var ta = initTAddress("127.0.0.1:33337") + var counter = 0 + var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client3, udata = addr counter, remote = ta) + var data = "REQUEST0" + await dgram2.send(addr data[0], len(data)) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + proc testStringSendTo(): Future[int] {.async.} = + ## sendTo(string) test + var ta = initTAddress("127.0.0.1:33338") + var counter = 0 + var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client7, udata = addr counter) + var data = "REQUEST0" + await dgram2.sendTo(ta, data) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + proc testStringSend(): Future[int] {.async.} = + ## send(string) test + var ta = initTAddress("127.0.0.1:33339") + var counter = 0 + var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta) + var data = "REQUEST0" + await dgram2.send(data) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + proc testSeqSendTo(): Future[int] {.async.} = + ## sendTo(string) test + var ta = initTAddress("127.0.0.1:33340") + var counter = 0 + var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client10, udata = addr counter) + var data = "REQUEST0" + var dataseq = newSeq[byte](len(data)) + copyMem(addr dataseq[0], addr data[0], len(data)) + await dgram2.sendTo(ta, dataseq) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + proc testSeqSend(): Future[int] {.async.} = + ## send(seq) test + var ta = initTAddress("127.0.0.1:33341") + var counter = 0 + var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta) + var data = "REQUEST0" + var dataseq = newSeq[byte](len(data)) + copyMem(addr dataseq[0], addr data[0], len(data)) + await dgram2.send(data) + await dgram2.join() + dgram1.close() + await dgram1.join() + result = counter + + # + + proc waitAll(futs: seq[Future[void]]): Future[void] = + var counter = len(futs) + var retFuture = newFuture[void]("waitAll") + proc cb(udata: pointer) = + dec(counter) + if counter == 0: + retFuture.complete() + for fut in futs: + fut.addCallback(cb) + return retFuture + + proc test3(bounded: bool): Future[int] {.async.} = + var ta: TransportAddress + if bounded: + ta = initTAddress("127.0.0.1:33240") + else: + ta = initTAddress("127.0.0.1:33241") + var counter = 0 + var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) + var clients = newSeq[Future[void]](ClientsCount) + var grams = newSeq[DatagramTransport](ClientsCount) + var counters = newSeq[int](ClientsCount) + for i in 0.. CallSoonTests * 2 - test "`callSoon() is not working prior getGlobalDispatcher()` #7192 test": - check test3() == true + test "User-defined callback argument test": + var values = [0x12345678'u, 0x23456789'u, 0x3456789A'u, 0x456789AB'u, + 0x56789ABC'u, 0x6789ABCD'u, 0x789ABCDE'u, 0x89ABCDEF'u, + 0x9ABCDEF1'u, 0xABCDEF12'u, 0xBCDEF123'u, 0xCDEF1234'u, + 0xDEF12345'u, 0xEF123456'u, 0xF1234567'u, 0x12345678'u] + var expect = 0'u + for item in values: + expect = expect xor item + check test1() == expect + test "`Asynchronous dead end` #7193 test": + var timers, callbacks: int + test2(timers, callbacks) + check: + timers == CallSoonTests + callbacks > CallSoonTests * 2 + test "`callSoon() is not working prior getGlobalDispatcher()` #7192 test": + check test3() == true diff --git a/tests/teststream.nim b/tests/teststream.nim index 217c7ab..aa9c7ac 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -5,7 +5,6 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) - import strutils, unittest, os import ../chronos @@ -14,640 +13,17 @@ when defined(windows): else: import posix -const - ConstantMessage = "SOMEDATA" - BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - FilesTestName = "tests/teststream.nim" - BigMessageCount = 100 - ClientsCount = 5 - MessagesCount = 10 - MessageSize = 20 - FilesCount = 10 - -proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} = - while not transp.atEof(): - var data = await transp.readLine() - if len(data) == 0: - doAssert(transp.atEof()) - break - doAssert(data.startsWith("REQUEST")) - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num & "\r\n" - var res = await transp.write(cast[pointer](addr ans[0]), len(ans)) - doAssert(res == len(ans)) - transp.close() - await transp.join() - -proc serveClient2(server: StreamServer, transp: StreamTransport) {.async.} = - var buffer: array[20, char] - var check = "REQUEST" - while not transp.atEof(): - zeroMem(addr buffer[0], MessageSize) - try: - await transp.readExactly(addr buffer[0], MessageSize) - except TransportIncompleteError: - break - doAssert(equalMem(addr buffer[0], addr check[0], len(check))) - var numstr = "" - var i = 7 - while i < MessageSize and (buffer[i] in {'0'..'9'}): - numstr.add(buffer[i]) - inc(i) - var num = parseInt(numstr) - var ans = "ANSWER" & $num - zeroMem(addr buffer[0], MessageSize) - copyMem(addr buffer[0], addr ans[0], len(ans)) - var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize) - doAssert(res == MessageSize) - transp.close() - await transp.join() - -proc serveClient3(server: StreamServer, transp: StreamTransport) {.async.} = - var buffer: array[20, char] - var check = "REQUEST" - var suffixStr = "SUFFIX" - var suffix = newSeq[byte](6) - copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr)) - var counter = MessagesCount - while counter > 0: - zeroMem(addr buffer[0], MessageSize) - var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) - doAssert(equalMem(addr buffer[0], addr check[0], len(check))) - var numstr = "" - var i = 7 - while i < MessageSize and (buffer[i] in {'0'..'9'}): - numstr.add(buffer[i]) - inc(i) - var num = parseInt(numstr) - doAssert(len(numstr) < 8) - var ans = "ANSWER" & $num & "SUFFIX" - zeroMem(addr buffer[0], MessageSize) - copyMem(addr buffer[0], addr ans[0], len(ans)) - res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) - doAssert(res == len(ans)) - dec(counter) - transp.close() - await transp.join() - -proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} = - var pathname = await transp.readLine() - var size = await transp.readLine() - var sizeNum = parseInt(size) - doAssert(sizeNum >= 0) - var rbuffer = newSeq[byte](sizeNum) - await transp.readExactly(addr rbuffer[0], sizeNum) - var lbuffer = readFile(pathname) - doAssert(len(lbuffer) == sizeNum) - doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum)) - var answer = "OK\r\n" - var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) - doAssert(res == len(answer)) - transp.close() - await transp.join() - -proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} = - var answer = "DONE\r\n" - var expect = "" - var line = await transp.readLine() - doAssert(len(line) == BigMessageCount * len(BigMessagePattern)) - for i in 0.. 0) - name = name & "\r\n" - var res = await transp.write(cast[pointer](addr name[0]), len(name)) - doAssert(res == len(name)) - ssize = $size & "\r\n" - res = await transp.write(cast[pointer](addr ssize[0]), len(ssize)) - doAssert(res == len(ssize)) - var checksize = await transp.writeFile(handle, 0'u, size) - doAssert(checksize == size) - close(fhandle) - var ans = await transp.readLine() - doAssert(ans == "OK") - result = 1 - transp.close() - await transp.join() - -proc swarmWorker7(address: TransportAddress): Future[int] {.async.} = - var transp = await connect(address) - var data = BigMessagePattern - var crlf = "\r\n" - for i in 0.. buffer.len: - buffer.setLen(prevLen + readLength) - - let bytesRead = await transp.readOnce(addr buffer[prevLen], readLength) - inc(prevLen, bytesRead) - - buffer.setLen(prevLen) - doAssert(buffer == BigMessagePattern) - - result = 1 - transp.close() - await transp.join() - -proc test16(address: TransportAddress): Future[int] {.async.} = - var server = createStreamServer(address, serveClient16, {ReuseAddr}) - server.start() - result = await swarmWorker16(address) - server.stop() - server.close() - await server.join() - -when isMainModule: +suite "Stream Transport test suite": const + ConstantMessage = "SOMEDATA" + BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + FilesTestName = "tests/teststream.nim" + BigMessageCount = 100 + ClientsCount = 5 + MessagesCount = 10 + MessageSize = 20 + FilesCount = 10 + m1 = "readLine() multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" m2 = "readExactly() multiple clients with messages (" & $ClientsCount & @@ -679,44 +55,681 @@ when isMainModule: initTAddress(r"/tmp/testpipe") ] var prefixes = ["[IP] ", "[UNIX] "] - suite "Stream Transport test suite": - for i in 0.. 0: + zeroMem(addr buffer[0], MessageSize) + var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) + doAssert(equalMem(addr buffer[0], addr check[0], len(check))) + var numstr = "" + var i = 7 + while i < MessageSize and (buffer[i] in {'0'..'9'}): + numstr.add(buffer[i]) + inc(i) + var num = parseInt(numstr) + doAssert(len(numstr) < 8) + var ans = "ANSWER" & $num & "SUFFIX" + zeroMem(addr buffer[0], MessageSize) + copyMem(addr buffer[0], addr ans[0], len(ans)) + res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) + doAssert(res == len(ans)) + dec(counter) + transp.close() + await transp.join() + + proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} = + var pathname = await transp.readLine() + var size = await transp.readLine() + var sizeNum = parseInt(size) + doAssert(sizeNum >= 0) + var rbuffer = newSeq[byte](sizeNum) + await transp.readExactly(addr rbuffer[0], sizeNum) + var lbuffer = readFile(pathname) + doAssert(len(lbuffer) == sizeNum) + doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum)) + var answer = "OK\r\n" + var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) + doAssert(res == len(answer)) + transp.close() + await transp.join() + + proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} = + var answer = "DONE\r\n" + var expect = "" + var line = await transp.readLine() + doAssert(len(line) == BigMessageCount * len(BigMessagePattern)) + for i in 0.. 0) + name = name & "\r\n" + var res = await transp.write(cast[pointer](addr name[0]), len(name)) + doAssert(res == len(name)) + ssize = $size & "\r\n" + res = await transp.write(cast[pointer](addr ssize[0]), len(ssize)) + doAssert(res == len(ssize)) + var checksize = await transp.writeFile(handle, 0'u, size) + doAssert(checksize == size) + close(fhandle) + var ans = await transp.readLine() + doAssert(ans == "OK") + result = 1 + transp.close() + await transp.join() + + proc swarmWorker7(address: TransportAddress): Future[int] {.async.} = + var transp = await connect(address) + var data = BigMessagePattern + var crlf = "\r\n" + for i in 0.. buffer.len: + buffer.setLen(prevLen + readLength) + + let bytesRead = await transp.readOnce(addr buffer[prevLen], readLength) + inc(prevLen, bytesRead) + + buffer.setLen(prevLen) + doAssert(buffer == BigMessagePattern) + + result = 1 + transp.close() + await transp.join() + + proc test16(address: TransportAddress): Future[int] {.async.} = + var server = createStreamServer(address, serveClient16, {ReuseAddr}) + server.start() + result = await swarmWorker16(address) + server.stop() + server.close() + await server.join() + + proc testCloseTransport(address: TransportAddress): Future[int] {.async.} = + proc client(server: StreamServer, transp: StreamTransport) {.async.} = + discard + var server = createStreamServer(address, client, {ReuseAddr}) + server.start() + server.stop + server.close() + try: + await wait(server.join(), 1.seconds) + result = 1 + except: + discard + + for i in 0..= 1000.milliseconds) and (d <= 2_000.milliseconds) + proc testTimer(): bool = + let a = Moment.now() + waitFor(sleepAsync(1000.milliseconds)) + let b = Moment.now() + let d = b - a + result = (d >= 1000.milliseconds) and (d <= 2_000.milliseconds) -when isMainModule: - suite "Asynchronous timers test suite": - test "Timer reliability test [" & asyncTimer & "]": - check testTimer() == true - test $TimersCount & " timers with 10ms timeout": - var res = waitFor(test(10.milliseconds)) - check (res >= 10.milliseconds) and (res <= 100.milliseconds) - test $TimersCount & " timers with 100ms timeout": - var res = waitFor(test(100.milliseconds)) - check (res >= 100.milliseconds) and (res <= 1000.milliseconds) - test $TimersCount & " timers with 1000ms timeout": - var res = waitFor(test(1000.milliseconds)) - check (res >= 1000.milliseconds) and (res <= 5000.milliseconds) + test "Timer reliability test [" & asyncTimer & "]": + check testTimer() == true + test $TimersCount & " timers with 10ms timeout": + var res = waitFor(test(10.milliseconds)) + check (res >= 10.milliseconds) and (res <= 100.milliseconds) + test $TimersCount & " timers with 100ms timeout": + var res = waitFor(test(100.milliseconds)) + check (res >= 100.milliseconds) and (res <= 1000.milliseconds) + test $TimersCount & " timers with 1000ms timeout": + var res = waitFor(test(1000.milliseconds)) + check (res >= 1000.milliseconds) and (res <= 5000.milliseconds)