diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 988b574..43b2c43 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -299,7 +299,6 @@ when defined(windows): CompletionKey = ULONG_PTR CompletionData* = object - fd*: AsyncFD cb*: CallbackFunc errCode*: OSErrorCode bytesCount*: int32 @@ -500,17 +499,9 @@ elif unixPlatform: type AsyncFD* = distinct cint - CompletionData* = object - fd*: AsyncFD - udata*: pointer - - PCompletionData* = ptr CompletionData - SelectorData* = object reader*: AsyncCallback - rdata*: CompletionData writer*: AsyncCallback - wdata*: CompletionData PDispatcher* = ref object of PDispatcherBase selector: Selector[SelectorData] @@ -555,8 +546,6 @@ elif unixPlatform: ## Register file descriptor ``fd`` in thread's dispatcher. let loop = getThreadDispatcher() var data: SelectorData - data.rdata.fd = fd - data.wdata.fd = fd loop.selector.registerHandle(int(fd), {}, data) proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = @@ -574,9 +563,8 @@ elif unixPlatform: let loop = getThreadDispatcher() var newEvents = {Event.Read} withData(loop.selector, int(fd), adata) do: - let acb = AsyncCallback(function: cb, udata: addr adata.rdata) + let acb = AsyncCallback(function: cb, udata: udata) adata.reader = acb - adata.rdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Read) if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) @@ -592,7 +580,6 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We need to clear `reader` data, because `selectors` don't do it adata.reader = default(AsyncCallback) - # adata.rdata = CompletionData() if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: @@ -606,9 +593,8 @@ elif unixPlatform: let loop = getThreadDispatcher() var newEvents = {Event.Write} withData(loop.selector, int(fd), adata) do: - let acb = AsyncCallback(function: cb, udata: addr adata.wdata) + let acb = AsyncCallback(function: cb, udata: udata) adata.writer = acb - adata.wdata = CompletionData(fd: fd, udata: udata) newEvents.incl(Event.Write) if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) @@ -624,7 +610,6 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We need to clear `writer` data, because `selectors` don't do it adata.writer = default(AsyncCallback) - # adata.wdata = CompletionData() if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: @@ -692,9 +677,7 @@ elif unixPlatform: var data: SelectorData result = loop.selector.registerSignal(signal, data) withData(loop.selector, result, adata) do: - adata.reader = AsyncCallback(function: cb, udata: addr adata.rdata) - adata.rdata.fd = AsyncFD(result) - adata.rdata.udata = udata + adata.reader = AsyncCallback(function: cb, udata: udata) do: raise newException(ValueError, "File descriptor not registered.") diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index 23fabba..f863e66 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -185,7 +185,7 @@ elif defined(windows): else: ## Initiation transp.state.incl(WritePending) - let fd = SocketHandle(ovl.data.fd) + let fd = SocketHandle(transp.fd) var vector = transp.queue.popFirst() transp.setWriterWSABuffer(vector) var ret: cint @@ -258,7 +258,7 @@ elif defined(windows): ## Initiation if transp.state * {ReadEof, ReadClosed, ReadError} == {}: transp.state.incl(ReadPending) - let fd = SocketHandle(ovl.data.fd) + let fd = SocketHandle(transp.fd) transp.rflag = 0 transp.ralen = SockLen(sizeof(Sockaddr_storage)) let ret = WSARecvFrom(fd, addr transp.rwsabuf, DWORD(1), @@ -406,9 +406,9 @@ elif defined(windows): result.udata = udata result.state = {WritePaused} result.future = newFuture[void]("datagram.transport") - result.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop, + result.rovl.data = CompletionData(cb: readDatagramLoop, udata: cast[pointer](result)) - result.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop, + result.wovl.data = CompletionData(cb: writeDatagramLoop, udata: cast[pointer](result)) result.rwsabuf = TWSABuf(buf: cast[cstring](addr result.buffer[0]), len: int32(len(result.buffer))) @@ -426,9 +426,8 @@ else: proc readDatagramLoop(udata: pointer) {.raises: Defect.}= var raddr: TransportAddress doAssert(not isNil(udata)) - var cdata = cast[ptr CompletionData](udata) - var transp = cast[DatagramTransport](cdata.udata) - let fd = SocketHandle(cdata.fd) + let transp = cast[DatagramTransport](udata) + let fd = SocketHandle(transp.fd) if int(fd) == 0: ## This situation can be happen, when there events present ## after transport was closed. @@ -459,9 +458,8 @@ else: proc writeDatagramLoop(udata: pointer) = var res: int doAssert(not isNil(udata)) - var cdata = cast[ptr CompletionData](udata) - var transp = cast[DatagramTransport](cdata.udata) - let fd = SocketHandle(cdata.fd) + var transp = cast[DatagramTransport](udata) + let fd = SocketHandle(transp.fd) if int(fd) == 0: ## This situation can be happen, when there events present ## after transport was closed. diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index a621cdb..90512c8 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -407,7 +407,7 @@ elif defined(windows): ## Initiation transp.state.incl(WritePending) if transp.kind == TransportKind.Socket: - let sock = SocketHandle(transp.wovl.data.fd) + let sock = SocketHandle(transp.fd) var vector = transp.queue.popFirst() if vector.kind == VectorKind.DataBuffer: transp.wovl.zeroOvelappedOffset() @@ -492,7 +492,7 @@ elif defined(windows): else: transp.queue.addFirst(vector) elif transp.kind == TransportKind.Pipe: - let pipe = Handle(transp.wovl.data.fd) + let pipe = Handle(transp.fd) var vector = transp.queue.popFirst() if vector.kind == VectorKind.DataBuffer: transp.wovl.zeroOvelappedOffset() @@ -587,7 +587,7 @@ elif defined(windows): transp.state.excl(ReadPaused) transp.state.incl(ReadPending) if transp.kind == TransportKind.Socket: - let sock = SocketHandle(transp.rovl.data.fd) + let sock = SocketHandle(transp.fd) transp.roffset = transp.offset transp.setReaderWSABuffer() let ret = WSARecv(sock, addr transp.rwsabuf, 1, @@ -610,7 +610,7 @@ elif defined(windows): transp.setReadError(err) transp.completeReader() elif transp.kind == TransportKind.Pipe: - let pipe = Handle(transp.rovl.data.fd) + let pipe = Handle(transp.fd) transp.roffset = transp.offset transp.setReaderWSABuffer() let ret = readFile(pipe, cast[pointer](transp.rwsabuf.buf), @@ -650,9 +650,9 @@ elif defined(windows): else: transp = StreamTransport(kind: TransportKind.Socket) transp.fd = sock - transp.rovl.data = CompletionData(fd: sock, cb: readStreamLoop, + transp.rovl.data = CompletionData(cb: readStreamLoop, udata: cast[pointer](transp)) - transp.wovl.data = CompletionData(fd: sock, cb: writeStreamLoop, + transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) transp.buffer = newSeq[byte](bufsize) transp.state = {ReadPaused, WritePaused} @@ -670,9 +670,9 @@ elif defined(windows): else: transp = StreamTransport(kind: TransportKind.Pipe) transp.fd = fd - transp.rovl.data = CompletionData(fd: fd, cb: readStreamLoop, + transp.rovl.data = CompletionData(cb: readStreamLoop, udata: cast[pointer](transp)) - transp.wovl.data = CompletionData(fd: fd, cb: writeStreamLoop, + transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) transp.buffer = newSeq[byte](bufsize) transp.flags = flags @@ -746,8 +746,7 @@ elif defined(windows): sock.closeSocket() retFuture.fail(getTransportOsError(err)) else: - let transp = newStreamSocketTransport(povl.data.fd, bufferSize, - child) + let transp = newStreamSocketTransport(sock, bufferSize, child) # Start tracking transport trackStream(transp) retFuture.complete(transp) @@ -761,7 +760,7 @@ elif defined(windows): povl = RefCustomOverlapped() GC_ref(povl) - povl.data = CompletionData(fd: sock, cb: socketContinuation) + povl.data = CompletionData(cb: socketContinuation) let res = loop.connectEx(SocketHandle(sock), cast[ptr SockAddr](addr saddr), DWORD(slen), nil, 0, nil, @@ -895,7 +894,6 @@ elif defined(windows): if pipeHandle == INVALID_HANDLE_VALUE: raiseAssert osErrorMsg(osLastError()) server.sock = AsyncFD(pipeHandle) - server.aovl.data.fd = AsyncFD(pipeHandle) try: register(server.sock) except CatchableError as exc: raiseAsDefect exc, "register" @@ -1177,8 +1175,7 @@ elif defined(windows): let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) - server.aovl.data = CompletionData(fd: server.sock, - cb: continuationSocket, + server.aovl.data = CompletionData(cb: continuationSocket, udata: cast[pointer](server)) server.apending = true let res = loop.acceptEx(SocketHandle(server.sock), @@ -1219,8 +1216,7 @@ elif defined(windows): retFuture.fail(getTransportOsError(err)) return retFuture - server.aovl.data = CompletionData(fd: server.sock, - cb: continuationPipe, + server.aovl.data = CompletionData(cb: continuationPipe, udata: cast[pointer](server)) server.apending = true let res = connectNamedPipe(Handle(server.sock), @@ -1260,15 +1256,17 @@ else: raiseAsDefect exc, "removeWriter" proc writeStreamLoop(udata: pointer) = - var cdata = cast[ptr CompletionData](udata) - var transp = cast[StreamTransport](cdata.udata) - let fd = SocketHandle(cdata.fd) - - if int(fd) == 0 or isNil(transp): - ## This situation can be happen, when there events present - ## after transport was closed. + if isNil(udata): + # TODO this is an if rather than an assert for historical reasons: + # it should not happen unless there are race conditions - but if there + # are race conditions, `transp` might be invalid even if it's not nil: + # it could have been released return + let + transp = cast[StreamTransport](udata) + fd = SocketHandle(transp.fd) + if WriteClosed in transp.state: if transp.queue.len > 0: transp.removeWriter() @@ -1357,15 +1355,17 @@ else: transp.removeWriter() proc readStreamLoop(udata: pointer) = - # TODO fix Defect raises - they "shouldn't" happen - var cdata = cast[ptr CompletionData](udata) - var transp = cast[StreamTransport](cdata.udata) - let fd = SocketHandle(cdata.fd) - if int(fd) == 0 or isNil(transp): - ## This situation can be happen, when there events present - ## after transport was closed. + if isNil(udata): + # TODO this is an if rather than an assert for historical reasons: + # it should not happen unless there are race conditions - but if there + # are race conditions, `transp` might be invalid even if it's not nil: + # it could have been released return + let + transp = cast[StreamTransport](udata) + fd = SocketHandle(transp.fd) + if ReadClosed in transp.state: transp.state.incl({ReadPaused}) transp.completeReader() @@ -1381,7 +1381,7 @@ else: elif int(err) in {ECONNRESET}: transp.state.incl({ReadEof, ReadPaused}) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1390,7 +1390,7 @@ else: transp.state.incl(ReadPaused) transp.setReadError(err) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1398,7 +1398,7 @@ else: elif res == 0: transp.state.incl({ReadEof, ReadPaused}) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1408,7 +1408,7 @@ else: if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1427,7 +1427,7 @@ else: transp.state.incl(ReadPaused) transp.setReadError(err) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1435,7 +1435,7 @@ else: elif res == 0: transp.state.incl({ReadEof, ReadPaused}) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1445,7 +1445,7 @@ else: if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) try: - cdata.fd.removeReader() + transp.fd.removeReader() except IOSelectorsException as exc: raiseAsDefect exc, "removeReader" except ValueError as exc: @@ -1519,11 +1519,9 @@ else: proc continuation(udata: pointer) = if not(retFuture.finished()): - var data = cast[ptr CompletionData](udata) var err = 0 - let fd = data.fd try: - fd.removeWriter() + sock.removeWriter() except IOSelectorsException as exc: retFuture.fail(exc) return @@ -1531,15 +1529,15 @@ else: retFuture.fail(exc) return - if not(fd.getSocketError(err)): - closeSocket(fd) + if not(sock.getSocketError(err)): + closeSocket(sock) retFuture.fail(getTransportOsError(osLastError())) return if err != 0: - closeSocket(fd) + closeSocket(sock) retFuture.fail(getTransportOsError(OSErrorCode(err))) return - let transp = newStreamSocketTransport(fd, bufferSize, child) + let transp = newStreamSocketTransport(sock, bufferSize, child) # Start tracking transport trackStream(transp) retFuture.complete(transp) @@ -1581,11 +1579,18 @@ else: break return retFuture - proc acceptLoop(udata: pointer) {.gcsafe.} = + proc acceptLoop(udata: pointer) = + if isNil(udata): + # TODO this is an if rather than an assert for historical reasons: + # it should not happen unless there are race conditions - but if there + # are race conditions, `transp` might be invalid even if it's not nil: + # it could have been released + return + var saddr: Sockaddr_storage slen: SockLen - var server = cast[StreamServer](cast[ptr CompletionData](udata).udata) + let server = cast[StreamServer](udata) while true: if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: break @@ -1990,7 +1995,7 @@ proc createStreamServer*(host: TransportAddress, cb = acceptPipeLoop if not(isNil(cbproc)): - result.aovl.data = CompletionData(fd: serverSocket, cb: cb, + result.aovl.data = CompletionData(cb: cb, udata: cast[pointer](result)) else: if host.family == AddressFamily.Unix: diff --git a/tests/testsignal.nim b/tests/testsignal.nim index ee32f8a..0d5377c 100644 --- a/tests/testsignal.nim +++ b/tests/testsignal.nim @@ -15,13 +15,14 @@ when not defined(windows): suite "Signal handling test suite": when not defined(windows): - var signalCounter = 0 + var + signalCounter = 0 + sigfd = -1 proc signalProc(udata: pointer) = - var cdata = cast[ptr CompletionData](udata) - signalCounter = cast[int](cdata.udata) + signalCounter = cast[int](udata) try: - removeSignal(int(cdata.fd)) + removeSignal(sigfd) except Exception as exc: raiseAssert exc.msg @@ -30,7 +31,7 @@ suite "Signal handling test suite": proc test(signal, value: int): bool = try: - discard addSignal(signal, signalProc, cast[pointer](value)) + sigfd = addSignal(signal, signalProc, cast[pointer](value)) except Exception as exc: raiseAssert exc.msg var fut = asyncProc()