diff --git a/asyncdispatch2/transports/common.nim b/asyncdispatch2/transports/common.nim index 34952e51..bdd777a4 100644 --- a/asyncdispatch2/transports/common.nim +++ b/asyncdispatch2/transports/common.nim @@ -292,6 +292,16 @@ proc raiseTransportOsError*(err: OSErrorCode) = var msg = "(" & $int(err) & ") " & osErrorMsg(err) raise newException(TransportOsError, msg) +type + SeqHeader = object + length, reserved: int + +proc isLiteral*(s: string): bool {.inline.} = + (cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0 + +proc isLiteral*[T](s: seq[T]): bool {.inline.} = + (cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0 + when defined(windows): import winlean diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index 17f7e84f..53cd349d 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -564,12 +564,14 @@ proc send*(transp: DatagramTransport, pbytes: pointer, transp.resumeWrite() return retFuture -proc send*(transp: DatagramTransport, msg: var string): Future[void] = +proc send*(transp: DatagramTransport, msg: string, msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. var retFuture = FutureGCString[void]() transp.checkClosed(retFuture) - shallowCopy(retFuture.gcholder, msg) + if not isLiteral(msg): + shallowCopy(retFuture.gcholder, msg) + let length = if msglen <= 0: len(msg) else: msglen let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0], buflen: len(msg), writer: cast[Future[void]](retFuture)) @@ -578,22 +580,25 @@ proc send*(transp: DatagramTransport, msg: var string): Future[void] = transp.resumeWrite() return retFuture -proc send*[T](transp: DatagramTransport, msg: var seq[T]): Future[void] = +proc send*[T](transp: DatagramTransport, msg: seq[T], + msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. var retFuture = FutureGCSeq[void, T]() transp.checkClosed(retFuture) - shallowCopy(retFuture.gcholder, msg) + if not isLiteral(msg): + shallowCopy(retFuture.gcholder, msg) + let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0], - buflen: (len(msg) * sizeof(T)), + buflen: length, writer: cast[Future[void]](retFuture)) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() return retFuture -proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, - remote: TransportAddress): Future[void] = +proc sendTo*(transp: DatagramTransport, remote: TransportAddress, + pbytes: pointer, nbytes: int): Future[void] = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address ``remote``. var retFuture = newFuture[void]() @@ -605,15 +610,17 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, transp.resumeWrite() return retFuture -proc sendTo*(transp: DatagramTransport, msg: var string, - remote: TransportAddress): Future[void] = +proc sendTo*(transp: DatagramTransport, remote: TransportAddress, + msg: string, msglen = -1): Future[void] = ## Send string ``msg`` using transport ``transp`` to remote destination ## address ``remote``. var retFuture = FutureGCString[void]() transp.checkClosed(retFuture) - shallowCopy(retFuture.gcholder, msg) + if not isLiteral(msg): + shallowCopy(retFuture.gcholder, msg) + let length = if msglen <= 0: len(msg) else: msglen let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0], - buflen: len(msg), + buflen: length, writer: cast[Future[void]](retFuture), address: remote) transp.queue.addLast(vector) @@ -621,15 +628,17 @@ proc sendTo*(transp: DatagramTransport, msg: var string, transp.resumeWrite() return retFuture -proc sendTo*[T](transp: DatagramTransport, msg: var seq[T], - remote: TransportAddress): Future[void] = +proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, + msg: seq[T], msglen = -1): Future[void] = ## Send sequence ``msg`` using transport ``transp`` to remote destination ## address ``remote``. var retFuture = FutureGCSeq[void, T]() transp.checkClosed(retFuture) - shallowCopy(retFuture.gcholder, msg) + if not isLiteral(msg): + shallowCopy(retFuture.gcholder, msg) + let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0], - buflen: (len(msg) * sizeof(T)), + buflen: length, writer: cast[Future[void]](retFuture), address: remote) transp.queue.addLast(vector) @@ -656,55 +665,3 @@ proc getMessage*(transp: DatagramTransport): seq[byte] = proc getUserData*[T](transp: DatagramTransport): T {.inline.} = ## Obtain user data stored in ``transp`` object. result = cast[T](transp.udata) - - -# proc createDatagramServer*(host: TransportAddress, -# cbproc: DatagramCallback, -# flags: set[ServerFlags] = {}, -# sock: AsyncFD = asyncInvalidSocket, -# bufferSize: int = DefaultDatagramBufferSize, -# udata: pointer = nil): DatagramServer = -# var transp: DatagramTransport -# var fflags = flags + {NoAutoRead} -# if host.address.family == IpAddressFamily.IPv4: -# transp = newDatagramTransport(cbproc, AnyAddress, host, sock, -# fflags, udata, bufferSize) -# else: -# transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock, -# fflags, udata, bufferSize) -# result = DatagramServer() -# result.transport = transp -# result.status = ServerStatus.Starting -# GC_ref(result) - -# proc start*(server: DatagramServer) = -# ## Starts ``server``. -# if server.status == ServerStatus.Starting: -# server.transport.resumeRead() -# server.status = ServerStatus.Running - -# proc stop*(server: DatagramServer) = -# ## Stops ``server``. -# if server.status == ServerStatus.Running: -# when defined(windows): -# if {WritePending, ReadPending} * server.transport.state != {}: -# ## CancelIO will stop both reading and writing. -# discard cancelIo(Handle(server.transport.fd)) -# else: -# if WritePaused notin server.transport.state: -# server.transport.fd.removeWriter() -# if ReadPaused notin server.transport.state: -# server.transport.fd.removeReader() -# server.status = ServerStatus.Stopped - -# proc join*(server: DatagramServer) {.async.} = -# ## Waits until ``server`` is not stopped. -# if not server.transport.future.finished: -# await server.transport.future - -# proc close*(server: DatagramServer) = -# ## Release ``server`` resources. -# if server.status == ServerStatus.Stopped: -# server.status = ServerStatus.Closed -# server.transport.close() -# GC_unref(server) diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 72620886..07d28d22 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -27,10 +27,10 @@ proc client1(transp: DatagramTransport, var numstr = data[7..^1] var num = parseInt(numstr) var ans = "ANSWER" & $num - await transp.sendTo(addr ans[0], len(ans), raddr) + await transp.sendTo(raddr, addr ans[0], len(ans)) else: var err = "ERROR" - await transp.sendTo(addr err[0], len(err), raddr) + await transp.sendTo(raddr, addr err[0], len(err)) else: var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 @@ -53,7 +53,7 @@ proc client2(transp: DatagramTransport, else: var ta = initTAddress("127.0.0.1:33336") var req = "REQUEST" & $counterPtr[] - await transp.sendTo(addr req[0], len(req), ta) + await transp.sendTo(ta, addr req[0], len(req)) else: var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 @@ -135,7 +135,7 @@ proc client5(transp: DatagramTransport, else: var ta = initTAddress("127.0.0.1:33337") var req = "REQUEST" & $counterPtr[] - await transp.sendTo(addr req[0], len(req), ta) + await transp.sendTo(ta, addr req[0], len(req)) else: var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 @@ -159,10 +159,10 @@ proc client6(transp: DatagramTransport, var numstr = data[7..^1] var num = parseInt(numstr) var ans = "ANSWER" & $num - await transp.sendTo(ans, raddr) + await transp.sendTo(raddr, ans) else: var err = "ERROR" - await transp.sendTo(err, raddr) + await transp.sendTo(raddr, err) else: ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) @@ -186,7 +186,7 @@ proc client7(transp: DatagramTransport, else: var ta = initTAddress("127.0.0.1:33336") var req = "REQUEST" & $counterPtr[] - await transp.sendTo(req, ta) + await transp.sendTo(ta, req) else: var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 @@ -239,12 +239,12 @@ proc client9(transp: DatagramTransport, var ans = "ANSWER" & $num var ansseq = newSeq[byte](len(ans)) copyMem(addr ansseq[0], addr ans[0], len(ans)) - await transp.sendTo(ansseq, raddr) + await transp.sendTo(raddr, ansseq) else: var err = "ERROR" var errseq = newSeq[byte](len(err)) copyMem(addr errseq[0], addr err[0], len(err)) - await transp.sendTo(errseq, raddr) + await transp.sendTo(raddr, errseq) else: ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) @@ -270,7 +270,7 @@ proc client10(transp: DatagramTransport, var req = "REQUEST" & $counterPtr[] var reqseq = newSeq[byte](len(req)) copyMem(addr reqseq[0], addr req[0], len(req)) - await transp.sendTo(reqseq, ta) + await transp.sendTo(ta, reqseq) else: var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 @@ -317,7 +317,7 @@ proc testPointerSendTo(): Future[int] {.async.} = var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) var dgram2 = newDatagramTransport(client2, udata = addr counter) var data = "REQUEST0" - await dgram2.sendTo(addr data[0], len(data), ta) + await dgram2.sendTo(ta, addr data[0], len(data)) await dgram2.join() dgram1.close() dgram2.close() @@ -343,7 +343,7 @@ proc testStringSendTo(): Future[int] {.async.} = var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta) var dgram2 = newDatagramTransport(client7, udata = addr counter) var data = "REQUEST0" - await dgram2.sendTo(data, ta) + await dgram2.sendTo(ta, data) await dgram2.join() dgram1.close() dgram2.close() @@ -371,7 +371,7 @@ proc testSeqSendTo(): Future[int] {.async.} = var data = "REQUEST0" var dataseq = newSeq[byte](len(data)) copyMem(addr dataseq[0], addr data[0], len(data)) - await dgram2.sendTo(dataseq, ta) + await dgram2.sendTo(ta, dataseq) await dgram2.join() dgram1.close() dgram2.close() @@ -420,7 +420,7 @@ proc test3(bounded: bool): Future[int] {.async.} = await grams[i].send(addr data[0], len(data)) else: grams[i] = newDatagramTransport(client5, udata = addr counters[i]) - await grams[i].sendTo(addr data[0], len(data), ta) + await grams[i].sendTo(ta, addr data[0], len(data)) clients[i] = grams[i].join() await waitAll(clients) @@ -442,7 +442,7 @@ proc testConnReset(): Future[bool] {.async.} = dgram1.close() var dgram2 = newDatagramTransport(client20, udata = addr counter) var data = "MESSAGE" - discard dgram2.sendTo(data, ta) + discard dgram2.sendTo(ta, data) await sleepAsync(1000) result = (counter == 0)