diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index a5f9788..ef96416 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -1065,60 +1065,69 @@ when defined(windows): return retFuture proc continuationSocket(udata: pointer) {.gcsafe.} = - var ovl = cast[PtrCustomOverlapped](udata) - var server = cast[StreamServer](ovl.data.udata) + if retFuture.finished(): + # `retFuture` could become finished in 2 cases: + # 1. OS sends IOCP notification about failure, but we already failed + # `retFuture` with proper error. + # 2. `accept()` call has been cancelled. Cancellation callback closed + # accepting socket, so OS sends IOCP notification with an + # `ERROR_OPERATION_ABORTED` error. + return + var + ovl = cast[PtrCustomOverlapped](udata) + server = cast[StreamServer](ovl.data.udata) server.apending = false - if not(retFuture.finished()): - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: - retFuture.fail(getServerUseClosedError()) + + if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: + retFuture.fail(getServerUseClosedError()) + server.asock.closeSocket() + server.clean() + else: + case ovl.data.errCode + of OSErrorCode(-1): + if setsockopt(SocketHandle(server.asock), cint(osdefs.SOL_SOCKET), + cint(osdefs.SO_UPDATE_ACCEPT_CONTEXT), + addr server.sock, + SockLen(sizeof(SocketHandle))) != 0'i32: + let err = osLastError() + server.asock.closeSocket() + if err == osdefs.WSAENOTSOCK: + # This can be happened when server get closed, but continuation + # was already scheduled, so we failing it not with OS error. + retFuture.fail(getServerUseClosedError()) + else: + let errorMsg = osErrorMsg(err) + retFuture.fail(getConnectionAbortedError(errorMsg)) + else: + var ntransp: StreamTransport + if not(isNil(server.init)): + let transp = server.init(server, server.asock) + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, + transp) + else: + ntransp = newStreamSocketTransport(server.asock, + server.bufferSize, nil) + # Start tracking transport + trackStream(ntransp) + retFuture.complete(ntransp) + of OSErrorCode(osdefs.ERROR_OPERATION_ABORTED): + # CancelIO() interrupt or close. server.asock.closeSocket() + retFuture.fail(getServerUseClosedError()) + server.clean() + of OSErrorCode(osdefs.WSAENETDOWN), + OSErrorCode(osdefs.WSAENETRESET), + OSErrorCode(osdefs.WSAECONNABORTED), + OSErrorCode(osdefs.WSAECONNRESET), + OSErrorCode(osdefs.WSAETIMEDOUT): + server.asock.closeSocket() + retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode))) server.clean() else: - case ovl.data.errCode - of OSErrorCode(-1): - if setsockopt(SocketHandle(server.asock), cint(osdefs.SOL_SOCKET), - cint(osdefs.SO_UPDATE_ACCEPT_CONTEXT), - addr server.sock, - SockLen(sizeof(SocketHandle))) != 0'i32: - let err = osLastError() - server.asock.closeSocket() - if err == osdefs.WSAENOTSOCK: - # This can be happened when server get closed, but continuation - # was already scheduled, so we failing it not with OS error. - retFuture.fail(getServerUseClosedError()) - else: - let errorMsg = osErrorMsg(err) - retFuture.fail(getConnectionAbortedError(errorMsg)) - else: - var ntransp: StreamTransport - if not(isNil(server.init)): - let transp = server.init(server, server.asock) - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, - transp) - else: - ntransp = newStreamSocketTransport(server.asock, - server.bufferSize, nil) - # Start tracking transport - trackStream(ntransp) - retFuture.complete(ntransp) - of OSErrorCode(osdefs.ERROR_OPERATION_ABORTED): - # CancelIO() interrupt or close. - server.asock.closeSocket() - retFuture.fail(getServerUseClosedError()) - server.clean() - of OSErrorCode(osdefs.WSAENETDOWN), - OSErrorCode(osdefs.WSAENETRESET), - OSErrorCode(osdefs.WSAECONNABORTED), - OSErrorCode(osdefs.WSAECONNRESET), - OSErrorCode(osdefs.WSAETIMEDOUT): - server.asock.closeSocket() - retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode))) - server.clean() - else: - server.asock.closeSocket() - retFuture.fail(getTransportOsError(ovl.data.errCode)) + server.asock.closeSocket() + retFuture.fail(getTransportOsError(ovl.data.errCode)) proc cancellationSocket(udata: pointer) {.gcsafe.} = if server.apending: @@ -1126,50 +1135,59 @@ when defined(windows): server.asock.closeSocket() proc continuationPipe(udata: pointer) {.gcsafe.} = - var ovl = cast[PtrCustomOverlapped](udata) - var server = cast[StreamServer](ovl.data.udata) + if retFuture.finished(): + # `retFuture` could become finished in 2 cases: + # 1. OS sends IOCP notification about failure, but we already failed + # `retFuture` with proper error. + # 2. `accept()` call has been cancelled. Cancellation callback closed + # accepting socket, so OS sends IOCP notification with an + # `ERROR_OPERATION_ABORTED` error. + return + var + ovl = cast[PtrCustomOverlapped](udata) + server = cast[StreamServer](ovl.data.udata) server.apending = false - if not(retFuture.finished()): - if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: + + if server.status in {ServerStatus.Stopped, ServerStatus.Closed}: + retFuture.fail(getServerUseClosedError()) + server.sock.closeHandle() + server.clean() + else: + if ovl.data.errCode == OSErrorCode(-1): + var ntransp: StreamTransport + var flags = {WinServerPipe} + if NoPipeFlash in server.flags: + flags.incl(WinNoPipeFlash) + if not(isNil(server.init)): + var transp = server.init(server, server.sock) + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + transp, flags) + else: + ntransp = newStreamPipeTransport(server.sock, server.bufferSize, + nil, flags) + server.sock = server.createAcceptPipe().valueOr: + server.sock = asyncInvalidSocket + server.errorCode = error + retFuture.fail(getTransportOsError(error)) + return + + trackStream(ntransp) + retFuture.complete(ntransp) + + elif int32(ovl.data.errCode) in {osdefs.ERROR_OPERATION_ABORTED, + osdefs.ERROR_PIPE_NOT_CONNECTED}: + # CancelIO() interrupt or close call. retFuture.fail(getServerUseClosedError()) - server.sock.closeHandle() server.clean() else: - if ovl.data.errCode == OSErrorCode(-1): - var ntransp: StreamTransport - var flags = {WinServerPipe} - if NoPipeFlash in server.flags: - flags.incl(WinNoPipeFlash) - if not(isNil(server.init)): - var transp = server.init(server, server.sock) - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - transp, flags) - else: - ntransp = newStreamPipeTransport(server.sock, server.bufferSize, - nil, flags) - server.sock = server.createAcceptPipe().valueOr: - server.sock = asyncInvalidSocket - server.errorCode = error - retFuture.fail(getTransportOsError(error)) - return - - trackStream(ntransp) - retFuture.complete(ntransp) - - elif int32(ovl.data.errCode) in {osdefs.ERROR_OPERATION_ABORTED, - osdefs.ERROR_PIPE_NOT_CONNECTED}: - # CancelIO() interrupt or close call. - retFuture.fail(getServerUseClosedError()) - server.clean() - else: - discard closeHandle(HANDLE(server.sock)) - server.sock = server.createAcceptPipe().valueOr: - server.sock = asyncInvalidSocket - server.errorCode = error - retFuture.fail(getTransportOsError(error)) - return - retFuture.fail(getTransportOsError(ovl.data.errCode)) + discard closeHandle(HANDLE(server.sock)) + server.sock = server.createAcceptPipe().valueOr: + server.sock = asyncInvalidSocket + server.errorCode = error + retFuture.fail(getTransportOsError(error)) + return + retFuture.fail(getTransportOsError(ovl.data.errCode)) proc cancellationPipe(udata: pointer) {.gcsafe.} = if server.apending: