From b8e8d96f3bf6cf8532a8b8af858e135fba9f5de4 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 5 Jun 2018 23:21:07 +0300 Subject: [PATCH] Fix datagram: for send/sendTo (string, seq[T]) versions. Fix stream: for write (string, seq[T]) versions More tests for both datagram/stream. --- asyncdispatch2/transports/common.nim | 19 +- asyncdispatch2/transports/datagram.nim | 143 +++++++++------ asyncdispatch2/transports/stream.nim | 136 ++++++++------ tests/testdatagram.nim | 242 +++++++++++++++++++++++-- tests/teststream.nim | 3 +- 5 files changed, 407 insertions(+), 136 deletions(-) diff --git a/asyncdispatch2/transports/common.nim b/asyncdispatch2/transports/common.nim index 0b11f5a6..f52cb38e 100644 --- a/asyncdispatch2/transports/common.nim +++ b/asyncdispatch2/transports/common.nim @@ -8,7 +8,7 @@ # MIT license (LICENSE-MIT) import net, nativesockets, strutils -import ../asyncloop, ../asyncsync +import ../asyncloop const DefaultStreamBufferSize* = 4096 ## Default buffer size for stream @@ -38,14 +38,20 @@ type Running, # Server running Closed # Server closed + FutureGCString*[T] = ref object of Future[T] + ## Future to hold GC strings + gcholder*: string + + FutureGCSeq*[A, B] = ref object of Future[A] + ## Future to hold GC seqs + gcholder*: seq[B] + when defined(windows): type SocketServer* = ref object of RootRef ## Socket server object sock*: AsyncFD # Socket local*: TransportAddress # Address - # actEvent*: AsyncEvent # Activation event - # action*: ServerCommand # Activation command status*: ServerStatus # Current server status udata*: pointer # User-defined pointer flags*: set[ServerFlags] # Flags @@ -62,8 +68,6 @@ else: ## Socket server object sock*: AsyncFD # Socket local*: TransportAddress # Address - # actEvent*: AsyncEvent # Activation event - # action*: ServerCommand # Activation command status*: ServerStatus # Current server status udata*: pointer # User-defined pointer flags*: set[ServerFlags] # Flags @@ -194,6 +198,11 @@ template checkClosed*(t: untyped) = if (ReadClosed in (t).state) or (WriteClosed in (t).state): raise newException(TransportError, "Transport is already closed!") +template checkClosed*(t: untyped, future: untyped) = + if (ReadClosed in (t).state) or (WriteClosed in (t).state): + future.fail(newException(TransportError, "Transport is already closed!")) + return future + template getError*(t: untyped): ref Exception = var err = (t).error (t).error = nil diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index b8b7a968..a248feab 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -63,10 +63,6 @@ template setReadError(t, e: untyped) = (t).state.incl(ReadError) (t).error = newException(TransportOsError, osErrorMsg((e))) -template setWriteError(t, e: untyped) = - (t).state.incl(WriteError) - (t).error = newException(TransportOsError, osErrorMsg((e))) - template setWriterWSABuffer(t, v: untyped) = (t).wwsabuf.buf = cast[cstring](v.buf) (t).wwsabuf.len = cast[int32](v.buflen) @@ -84,10 +80,6 @@ when defined(windows): wlen: SockLen # Writer address length wwsabuf: TWSABuf # Writer WSABUF structure - template finishWriter(t: untyped) = - var vv = (t).queue.popFirst() - vv.writer.complete() - proc writeDatagramLoop(udata: pointer) = var bytesCount: int32 var ovl = cast[PtrCustomOverlapped](udata) @@ -97,16 +89,16 @@ when defined(windows): ## Continuation transp.state.excl(WritePending) let err = transp.wovl.data.errCode + let vector = transp.queue.popFirst() if err == OSErrorCode(-1): - discard + vector.writer.complete() elif int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt transp.state.incl(WritePaused) - transp.finishWriter() - break + vector.writer.complete() else: - transp.setWriteError(err) - transp.finishWriter() + transp.state = transp.state + {WritePaused, WriteError} + vector.writer.fail(newException(TransportOsError, osErrorMsg(err))) else: ## Initiation transp.state.incl(WritePending) @@ -129,12 +121,13 @@ when defined(windows): if int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt transp.state.incl(WritePaused) + vector.writer.complete() elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) else: transp.state.excl(WritePending) - transp.setWriteError(err) - vector.writer.complete() + transp.state = transp.state + {WritePaused, WriteError} + vector.writer.fail(newException(TransportOsError, osErrorMsg(err))) else: transp.queue.addFirst(vector) break @@ -190,12 +183,14 @@ when defined(windows): transp.state.excl(ReadPending) transp.state.incl(ReadPaused) elif int(err) == WSAECONNRESET: - transp.state = {ReadPaused, ReadEof} + transp.state.excl(ReadPending) + transp.state = transp.state + {ReadPaused, ReadEof} break elif int(err) == ERROR_IO_PENDING: discard else: transp.state.excl(ReadPending) + transp.state.incl(ReadPaused) transp.setReadError(err) discard transp.function(transp, nil, 0, raddr, transp.udata) break @@ -325,7 +320,7 @@ else: raddr: TransportAddress var cdata = cast[ptr CompletionData](udata) - if not isNil(cdata) and int(cdata.fd) == 0: + if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)): # Transport was closed earlier, exiting return var transp = cast[DatagramTransport](cdata.udata) @@ -380,8 +375,8 @@ else: if int(err) == EINTR: continue else: - transp.setWriteError(err) - vector.writer.complete() + vector.writer.fail(newException(TransportOsError, + osErrorMsg(err))) break else: transp.state.incl(WritePaused) @@ -525,61 +520,93 @@ proc join*(transp: DatagramTransport) {.async.} = await transp.future proc send*(transp: DatagramTransport, pbytes: pointer, - nbytes: int) {.async.} = + nbytes: int): Future[void] = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address which was bounded on transport. - checkClosed(transp) + var retFuture = newFuture[void]() + transp.checkClosed(retFuture) if transp.remote.port == Port(0): - raise newException(TransportError, "Remote peer is not set!") - var waitFuture = newFuture[void]("datagram.transport.send") + retFuture.fail(newException(TransportError, "Remote peer not set!")) + return retFuture var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, - writer: waitFuture) + writer: retFuture) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() - await vector.writer - if WriteError in transp.state: - raise transp.getError() + return retFuture + +proc send*(transp: DatagramTransport, msg: string): Future[void] = + ## Send string ``msg`` using transport ``transp`` to remote destination + ## address which was bounded on transport. + var retFuture = FutureGCString[void]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0], + buflen: len(msg), + writer: cast[Future[void]](retFuture)) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture + +proc send*[T](transp: DatagramTransport, msg: seq[T]): Future[void] = + ## Send string ``msg`` using transport ``transp`` to remote destination + ## address which was bounded on transport. + var retFuture = FutureGCSeq[void, T]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0], + buflen: (len(msg) * sizeof(T)), + writer: cast[Future[void]](retFuture)) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, - remote: TransportAddress) {.async.} = + remote: TransportAddress): Future[void] = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address ``remote``. - checkClosed(transp) - var saddr: Sockaddr_storage - var slen: SockLen - toSockAddr(remote.address, remote.port, saddr, slen) - var waitFuture = newFuture[void]("datagram.transport.sendto") - var vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, - writer: waitFuture, address: remote) + var retFuture = newFuture[void]() + transp.checkClosed(retFuture) + let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, + writer: retFuture, address: remote) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() - await vector.writer - if WriteError in transp.state: - raise transp.getError() + return retFuture -template send*(transp: DatagramTransport, msg: var string): untyped = - ## Send message ``msg`` using transport ``transp`` to remote destination - ## address which was bounded on transport. - send(transp, addr msg[0], len(msg)) +proc sendTo*(transp: DatagramTransport, msg: string, + remote: TransportAddress): Future[void] = + ## Send string ``msg`` using transport ``transp`` to remote destination + ## address ``remote``. + var retFuture = FutureGCString[void]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0], + buflen: len(msg), + writer: cast[Future[void]](retFuture), + address: remote) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture -template send*(transp: DatagramTransport, msg: var seq[byte]): untyped = - ## Send message ``msg`` using transport ``transp`` to remote destination - ## address which was bounded on transport. - send(transp, addr msg[0], len(msg)) - -template sendTo*(transp: DatagramTransport, msg: var string, - remote: TransportAddress): untyped = - ## Send message ``msg`` using transport ``transp`` to remote - ## destination address ``remote``. - sendTo(transp, addr msg[0], len(msg), remote) - -template sendTo*(transp: DatagramTransport, msg: var seq[byte], - remote: TransportAddress): untyped = - ## Send message ``msg`` using transport ``transp`` to remote - ## destination address ``remote``. - sendTo(transp, addr msg[0], len(msg), remote) +proc sendTo*[T](transp: DatagramTransport, msg: seq[T], + remote: TransportAddress): Future[void] = + ## Send sequence ``msg`` using transport ``transp`` to remote destination + ## address ``remote``. + var retFuture = FutureGCSeq[void, T]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0], + buflen: (len(msg) * sizeof(T)), + writer: cast[Future[void]](retFuture), + address: remote) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture proc createDatagramServer*(host: TransportAddress, cbproc: DatagramCallback, diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 196a416a..a24c52eb 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -21,13 +21,12 @@ type DataBuffer, # Simple buffer pointer/length DataFile # File handle for sendfile/TransmitFile -type StreamVector = object kind: VectorKind # Writer vector source kind buf: pointer # Writer buffer pointer buflen: int # Writer buffer size offset: uint # Writer vector offset - writer: Future[void] # Writer vector completion Future + writer: Future[int] # Writer vector completion Future TransportKind* {.pure.} = enum Socket, # Socket transport @@ -102,9 +101,9 @@ template setReadError(t, e: untyped) = (t).state.incl(ReadError) (t).error = newException(TransportOsError, osErrorMsg((e))) -template setWriteError(t, e: untyped) = - (t).state.incl(WriteError) - (t).error = newException(TransportOsError, osErrorMsg((e))) +# template setWriteError(t, e: untyped) = +# (t).state.incl(WriteError) +# (t).error = newException(TransportOsError, osErrorMsg((e))) template finishReader(t: untyped) = var reader = (t).reader @@ -141,10 +140,6 @@ when defined(windows): const SO_UPDATE_CONNECT_CONTEXT = 0x7010 - template finishWriter(t: untyped) = - var vv = (t).queue.popFirst() - vv.writer.complete() - template zeroOvelappedOffset(t: untyped) = (t).offset = 0 (t).offsetHigh = 0 @@ -186,7 +181,7 @@ when defined(windows): bytesCount = transp.wovl.data.bytesCount var vector = transp.queue.popFirst() if bytesCount == 0: - vector.writer.complete() + vector.writer.complete(0) else: if transp.kind == TransportKind.Socket: if vector.kind == VectorKind.DataBuffer: @@ -194,19 +189,23 @@ when defined(windows): vector.shiftVectorBuffer(bytesCount) transp.queue.addFirst(vector) else: - vector.writer.complete() + vector.writer.complete(transp.wwsabuf.len) else: if uint(bytesCount) < getFileSize(vector): vector.shiftVectorFile(bytesCount) transp.queue.addFirst(vector) else: - vector.writer.complete() + vector.writer.complete(int(getFileSize(vector))) elif int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt - transp.finishWriter() + transp.state.incl(WritePaused) + let v = transp.queue.popFirst() + v.writer.complete(0) + break else: - transp.setWriteError(err) - transp.finishWriter() + let v = transp.queue.popFirst() + transp.state.incl(WriteError) + v.writer.fail(newException(TransportOsError, osErrorMsg(err))) else: ## Initiation transp.state.incl(WritePending) @@ -225,12 +224,14 @@ when defined(windows): # CancelIO() interrupt transp.state.excl(WritePending) transp.state.incl(WritePaused) + vector.writer.complete(0) elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) else: transp.state.excl(WritePending) - transp.setWriteError(err) - vector.writer.complete() + transp.state = transp.state + {WritePaused, WriteError} + vector.writer.fail(newException(TransportOsError, + osErrorMsg(err))) else: transp.queue.addFirst(vector) else: @@ -253,12 +254,14 @@ when defined(windows): # CancelIO() interrupt transp.state.excl(WritePending) transp.state.incl(WritePaused) + vector.writer.complete(0) elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) else: transp.state.excl(WritePending) - transp.setWriteError(err) - vector.writer.complete() + transp.state = transp.state + {WritePaused, WriteError} + vector.writer.fail(newException(TransportOsError, + osErrorMsg(err))) else: transp.queue.addFirst(vector) break @@ -385,11 +388,11 @@ when defined(windows): sock = createAsyncSocket(address.address.getDomain(), SockType.SOCK_STREAM, Protocol.IPPROTO_TCP) if sock == asyncInvalidSocket: - result.fail(newException(OSError, osErrorMsg(osLastError()))) + result.fail(newException(TransportOsError, osErrorMsg(osLastError()))) if not bindToDomain(sock, address.address.getDomain()): sock.closeAsyncSocket() - result.fail(newException(OSError, osErrorMsg(osLastError()))) + result.fail(newException(TransportOsError, osErrorMsg(osLastError()))) proc continuation(udata: pointer) = var ovl = cast[RefCustomOverlapped](udata) @@ -399,13 +402,15 @@ when defined(windows): cint(SO_UPDATE_CONNECT_CONTEXT), nil, SockLen(0)) != 0'i32: sock.closeAsyncSocket() - retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + retFuture.fail(newException(TransportOsError, + osErrorMsg(osLastError()))) else: retFuture.complete(newStreamSocketTransport(povl.data.fd, bufferSize)) else: sock.closeAsyncSocket() - retFuture.fail(newException(OSError, osErrorMsg(ovl.data.errCode))) + retFuture.fail(newException(TransportOsError, + osErrorMsg(ovl.data.errCode))) GC_unref(ovl) povl = RefCustomOverlapped() @@ -421,7 +426,7 @@ when defined(windows): if int32(err) != ERROR_IO_PENDING: GC_unref(povl) sock.closeAsyncSocket() - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newException(TransportOsError, osErrorMsg(err))) return retFuture proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} = @@ -442,7 +447,7 @@ when defined(windows): addr server.sock, SockLen(sizeof(SocketHandle))) != 0'i32: server.asock.closeAsyncSocket() - raiseOsError(osLastError()) + raise newException(TransportOsError, osErrorMsg(osLastError())) else: discard server.function(server, newStreamSocketTransport(server.asock, server.bufferSize), @@ -521,7 +526,7 @@ else: proc writeStreamLoop(udata: pointer) {.gcsafe.} = var cdata = cast[ptr CompletionData](udata) - if not isNil(cdata) and int(cdata.fd) == 0: + if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)): # Transport was closed earlier, exiting return var transp = cast[UnixStreamTransport](cdata.udata) @@ -534,7 +539,7 @@ else: let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) if res >= 0: if vector.buflen - res == 0: - vector.writer.complete() + vector.writer.complete(vector.buflen) else: vector.shiftVectorBuffer(res) transp.queue.addFirst(vector) @@ -543,15 +548,15 @@ else: if int(err) == EINTR: continue else: - transp.setWriteError(err) - vector.writer.complete() + vector.writer.fail(newException(TransportOsError, + osErrorMsg(err))) else: let res = sendfile(int(fd), cast[int](vector.buflen), int(vector.offset), cast[int](vector.buf)) if res >= 0: if cast[int](vector.buf) - res == 0: - vector.writer.complete() + vector.writer.complete(cast[int](vector.buf)) else: vector.shiftVectorFile(res) transp.queue.addFirst(vector) @@ -560,16 +565,16 @@ else: if int(err) == EINTR: continue else: - transp.setWriteError(err) - vector.writer.complete() - break + vector.writer.fail(newException(TransportOsError, + osErrorMsg(err))) + break else: transp.state.incl(WritePaused) transp.fd.removeWriter() proc readStreamLoop(udata: pointer) {.gcsafe.} = var cdata = cast[ptr CompletionData](udata) - if not isNil(cdata) and int(cdata.fd) == 0: + if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)): # Transport was closed earlier, exiting return var transp = cast[UnixStreamTransport](cdata.udata) @@ -582,15 +587,13 @@ else: if int(err) == EINTR: continue elif int(err) in {ECONNRESET}: - transp.state.incl(ReadEof) - transp.state.incl(ReadPaused) + transp.state = transp.state + {ReadEof, ReadPaused} cdata.fd.removeReader() else: transp.setReadError(err) cdata.fd.removeReader() elif res == 0: - transp.state.incl(ReadEof) - transp.state.incl(ReadPaused) + transp.state = transp.state + {ReadEof, ReadPaused} cdata.fd.removeReader() else: transp.offset += res @@ -796,49 +799,62 @@ proc createStreamServer*(host: TransportAddress, result.resumeAccept() proc write*(transp: StreamTransport, pbytes: pointer, - nbytes: int): Future[int] {.async.} = + nbytes: int): Future[int] = ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport ## ``transp``. - checkClosed(transp) - var waitFuture = newFuture[void]("transport.write") - var vector = StreamVector(kind: DataBuffer, writer: waitFuture, + var retFuture = newFuture[int]() + transp.checkClosed(retFuture) + var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: nbytes) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() - await vector.writer - if WriteError in transp.state: - raise transp.getError() - result = nbytes + return retFuture -template write*(transp: StreamTransport, msg: var string): untyped = - ## Write string ``msg`` using transport ``transp``. - write(transp, addr msg[0], len(msg)) +proc write*(transp: StreamTransport, msg: string): Future[int] = + ## Write data from string ``msg`` using transport ``transp``. + var retFuture = FutureGCString[int]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + var vector = StreamVector(kind: DataBuffer, + writer: cast[Future[int]](retFuture), + buf: unsafeAddr msg[0], buflen: len(msg)) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture -template write*(transp: StreamTransport, msg: var seq[byte]): untyped = - ## Write seq[byte] ``msg`` using transport ``transp``. - write(transp, addr msg[0], len(msg)) +proc write*[T](transp: StreamTransport, msg: seq[T]): Future[int] = + ## Write sequence ``msg`` using transport ``transp``. + var retFuture = FutureGCSeq[int, T]() + transp.checkClosed(retFuture) + shallowCopy(retFuture.gcholder, msg) + var vector = StreamVector(kind: DataBuffer, + writer: cast[Future[int]](retFuture), + buf: unsafeAddr msg[0], + buflen: len(msg) * sizeof(T)) + transp.queue.addLast(vector) + if WritePaused in transp.state: + transp.resumeWrite() + return retFuture proc writeFile*(transp: StreamTransport, handle: int, offset: uint = 0, - size: int = 0): Future[void] {.async.} = + size: int = 0): Future[int] = ## Write data from file descriptor ``handle`` to transport ``transp``. ## ## You can specify starting ``offset`` in opened file and number of bytes ## to transfer from file to transport via ``size``. - if transp.kind != TransportKind.Socket: - raise newException(TransportError, "You can transmit files only to sockets") + var retFuture = newFuture[int]("transport.writeFile") checkClosed(transp) - var waitFuture = newFuture[void]("transport.writeFile") - var vector = StreamVector(kind: DataFile, writer: waitFuture, + + var vector = StreamVector(kind: DataFile, writer: retFuture, buf: cast[pointer](size), offset: offset, buflen: handle) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() - await vector.writer - if WriteError in transp.state: - raise transp.getError() + return retFuture proc atEof*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if ``transp`` is at EOF. diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index a9c5b77e..bccf52db 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -132,7 +132,154 @@ proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int, counterPtr[] = -1 transp.close() -proc test1(): Future[int] {.async.} = +proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + await transp.sendTo(ans, raddr) + else: + var err = "ERROR" + await transp.sendTo(err, raddr) + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var ta = initTAddress("127.0.0.1:33336") + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(req, ta) + else: + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(req) + else: + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + var ansseq = newSeq[byte](len(ans)) + copyMem(addr ansseq[0], addr ans[0], len(ans)) + await transp.sendTo(ansseq, raddr) + else: + var err = "ERROR" + var errseq = newSeq[byte](len(err)) + copyMem(addr errseq[0], addr err[0], len(err)) + await transp.sendTo(errseq, raddr) + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var ta = initTAddress("127.0.0.1:33336") + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.sendTo(reqseq, ta) + else: + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.send(reqseq) + else: + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + +proc testPointerSendTo(): Future[int] {.async.} = + ## sendTo(pointer) test var ta = initTAddress("127.0.0.1:33336") var counter = 0 var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) @@ -144,7 +291,8 @@ proc test1(): Future[int] {.async.} = dgram2.close() result = counter -proc test2(): Future[int] {.async.} = +proc testPointerSend(): Future[int] {.async.} = + ## send(pointer) test var ta = initTAddress("127.0.0.1:33337") var counter = 0 var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) @@ -156,6 +304,64 @@ proc test2(): Future[int] {.async.} = dgram2.close() result = counter +proc testStringSendTo(): Future[int] {.async.} = + ## sendTo(string) test + var ta = initTAddress("127.0.0.1:33336") + var counter = 0 + var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client7, udata = addr counter) + var data = "REQUEST0" + await dgram2.sendTo(data, ta) + await dgram2.join() + dgram1.close() + dgram2.close() + result = counter + +proc testStringSend(): Future[int] {.async.} = + ## send(string) test + var ta = initTAddress("127.0.0.1:33337") + var counter = 0 + var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta) + var data = "REQUEST0" + await dgram2.send(data) + await dgram2.join() + dgram1.close() + dgram2.close() + result = counter + +proc testSeqSendTo(): Future[int] {.async.} = + ## sendTo(string) test + var ta = initTAddress("127.0.0.1:33336") + var counter = 0 + var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client10, udata = addr counter) + var data = "REQUEST0" + var dataseq = newSeq[byte](len(data)) + copyMem(addr dataseq[0], addr data[0], len(data)) + await dgram2.sendTo(dataseq, ta) + await dgram2.join() + dgram1.close() + dgram2.close() + result = counter + +proc testSeqSend(): Future[int] {.async.} = + ## send(string) test + var ta = initTAddress("127.0.0.1:33337") + var counter = 0 + var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta) + var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta) + var data = "REQUEST0" + var dataseq = newSeq[byte](len(data)) + copyMem(addr dataseq[0], addr data[0], len(data)) + await dgram2.send(data) + await dgram2.join() + dgram1.close() + dgram2.close() + result = counter + +# + proc waitAll(futs: seq[Future[void]]): Future[void] = var counter = len(futs) var retFuture = newFuture[void]("waitAll") @@ -277,22 +483,34 @@ proc test4(): Future[int] {.async.} = when isMainModule: const - m1 = "Unbounded test (" & $TestsCount & " messages)" - m2 = "Bounded test (" & $TestsCount & " messages)" - m3 = "Unbounded multiple clients with messages (" & $ClientsCount & + m1 = "sendTo(pointer) test (" & $TestsCount & " messages)" + m2 = "send(pointer) test (" & $TestsCount & " messages)" + m3 = "sendTo(string) test (" & $TestsCount & " messages)" + m4 = "send(string) test (" & $TestsCount & " messages)" + m5 = "sendTo(seq[byte]) test (" & $TestsCount & " messages)" + m6 = "send(seq[byte]) test (" & $TestsCount & " messages)" + m7 = "Unbounded multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" - m4 = "Bounded multiple clients with messages (" & $ClientsCount & + m8 = "Bounded multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" - m5 = "DatagramServer multiple clients with messages (" & $ClientsCount & + m9 = "DatagramServer multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" suite "Datagram Transport test suite": test m1: - check waitFor(test1()) == TestsCount + check waitFor(testPointerSendTo()) == TestsCount test m2: - check waitFor(test2()) == TestsCount + check waitFor(testPointerSend()) == TestsCount test m3: - check waitFor(test3(false)) == ClientsCount * MessagesCount + check waitFor(testStringSendTo()) == TestsCount test m4: + check waitFor(testStringSend()) == TestsCount + test m5: + check waitFor(testSeqSendTo()) == TestsCount + test m6: + check waitFor(testSeqSend()) == TestsCount + test m7: + check waitFor(test3(false)) == ClientsCount * MessagesCount + test m8: check waitFor(test3(true)) == ClientsCount * MessagesCount - # test m5: - # check waitFor(test4()) == ClientsCount * MessagesCount + test m9: + check waitFor(test4()) == ClientsCount * MessagesCount diff --git a/tests/teststream.nim b/tests/teststream.nim index 14566154..a1198be9 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -253,7 +253,8 @@ proc swarmWorker4(address: TransportAddress): Future[int] {.async.} = doAssert(res == len(name)) res = await transp.write(cast[pointer](addr ssize[0]), len(ssize)) doAssert(res == len(ssize)) - await transp.writeFile(handle, 0'u, size) + var checksize = await transp.writeFile(handle, 0'u, size) + doAssert(checksize == size) close(fhandle) var ans = await transp.readLine() doAssert(ans == "OK")