Handle accept() errors properly. (#234)
* Add more accept() call error handlers. Fix issue when HTTP server silently stops accepting new connections. Remove unneeded MacOS syscall to disable SIG_PIPE on socket, we already mask this signal in process. * User `if` instead `case` because constants are actually variables. * Fix mistypes. * Do not use case for posix constants. * Fix Linux compilation error.
This commit is contained in:
parent
df980dd713
commit
0c7a0bd0a5
|
@ -246,9 +246,12 @@ proc dumbResponse*(): HttpResponseRef {.raises: [Defect].} =
|
|||
## Create an empty response to return when request processor got no request.
|
||||
HttpResponseRef(state: HttpResponseState.Dumb, version: HttpVersion11)
|
||||
|
||||
proc getId(transp: StreamTransport): string {.inline.} =
|
||||
proc getId(transp: StreamTransport): Result[string, string] {.inline.} =
|
||||
## Returns string unique transport's identifier as string.
|
||||
$transp.remoteAddress() & "_" & $transp.localAddress()
|
||||
try:
|
||||
ok($transp.remoteAddress() & "_" & $transp.localAddress())
|
||||
except TransportOsError as exc:
|
||||
err($exc.msg)
|
||||
|
||||
proc hasBody*(request: HttpRequestRef): bool {.raises: [Defect].} =
|
||||
## Returns ``true`` if request has body.
|
||||
|
@ -665,7 +668,8 @@ proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
|
|||
proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} =
|
||||
HttpResponseFlags.KeepAlive in resp.flags
|
||||
|
||||
proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
||||
proc processLoop(server: HttpServerRef, transp: StreamTransport,
|
||||
connId: string) {.async.} =
|
||||
var
|
||||
conn: HttpConnectionRef
|
||||
connArg: RequestFence
|
||||
|
@ -675,7 +679,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
|||
conn = await server.createConnCallback(server, transp)
|
||||
runLoop = true
|
||||
except CancelledError:
|
||||
server.connections.del(transp.getId())
|
||||
server.connections.del(connId)
|
||||
await transp.closeWait()
|
||||
return
|
||||
except HttpCriticalError as exc:
|
||||
|
@ -822,7 +826,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
|||
# need to close it.
|
||||
await conn.closeWait()
|
||||
|
||||
server.connections.del(transp.getId())
|
||||
server.connections.del(connId)
|
||||
# if server.maxConnections > 0:
|
||||
# server.semaphore.release()
|
||||
|
||||
|
@ -832,10 +836,16 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
|||
try:
|
||||
# if server.maxConnections > 0:
|
||||
# await server.semaphore.acquire()
|
||||
|
||||
let transp = await server.instance.accept()
|
||||
server.connections[transp.getId()] = processLoop(server, transp)
|
||||
|
||||
let resId = transp.getId()
|
||||
if resId.isErr():
|
||||
# We are unable to identify remote peer, it means that remote peer
|
||||
# disconnected before identification.
|
||||
await transp.closeWait()
|
||||
breakLoop = false
|
||||
else:
|
||||
let connId = resId.get()
|
||||
server.connections[connId] = processLoop(server, transp, connId)
|
||||
except CancelledError:
|
||||
# Server was stopped
|
||||
breakLoop = true
|
||||
|
@ -845,10 +855,12 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
|||
except TransportTooManyError:
|
||||
# Non critical error
|
||||
breakLoop = false
|
||||
except TransportAbortedError:
|
||||
# Non critical error
|
||||
breakLoop = false
|
||||
except CatchableError:
|
||||
# Unexpected error
|
||||
breakLoop = true
|
||||
discard
|
||||
|
||||
if breakLoop:
|
||||
break
|
||||
|
|
|
@ -105,10 +105,6 @@ proc createAsyncSocket*(domain: Domain, sockType: SockType,
|
|||
if not setSocketBlocking(handle, false):
|
||||
close(handle)
|
||||
return asyncInvalidSocket
|
||||
when defined(macosx) and not defined(nimdoc):
|
||||
if not setSockOpt(AsyncFD(handle), SOL_SOCKET, SO_NOSIGPIPE, 1):
|
||||
close(handle)
|
||||
return asyncInvalidSocket
|
||||
register(AsyncFD(handle))
|
||||
AsyncFD(handle)
|
||||
|
||||
|
@ -119,10 +115,6 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {.
|
|||
if not setSocketBlocking(sock, false):
|
||||
close(sock)
|
||||
return asyncInvalidSocket
|
||||
when defined(macosx) and not defined(nimdoc):
|
||||
if not setSockOpt(AsyncFD(sock), SOL_SOCKET, SO_NOSIGPIPE, 1):
|
||||
close(sock)
|
||||
return asyncInvalidSocket
|
||||
register(AsyncFD(sock))
|
||||
AsyncFD(sock)
|
||||
|
||||
|
|
|
@ -108,6 +108,8 @@ type
|
|||
## Usage after transport close exception
|
||||
TransportTooManyError* = object of TransportError
|
||||
## Too many open file descriptors exception
|
||||
TransportAbortedError* = object of TransportError
|
||||
## Remote client disconnected before server accepts connection
|
||||
|
||||
TransportState* = enum
|
||||
## Transport's state
|
||||
|
@ -557,9 +559,6 @@ template getError*(t: untyped): ref CatchableError =
|
|||
template getServerUseClosedError*(): ref TransportUseClosedError =
|
||||
newException(TransportUseClosedError, "Server is already closed!")
|
||||
|
||||
template getTransportTooManyError*(): ref TransportTooManyError =
|
||||
newException(TransportTooManyError, "Too many open transports!")
|
||||
|
||||
template getTransportUseClosedError*(): ref TransportUseClosedError =
|
||||
newException(TransportUseClosedError, "Transport is already closed!")
|
||||
|
||||
|
@ -613,6 +612,13 @@ when defined(windows):
|
|||
ERROR_NO_DATA* = 232
|
||||
ERROR_CONNECTION_ABORTED* = 1236
|
||||
ERROR_TOO_MANY_OPEN_FILES* = 4
|
||||
WSAEMFILE* = 10024
|
||||
WSAENETDOWN* = 10050
|
||||
WSAENETRESET* = 10052
|
||||
WSAECONNABORTED* = 10053
|
||||
WSAECONNRESET* = 10054
|
||||
WSAENOBUFS* = 10055
|
||||
WSAETIMEDOUT* = 10060
|
||||
|
||||
proc cancelIo*(hFile: HANDLE): WINBOOL
|
||||
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}
|
||||
|
@ -625,3 +631,72 @@ when defined(windows):
|
|||
{.stdcall, dynlib: "kernel32", importc: "SetNamedPipeHandleState".}
|
||||
proc resetEvent*(hEvent: HANDLE): WINBOOL
|
||||
{.stdcall, dynlib: "kernel32", importc: "ResetEvent".}
|
||||
|
||||
template getTransportTooManyError*(code: int = 0): ref TransportTooManyError =
|
||||
let msg =
|
||||
when defined(posix):
|
||||
if code == 0:
|
||||
"Too many open transports"
|
||||
elif code == EMFILE:
|
||||
"[EMFILE] Too many open files in the process"
|
||||
elif code == ENFILE:
|
||||
"[ENFILE] Too many open files in system"
|
||||
elif code == ENOBUFS:
|
||||
"[ENOBUFS] No buffer space available"
|
||||
elif code == ENOMEM:
|
||||
"[ENOMEM] Not enough memory availble"
|
||||
else:
|
||||
"[" & $code & "] Too many open transports"
|
||||
elif defined(windows):
|
||||
case code
|
||||
of 0:
|
||||
"Too many open transports"
|
||||
of ERROR_TOO_MANY_OPEN_FILES:
|
||||
"[ERROR_TOO_MANY_OPEN_FILES] Too many open files"
|
||||
of WSAENOBUFS:
|
||||
"[WSAENOBUFS] No buffer space available"
|
||||
of WSAEMFILE:
|
||||
"[WSAEMFILE] Too many open sockets"
|
||||
else:
|
||||
"[" & $code & "] Too many open transports"
|
||||
else:
|
||||
"[" & $code & "] Too many open transports"
|
||||
newException(TransportTooManyError, msg)
|
||||
|
||||
template getConnectionAbortedError*(m: string = ""): ref TransportAbortedError =
|
||||
let msg =
|
||||
if len(m) == 0:
|
||||
"[ECONNABORTED] Connection has been aborted before being accepted"
|
||||
else:
|
||||
"[ECONNABORTED] " & m
|
||||
newException(TransportAbortedError, msg)
|
||||
|
||||
template getConnectionAbortedError*(code: int): ref TransportAbortedError =
|
||||
let msg =
|
||||
when defined(posix):
|
||||
if code == 0:
|
||||
"[ECONNABORTED] Connection has been aborted before being accepted"
|
||||
elif code == EPERM:
|
||||
"[EPERM] Firewall rules forbid connection"
|
||||
elif code == ETIMEDOUT:
|
||||
"[ETIMEDOUT] Operation has been timed out"
|
||||
else:
|
||||
"[" & $code & "] Connection has been aborted"
|
||||
elif defined(windows):
|
||||
case code
|
||||
of 0, WSAECONNABORTED:
|
||||
"[ECONNABORTED] Connection has been aborted before being accepted"
|
||||
of WSAENETDOWN:
|
||||
"[ENETDOWN] Network is down"
|
||||
of WSAENETRESET:
|
||||
"[ENETRESET] Network dropped connection on reset"
|
||||
of WSAECONNRESET:
|
||||
"[ECONNRESET] Connection reset by peer"
|
||||
of WSAETIMEDOUT:
|
||||
"[ETIMEDOUT] Connection timed out"
|
||||
else:
|
||||
"[" & $code & "] Connection has been aborted"
|
||||
else:
|
||||
"[" & $code & "] Connection has been aborted"
|
||||
|
||||
newException(TransportAbortedError, msg)
|
||||
|
|
|
@ -254,7 +254,7 @@ when defined(windows):
|
|||
# CancelIO() interrupt
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl(ReadPaused)
|
||||
elif int(err) == WSAECONNRESET:
|
||||
elif int(err) == common.WSAECONNRESET:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl({ReadPaused, ReadEof})
|
||||
break
|
||||
|
|
|
@ -318,9 +318,9 @@ when defined(windows):
|
|||
(t).wwsabuf.len = cast[int32](v.buflen)
|
||||
|
||||
proc isConnResetError(err: OSErrorCode): bool {.inline.} =
|
||||
result = (err == OSErrorCode(WSAECONNRESET)) or
|
||||
(err == OSErrorCode(WSAECONNABORTED)) or
|
||||
(err == OSErrorCode(ERROR_PIPE_NOT_CONNECTED))
|
||||
result = (err == OSErrorCode(common.WSAECONNRESET)) or
|
||||
(err == OSErrorCode(common.WSAECONNABORTED)) or
|
||||
(err == OSErrorCode(common.ERROR_PIPE_NOT_CONNECTED))
|
||||
|
||||
proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
|
||||
var bytesCount: int32
|
||||
|
@ -552,7 +552,7 @@ when defined(windows):
|
|||
# CancelIO() interrupt or closeSocket() call.
|
||||
transp.state.incl(ReadPaused)
|
||||
elif transp.kind == TransportKind.Socket and
|
||||
(int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}):
|
||||
(int(err) in {ERROR_NETNAME_DELETED, common.WSAECONNABORTED}):
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
elif transp.kind == TransportKind.Pipe and
|
||||
(int(err) in {ERROR_PIPE_NOT_CONNECTED}):
|
||||
|
@ -589,7 +589,8 @@ when defined(windows):
|
|||
# CancelIO() interrupt
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl(ReadPaused)
|
||||
elif int32(err) in {WSAECONNRESET, WSAENETRESET, WSAECONNABORTED}:
|
||||
elif int32(err) in {common.WSAECONNRESET, common.WSAENETRESET,
|
||||
common.WSAECONNABORTED}:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
transp.completeReader()
|
||||
|
@ -1038,7 +1039,8 @@ when defined(windows):
|
|||
server.asock.closeSocket()
|
||||
server.clean()
|
||||
else:
|
||||
if ovl.data.errCode == OSErrorCode(-1):
|
||||
case ovl.data.errCode
|
||||
of OSErrorCode(-1):
|
||||
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
|
||||
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
|
||||
SockLen(sizeof(SocketHandle))) != 0'i32:
|
||||
|
@ -1049,7 +1051,8 @@ when defined(windows):
|
|||
# was already scheduled, so we failing it not with OS error.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
let errorMsg = osErrorMsg(err)
|
||||
retFuture.fail(getConnectionAbortedError(errorMsg))
|
||||
else:
|
||||
var ntransp: StreamTransport
|
||||
if not(isNil(server.init)):
|
||||
|
@ -1063,11 +1066,18 @@ when defined(windows):
|
|||
# Start tracking transport
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
||||
of OSErrorCode(ERROR_OPERATION_ABORTED):
|
||||
# CancelIO() interrupt or close.
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.clean()
|
||||
of OsErrorCode(common.WSAENETDOWN), OSErrorCode(common.WSAENETRESET),
|
||||
OSErrorCode(common.WSAECONNABORTED),
|
||||
OSErrorCode(common.WSAECONNRESET),
|
||||
OSErrorCode(common.WSAETIMEDOUT):
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode)))
|
||||
server.clean()
|
||||
else:
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||
|
@ -1135,16 +1145,19 @@ when defined(windows):
|
|||
if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||
# TCP Sockets part
|
||||
var loop = getThreadDispatcher()
|
||||
server.asock = try: createAsyncSocket(server.domain, SockType.SOCK_STREAM,
|
||||
Protocol.IPPROTO_TCP)
|
||||
except CatchableError as exc:
|
||||
retFuture.fail(exc)
|
||||
return retFuture
|
||||
server.asock =
|
||||
try:
|
||||
createAsyncSocket(server.domain, SockType.SOCK_STREAM,
|
||||
Protocol.IPPROTO_TCP)
|
||||
except CatchableError as exc:
|
||||
retFuture.fail(exc)
|
||||
return retFuture
|
||||
|
||||
if server.asock == asyncInvalidSocket:
|
||||
let err = osLastError()
|
||||
if int32(err) == ERROR_TOO_MANY_OPEN_FILES:
|
||||
retFuture.fail(getTransportTooManyError())
|
||||
case int(err)
|
||||
of ERROR_TOO_MANY_OPEN_FILES, WSAENOBUFS, WSAEMFILE:
|
||||
retFuture.fail(getTransportTooManyError(int(err)))
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
return retFuture
|
||||
|
@ -1166,12 +1179,18 @@ when defined(windows):
|
|||
cast[POVERLAPPED](addr server.aovl))
|
||||
if not(res):
|
||||
let err = osLastError()
|
||||
if int32(err) == ERROR_OPERATION_ABORTED:
|
||||
case int(err)
|
||||
of ERROR_OPERATION_ABORTED:
|
||||
server.apending = false
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
return retFuture
|
||||
elif int32(err) == ERROR_IO_PENDING:
|
||||
of ERROR_IO_PENDING:
|
||||
discard
|
||||
of common.WSAECONNRESET, common.WSAECONNABORTED, common.WSAENETDOWN,
|
||||
common.WSAENETRESET, common.WSAETIMEDOUT:
|
||||
server.apending = false
|
||||
retFuture.fail(getConnectionAbortedError(int(err)))
|
||||
return retFuture
|
||||
else:
|
||||
server.apending = false
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
|
@ -1718,7 +1737,7 @@ else:
|
|||
wrapAsyncSocket(res)
|
||||
except CatchableError as exc:
|
||||
close(res)
|
||||
retFuture.fail(exc)
|
||||
retFuture.fail(getConnectionAbortedError($exc.msg))
|
||||
return
|
||||
|
||||
if sock != asyncInvalidSocket:
|
||||
|
@ -1734,19 +1753,22 @@ else:
|
|||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(osLastError()))
|
||||
let errorMsg = osErrorMsg(osLastError())
|
||||
retFuture.fail(getConnectionAbortedError(errorMsg))
|
||||
else:
|
||||
let err = osLastError()
|
||||
if int(err) == EINTR:
|
||||
let err = int(osLastError())
|
||||
if err == EINTR:
|
||||
continue
|
||||
elif int(err) == EAGAIN:
|
||||
elif err == EAGAIN:
|
||||
# This error appears only when server get closed, while accept()
|
||||
# continuation is already scheduled.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
elif int(err) == EMFILE:
|
||||
retFuture.fail(getTransportTooManyError())
|
||||
elif err in {EMFILE, ENFILE, ENOBUFS, ENOMEM}:
|
||||
retFuture.fail(getTransportTooManyError(err))
|
||||
elif err in {ECONNABORTED, EPERM, ETIMEDOUT}:
|
||||
retFuture.fail(getConnectionAbortedError(err))
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
retFuture.fail(getTransportOsError(OSErrorCode(err)))
|
||||
break
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue