From 3ca2c5e6b510c15ce88c94ed25731b30f7ad46b5 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 19 Jan 2024 09:21:10 +0100 Subject: [PATCH] deprecate `callback=`, UDP fixes (fixes #491) (#492) Using the callback setter may lead to callbacks owned by others being reset, which is unexpected. * don't crash on zero-length UDP writes --- chronos/internal/asyncfutures.nim | 8 +++++-- chronos/transports/datagram.nim | 38 +++++++++++++++---------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index 5ce9da4..496a776 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -330,7 +330,8 @@ proc removeCallback*(future: FutureBase, cb: CallbackFunc, proc removeCallback*(future: FutureBase, cb: CallbackFunc) = future.removeCallback(cb, cast[pointer](future)) -proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) = +proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) {. + deprecated: "use addCallback/removeCallback/clearCallbacks to manage the callback list".} = ## Clears the list of callbacks and sets the callback proc to be called when ## the future completes. ## @@ -341,11 +342,14 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) = future.clearCallbacks future.addCallback(cb, udata) -proc `callback=`*(future: FutureBase, cb: CallbackFunc) = +proc `callback=`*(future: FutureBase, cb: CallbackFunc) {. + deprecated: "use addCallback/removeCallback/clearCallbacks instead to manage the callback list".} = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. + {.push warning[Deprecated]: off.} `callback=`(future, cb, cast[pointer](future)) + {.pop.} proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) = ## Sets the callback procedure to be called when the future is cancelled. diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index 88db7ee..cd335df 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -13,6 +13,7 @@ import std/deques when not(defined(windows)): import ".."/selectors2 import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles] import "."/common +import stew/ptrops type VectorKind = enum @@ -119,7 +120,7 @@ when defined(windows): ## Initiation transp.state.incl(WritePending) let fd = SocketHandle(transp.fd) - var vector = transp.queue.popFirst() + let vector = transp.queue.popFirst() transp.setWriterWSABuffer(vector) let ret = if vector.kind == WithAddress: @@ -365,7 +366,7 @@ when defined(windows): udata: cast[pointer](res)) res.wovl.data = CompletionData(cb: writeDatagramLoop, udata: cast[pointer](res)) - res.rwsabuf = WSABUF(buf: cast[cstring](addr res.buffer[0]), + res.rwsabuf = WSABUF(buf: cast[cstring](baseAddr res.buffer), len: ULONG(len(res.buffer))) GC_ref(res) # Start tracking transport @@ -392,7 +393,7 @@ else: else: while true: transp.ralen = SockLen(sizeof(Sockaddr_storage)) - var res = osdefs.recvfrom(fd, addr transp.buffer[0], + var res = osdefs.recvfrom(fd, baseAddr transp.buffer, cint(len(transp.buffer)), cint(0), cast[ptr SockAddr](addr transp.raddr), addr transp.ralen) @@ -424,7 +425,7 @@ else: transp.state.incl({WritePaused}) else: if len(transp.queue) > 0: - var vector = transp.queue.popFirst() + let vector = transp.queue.popFirst() while true: if vector.kind == WithAddress: toSAddr(vector.address, transp.waddr, transp.walen) @@ -826,7 +827,7 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback, proc join*(transp: DatagramTransport): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Wait until the transport ``transp`` will be closed. - var retFuture = newFuture[void]("datagram.transport.join") + let retFuture = newFuture[void]("datagram.transport.join") proc continuation(udata: pointer) {.gcsafe.} = retFuture.complete() @@ -858,12 +859,12 @@ proc send*(transp: DatagramTransport, pbytes: pointer, async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address which was bounded on transport. - var retFuture = newFuture[void]("datagram.transport.send(pointer)") + let retFuture = newFuture[void]("datagram.transport.send(pointer)") transp.checkClosed(retFuture) if transp.remote.port == Port(0): retFuture.fail(newException(TransportError, "Remote peer not set!")) return retFuture - var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, + let vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, writer: retFuture) transp.queue.addLast(vector) if WritePaused in transp.state: @@ -877,14 +878,14 @@ proc send*(transp: DatagramTransport, msg: sink string, async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = newFuture[void]("datagram.transport.send(string)") + let retFuture = newFuture[void]("datagram.transport.send(string)") transp.checkClosed(retFuture) let length = if msglen <= 0: len(msg) else: msglen var localCopy = chronosMoveSink(msg) retFuture.addCallback(proc(_: pointer) = reset(localCopy)) - let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], + let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy, buflen: length, writer: retFuture) @@ -900,14 +901,14 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T], async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = newFuture[void]("datagram.transport.send(seq)") + let retFuture = newFuture[void]("datagram.transport.send(seq)") transp.checkClosed(retFuture) let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) var localCopy = chronosMoveSink(msg) retFuture.addCallback(proc(_: pointer) = reset(localCopy)) - let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], + let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy, buflen: length, writer: retFuture) transp.queue.addLast(vector) @@ -922,7 +923,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address ``remote``. - var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)") + let retFuture = newFuture[void]("datagram.transport.sendTo(pointer)") transp.checkClosed(retFuture) let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, writer: retFuture, address: remote) @@ -938,14 +939,14 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = newFuture[void]("datagram.transport.sendTo(string)") + let retFuture = newFuture[void]("datagram.transport.sendTo(string)") transp.checkClosed(retFuture) let length = if msglen <= 0: len(msg) else: msglen var localCopy = chronosMoveSink(msg) retFuture.addCallback(proc(_: pointer) = reset(localCopy)) - let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], + let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy, buflen: length, writer: retFuture, address: remote) @@ -961,15 +962,15 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send sequence ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = newFuture[void]("datagram.transport.sendTo(seq)") + let retFuture = newFuture[void]("datagram.transport.sendTo(seq)") transp.checkClosed(retFuture) let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) var localCopy = chronosMoveSink(msg) retFuture.addCallback(proc(_: pointer) = reset(localCopy)) - let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], + let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy, buflen: length, - writer: cast[Future[void]](retFuture), + writer: retFuture, address: remote) transp.queue.addLast(vector) if WritePaused in transp.state: @@ -993,7 +994,6 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], proc getMessage*(transp: DatagramTransport): seq[byte] {. raises: [TransportError].} = ## Copy data from internal message buffer and return result. - var default: seq[byte] if ReadError in transp.state: transp.state.excl(ReadError) raise transp.getError() @@ -1002,7 +1002,7 @@ proc getMessage*(transp: DatagramTransport): seq[byte] {. copyMem(addr res[0], addr transp.buffer[0], transp.buflen) res else: - default + default(seq[byte]) proc getUserData*[T](transp: DatagramTransport): T {.inline.} = ## Obtain user data stored in ``transp`` object.