From 14ebf269e9322de5a7f1fa455033b0bcf18144c6 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 30 Jun 2021 18:22:37 +0300 Subject: [PATCH] Fix accept defect (#206) * Fix accept() FutureDefect on cancellation. * Do not close pipe twice. * Check for retFuture state before changing it state. --- chronos/transports/stream.nim | 238 ++++++++++++++++++---------------- 1 file changed, 123 insertions(+), 115 deletions(-) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 87096da..e752a39 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -1032,44 +1032,45 @@ when defined(windows): var server = cast[StreamServer](ovl.data.udata) server.apending = false - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - server.asock.closeSocket() - retFuture.fail(getServerUseClosedError()) - server.clean() - else: - if ovl.data.errCode == OSErrorCode(-1): - if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET), - cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock, - SockLen(sizeof(SocketHandle))) != 0'i32: - let err = OSErrorCode(wsaGetLastError()) - server.asock.closeSocket() - if int32(err) == WSAENOTSOCK: - # This can be happened when server get closed, but continuation - # was already scheduled, so we failing it not with OS error. - retFuture.fail(getServerUseClosedError()) - else: - retFuture.fail(getTransportOsError(err)) - else: - var ntransp: StreamTransport - if not isNil(server.init): - let transp = server.init(server, server.asock) - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, - transp) - else: - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, nil) - # Start tracking transport - trackStream(ntransp) - retFuture.complete(ntransp) - elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt or close. - server.asock.closeSocket() + if not(retFuture.finished()): + if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: retFuture.fail(getServerUseClosedError()) + server.asock.closeSocket() server.clean() else: - server.asock.closeSocket() - retFuture.fail(getTransportOsError(ovl.data.errCode)) + if ovl.data.errCode == OSErrorCode(-1): + if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET), + cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock, + SockLen(sizeof(SocketHandle))) != 0'i32: + let err = OSErrorCode(wsaGetLastError()) + server.asock.closeSocket() + if int32(err) == WSAENOTSOCK: + # This can be happened when server get closed, but continuation + # was already scheduled, so we failing it not with OS error. + retFuture.fail(getServerUseClosedError()) + else: + retFuture.fail(getTransportOsError(err)) + else: + var ntransp: StreamTransport + if not isNil(server.init): + let transp = server.init(server, server.asock) + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, + transp) + else: + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, nil) + # Start tracking transport + trackStream(ntransp) + retFuture.complete(ntransp) + elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt or close. + server.asock.closeSocket() + retFuture.fail(getServerUseClosedError()) + server.clean() + else: + server.asock.closeSocket() + retFuture.fail(getTransportOsError(ovl.data.errCode)) proc cancellationSocket(udata: pointer) {.gcsafe.} = server.asock.closeSocket() @@ -1079,48 +1080,50 @@ when defined(windows): var server = cast[StreamServer](ovl.data.udata) server.apending = false - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - retFuture.fail(getServerUseClosedError()) - server.clean() - else: - if ovl.data.errCode == OSErrorCode(-1): - var ntransp: StreamTransport - var flags = {WinServerPipe} - if NoPipeFlash in server.flags: - flags.incl(WinNoPipeFlash) - if not isNil(server.init): - var transp = server.init(server, server.sock) - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - transp, flags) - else: - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - nil, flags) - # Start tracking transport - try: - server.createAcceptPipe() - except CatchableError as exc: - closeHandle(server.sock) - retFuture.fail(exc) - return - trackStream(ntransp) - retFuture.complete(ntransp) - - elif int32(ovl.data.errCode) in {ERROR_OPERATION_ABORTED, - ERROR_PIPE_NOT_CONNECTED}: - # CancelIO() interrupt or close call. + if not(retFuture.finished()): + if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: retFuture.fail(getServerUseClosedError()) + server.sock.closeHandle() server.clean() else: - let sock = server.sock - try: - server.createAcceptPipe() - except CatchableError as exc: - closeHandle(sock) - retFuture.fail(exc) - return - closeHandle(sock) + if ovl.data.errCode == OSErrorCode(-1): + var ntransp: StreamTransport + var flags = {WinServerPipe} + if NoPipeFlash in server.flags: + flags.incl(WinNoPipeFlash) + if not isNil(server.init): + var transp = server.init(server, server.sock) + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + transp, flags) + else: + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + nil, flags) + # Start tracking transport + try: + server.createAcceptPipe() + except CatchableError as exc: + closeHandle(server.sock) + retFuture.fail(exc) + return + trackStream(ntransp) + retFuture.complete(ntransp) - retFuture.fail(getTransportOsError(ovl.data.errCode)) + elif int32(ovl.data.errCode) in {ERROR_OPERATION_ABORTED, + ERROR_PIPE_NOT_CONNECTED}: + # CancelIO() interrupt or close call. + retFuture.fail(getServerUseClosedError()) + server.clean() + else: + let sock = server.sock + try: + server.createAcceptPipe() + except CatchableError as exc: + closeHandle(sock) + retFuture.fail(exc) + return + closeHandle(sock) + + retFuture.fail(getTransportOsError(ovl.data.errCode)) proc cancellationPipe(udata: pointer) {.gcsafe.} = server.sock.closeHandle() @@ -1698,51 +1701,56 @@ else: saddr: Sockaddr_storage slen: SockLen - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - retFuture.fail(getServerUseClosedError()) - else: - while true: - let res = posix.accept(SocketHandle(server.sock), - cast[ptr SockAddr](addr saddr), addr slen) - if int(res) > 0: - let sock = try: wrapAsyncSocket(res) - except CatchableError as exc: - close(res) - retFuture.fail(exc) - return + if not(retFuture.finished()): + if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: + retFuture.fail(getServerUseClosedError()) + else: + while true: + let res = posix.accept(SocketHandle(server.sock), + cast[ptr SockAddr](addr saddr), addr slen) + if int(res) > 0: + let sock = + try: + wrapAsyncSocket(res) + except CatchableError as exc: + close(res) + retFuture.fail(exc) + return - if sock != asyncInvalidSocket: - var ntransp: StreamTransport - if not isNil(server.init): - let transp = server.init(server, sock) - ntransp = newStreamSocketTransport(sock, server.bufferSize, - transp) + if sock != asyncInvalidSocket: + var ntransp: StreamTransport + if not isNil(server.init): + let transp = server.init(server, sock) + ntransp = newStreamSocketTransport(sock, server.bufferSize, + transp) + else: + ntransp = newStreamSocketTransport(sock, server.bufferSize, + nil) + # Start tracking transport + trackStream(ntransp) + retFuture.complete(ntransp) else: - ntransp = newStreamSocketTransport(sock, server.bufferSize, nil) - # Start tracking transport - trackStream(ntransp) - retFuture.complete(ntransp) + retFuture.fail(getTransportOsError(osLastError())) else: - retFuture.fail(getTransportOsError(osLastError())) - else: - let err = osLastError() - if int(err) == EINTR: - continue - elif int(err) == EAGAIN: - # This error appears only when server get closed, while accept() - # continuation is already scheduled. - retFuture.fail(getServerUseClosedError()) - elif int(err) == EMFILE: - retFuture.fail(getTransportTooManyError()) - else: - retFuture.fail(getTransportOsError(err)) - break - try: - removeReader(server.sock) - except IOSelectorsException as exc: - raiseAsDefect exc, "removeReader" - except ValueError as exc: - raiseAsDefect exc, "removeReader" + let err = osLastError() + if int(err) == EINTR: + continue + elif int(err) == EAGAIN: + # This error appears only when server get closed, while accept() + # continuation is already scheduled. + retFuture.fail(getServerUseClosedError()) + elif int(err) == EMFILE: + retFuture.fail(getTransportTooManyError()) + else: + retFuture.fail(getTransportOsError(err)) + break + + try: + removeReader(server.sock) + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" proc cancellation(udata: pointer) = try: