From 0c7a0bd0a5f9106612b841c5d9c20cd6776b4dde Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 12 Nov 2021 18:13:56 +0200 Subject: [PATCH] Handle accept() errors properly. (#234) * Add more accept() call error handlers. Fix issue when HTTP server silently stops accepting new connections. Remove unneeded MacOS syscall to disable SIG_PIPE on socket, we already mask this signal in process. * User `if` instead `case` because constants are actually variables. * Fix mistypes. * Do not use case for posix constants. * Fix Linux compilation error. --- chronos/apps/http/httpserver.nim | 30 ++++++++---- chronos/handles.nim | 8 ---- chronos/transports/common.nim | 81 ++++++++++++++++++++++++++++++-- chronos/transports/datagram.nim | 2 +- chronos/transports/stream.nim | 72 ++++++++++++++++++---------- 5 files changed, 147 insertions(+), 46 deletions(-) diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index cce74b6..0837958 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -246,9 +246,12 @@ proc dumbResponse*(): HttpResponseRef {.raises: [Defect].} = ## Create an empty response to return when request processor got no request. HttpResponseRef(state: HttpResponseState.Dumb, version: HttpVersion11) -proc getId(transp: StreamTransport): string {.inline.} = +proc getId(transp: StreamTransport): Result[string, string] {.inline.} = ## Returns string unique transport's identifier as string. - $transp.remoteAddress() & "_" & $transp.localAddress() + try: + ok($transp.remoteAddress() & "_" & $transp.localAddress()) + except TransportOsError as exc: + err($exc.msg) proc hasBody*(request: HttpRequestRef): bool {.raises: [Defect].} = ## Returns ``true`` if request has body. @@ -665,7 +668,8 @@ proc `keepalive=`*(resp: HttpResponseRef, value: bool) = proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} = HttpResponseFlags.KeepAlive in resp.flags -proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = +proc processLoop(server: HttpServerRef, transp: StreamTransport, + connId: string) {.async.} = var conn: HttpConnectionRef connArg: RequestFence @@ -675,7 +679,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = conn = await server.createConnCallback(server, transp) runLoop = true except CancelledError: - server.connections.del(transp.getId()) + server.connections.del(connId) await transp.closeWait() return except HttpCriticalError as exc: @@ -822,7 +826,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = # need to close it. await conn.closeWait() - server.connections.del(transp.getId()) + server.connections.del(connId) # if server.maxConnections > 0: # server.semaphore.release() @@ -832,10 +836,16 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} = try: # if server.maxConnections > 0: # await server.semaphore.acquire() - let transp = await server.instance.accept() - server.connections[transp.getId()] = processLoop(server, transp) - + let resId = transp.getId() + if resId.isErr(): + # We are unable to identify remote peer, it means that remote peer + # disconnected before identification. + await transp.closeWait() + breakLoop = false + else: + let connId = resId.get() + server.connections[connId] = processLoop(server, transp, connId) except CancelledError: # Server was stopped breakLoop = true @@ -845,10 +855,12 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} = except TransportTooManyError: # Non critical error breakLoop = false + except TransportAbortedError: + # Non critical error + breakLoop = false except CatchableError: # Unexpected error breakLoop = true - discard if breakLoop: break diff --git a/chronos/handles.nim b/chronos/handles.nim index 0cffee9..95b77b8 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -105,10 +105,6 @@ proc createAsyncSocket*(domain: Domain, sockType: SockType, if not setSocketBlocking(handle, false): close(handle) return asyncInvalidSocket - when defined(macosx) and not defined(nimdoc): - if not setSockOpt(AsyncFD(handle), SOL_SOCKET, SO_NOSIGPIPE, 1): - close(handle) - return asyncInvalidSocket register(AsyncFD(handle)) AsyncFD(handle) @@ -119,10 +115,6 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {. if not setSocketBlocking(sock, false): close(sock) return asyncInvalidSocket - when defined(macosx) and not defined(nimdoc): - if not setSockOpt(AsyncFD(sock), SOL_SOCKET, SO_NOSIGPIPE, 1): - close(sock) - return asyncInvalidSocket register(AsyncFD(sock)) AsyncFD(sock) diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index b6a4d12..633dc46 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -108,6 +108,8 @@ type ## Usage after transport close exception TransportTooManyError* = object of TransportError ## Too many open file descriptors exception + TransportAbortedError* = object of TransportError + ## Remote client disconnected before server accepts connection TransportState* = enum ## Transport's state @@ -557,9 +559,6 @@ template getError*(t: untyped): ref CatchableError = template getServerUseClosedError*(): ref TransportUseClosedError = newException(TransportUseClosedError, "Server is already closed!") -template getTransportTooManyError*(): ref TransportTooManyError = - newException(TransportTooManyError, "Too many open transports!") - template getTransportUseClosedError*(): ref TransportUseClosedError = newException(TransportUseClosedError, "Transport is already closed!") @@ -613,6 +612,13 @@ when defined(windows): ERROR_NO_DATA* = 232 ERROR_CONNECTION_ABORTED* = 1236 ERROR_TOO_MANY_OPEN_FILES* = 4 + WSAEMFILE* = 10024 + WSAENETDOWN* = 10050 + WSAENETRESET* = 10052 + WSAECONNABORTED* = 10053 + WSAECONNRESET* = 10054 + WSAENOBUFS* = 10055 + WSAETIMEDOUT* = 10060 proc cancelIo*(hFile: HANDLE): WINBOOL {.stdcall, dynlib: "kernel32", importc: "CancelIo".} @@ -625,3 +631,72 @@ when defined(windows): {.stdcall, dynlib: "kernel32", importc: "SetNamedPipeHandleState".} proc resetEvent*(hEvent: HANDLE): WINBOOL {.stdcall, dynlib: "kernel32", importc: "ResetEvent".} + +template getTransportTooManyError*(code: int = 0): ref TransportTooManyError = + let msg = + when defined(posix): + if code == 0: + "Too many open transports" + elif code == EMFILE: + "[EMFILE] Too many open files in the process" + elif code == ENFILE: + "[ENFILE] Too many open files in system" + elif code == ENOBUFS: + "[ENOBUFS] No buffer space available" + elif code == ENOMEM: + "[ENOMEM] Not enough memory availble" + else: + "[" & $code & "] Too many open transports" + elif defined(windows): + case code + of 0: + "Too many open transports" + of ERROR_TOO_MANY_OPEN_FILES: + "[ERROR_TOO_MANY_OPEN_FILES] Too many open files" + of WSAENOBUFS: + "[WSAENOBUFS] No buffer space available" + of WSAEMFILE: + "[WSAEMFILE] Too many open sockets" + else: + "[" & $code & "] Too many open transports" + else: + "[" & $code & "] Too many open transports" + newException(TransportTooManyError, msg) + +template getConnectionAbortedError*(m: string = ""): ref TransportAbortedError = + let msg = + if len(m) == 0: + "[ECONNABORTED] Connection has been aborted before being accepted" + else: + "[ECONNABORTED] " & m + newException(TransportAbortedError, msg) + +template getConnectionAbortedError*(code: int): ref TransportAbortedError = + let msg = + when defined(posix): + if code == 0: + "[ECONNABORTED] Connection has been aborted before being accepted" + elif code == EPERM: + "[EPERM] Firewall rules forbid connection" + elif code == ETIMEDOUT: + "[ETIMEDOUT] Operation has been timed out" + else: + "[" & $code & "] Connection has been aborted" + elif defined(windows): + case code + of 0, WSAECONNABORTED: + "[ECONNABORTED] Connection has been aborted before being accepted" + of WSAENETDOWN: + "[ENETDOWN] Network is down" + of WSAENETRESET: + "[ENETRESET] Network dropped connection on reset" + of WSAECONNRESET: + "[ECONNRESET] Connection reset by peer" + of WSAETIMEDOUT: + "[ETIMEDOUT] Connection timed out" + else: + "[" & $code & "] Connection has been aborted" + else: + "[" & $code & "] Connection has been aborted" + + newException(TransportAbortedError, msg) diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index b8c3610..57be22f 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -254,7 +254,7 @@ when defined(windows): # CancelIO() interrupt transp.state.excl(ReadPending) transp.state.incl(ReadPaused) - elif int(err) == WSAECONNRESET: + elif int(err) == common.WSAECONNRESET: transp.state.excl(ReadPending) transp.state.incl({ReadPaused, ReadEof}) break diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 81923ff..7e7793e 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -318,9 +318,9 @@ when defined(windows): (t).wwsabuf.len = cast[int32](v.buflen) proc isConnResetError(err: OSErrorCode): bool {.inline.} = - result = (err == OSErrorCode(WSAECONNRESET)) or - (err == OSErrorCode(WSAECONNABORTED)) or - (err == OSErrorCode(ERROR_PIPE_NOT_CONNECTED)) + result = (err == OSErrorCode(common.WSAECONNRESET)) or + (err == OSErrorCode(common.WSAECONNABORTED)) or + (err == OSErrorCode(common.ERROR_PIPE_NOT_CONNECTED)) proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} = var bytesCount: int32 @@ -552,7 +552,7 @@ when defined(windows): # CancelIO() interrupt or closeSocket() call. transp.state.incl(ReadPaused) elif transp.kind == TransportKind.Socket and - (int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}): + (int(err) in {ERROR_NETNAME_DELETED, common.WSAECONNABORTED}): transp.state.incl({ReadEof, ReadPaused}) elif transp.kind == TransportKind.Pipe and (int(err) in {ERROR_PIPE_NOT_CONNECTED}): @@ -589,7 +589,8 @@ when defined(windows): # CancelIO() interrupt transp.state.excl(ReadPending) transp.state.incl(ReadPaused) - elif int32(err) in {WSAECONNRESET, WSAENETRESET, WSAECONNABORTED}: + elif int32(err) in {common.WSAECONNRESET, common.WSAENETRESET, + common.WSAECONNABORTED}: transp.state.excl(ReadPending) transp.state.incl({ReadEof, ReadPaused}) transp.completeReader() @@ -1038,7 +1039,8 @@ when defined(windows): server.asock.closeSocket() server.clean() else: - if ovl.data.errCode == OSErrorCode(-1): + case ovl.data.errCode + of OSErrorCode(-1): if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET), cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock, SockLen(sizeof(SocketHandle))) != 0'i32: @@ -1049,7 +1051,8 @@ when defined(windows): # was already scheduled, so we failing it not with OS error. retFuture.fail(getServerUseClosedError()) else: - retFuture.fail(getTransportOsError(err)) + let errorMsg = osErrorMsg(err) + retFuture.fail(getConnectionAbortedError(errorMsg)) else: var ntransp: StreamTransport if not(isNil(server.init)): @@ -1063,11 +1066,18 @@ when defined(windows): # Start tracking transport trackStream(ntransp) retFuture.complete(ntransp) - elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED: + of OSErrorCode(ERROR_OPERATION_ABORTED): # CancelIO() interrupt or close. server.asock.closeSocket() retFuture.fail(getServerUseClosedError()) server.clean() + of OsErrorCode(common.WSAENETDOWN), OSErrorCode(common.WSAENETRESET), + OSErrorCode(common.WSAECONNABORTED), + OSErrorCode(common.WSAECONNRESET), + OSErrorCode(common.WSAETIMEDOUT): + server.asock.closeSocket() + retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode))) + server.clean() else: server.asock.closeSocket() retFuture.fail(getTransportOsError(ovl.data.errCode)) @@ -1135,16 +1145,19 @@ when defined(windows): if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}: # TCP Sockets part var loop = getThreadDispatcher() - server.asock = try: createAsyncSocket(server.domain, SockType.SOCK_STREAM, - Protocol.IPPROTO_TCP) - except CatchableError as exc: - retFuture.fail(exc) - return retFuture + server.asock = + try: + createAsyncSocket(server.domain, SockType.SOCK_STREAM, + Protocol.IPPROTO_TCP) + except CatchableError as exc: + retFuture.fail(exc) + return retFuture if server.asock == asyncInvalidSocket: let err = osLastError() - if int32(err) == ERROR_TOO_MANY_OPEN_FILES: - retFuture.fail(getTransportTooManyError()) + case int(err) + of ERROR_TOO_MANY_OPEN_FILES, WSAENOBUFS, WSAEMFILE: + retFuture.fail(getTransportTooManyError(int(err))) else: retFuture.fail(getTransportOsError(err)) return retFuture @@ -1166,12 +1179,18 @@ when defined(windows): cast[POVERLAPPED](addr server.aovl)) if not(res): let err = osLastError() - if int32(err) == ERROR_OPERATION_ABORTED: + case int(err) + of ERROR_OPERATION_ABORTED: server.apending = false retFuture.fail(getServerUseClosedError()) return retFuture - elif int32(err) == ERROR_IO_PENDING: + of ERROR_IO_PENDING: discard + of common.WSAECONNRESET, common.WSAECONNABORTED, common.WSAENETDOWN, + common.WSAENETRESET, common.WSAETIMEDOUT: + server.apending = false + retFuture.fail(getConnectionAbortedError(int(err))) + return retFuture else: server.apending = false retFuture.fail(getTransportOsError(err)) @@ -1718,7 +1737,7 @@ else: wrapAsyncSocket(res) except CatchableError as exc: close(res) - retFuture.fail(exc) + retFuture.fail(getConnectionAbortedError($exc.msg)) return if sock != asyncInvalidSocket: @@ -1734,19 +1753,22 @@ else: trackStream(ntransp) retFuture.complete(ntransp) else: - retFuture.fail(getTransportOsError(osLastError())) + let errorMsg = osErrorMsg(osLastError()) + retFuture.fail(getConnectionAbortedError(errorMsg)) else: - let err = osLastError() - if int(err) == EINTR: + let err = int(osLastError()) + if err == EINTR: continue - elif int(err) == EAGAIN: + elif err == EAGAIN: # This error appears only when server get closed, while accept() # continuation is already scheduled. retFuture.fail(getServerUseClosedError()) - elif int(err) == EMFILE: - retFuture.fail(getTransportTooManyError()) + elif err in {EMFILE, ENFILE, ENOBUFS, ENOMEM}: + retFuture.fail(getTransportTooManyError(err)) + elif err in {ECONNABORTED, EPERM, ETIMEDOUT}: + retFuture.fail(getConnectionAbortedError(err)) else: - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(OSErrorCode(err))) break try: