From e04c042e8acfe0025c780de8a025aa4c4e042130 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 19 Jul 2023 20:33:28 +0300 Subject: [PATCH] Add cross-platform shutdown() call and use it for HTTP server. (#420) * Add cross-platform shutdown() call and use it for httpserver connection processing. * Fix Posix compilation issues and warnings. --- chronos/apps/http/httpserver.nim | 100 +++++++++++++++++++++---------- chronos/asyncloop.nim | 8 +++ chronos/osdefs.nim | 19 ++++-- chronos/transports/stream.nim | 51 ++++++++++++++++ 4 files changed, 143 insertions(+), 35 deletions(-) diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index 6cddb22..b86c0b3 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -54,6 +54,9 @@ type HttpResponseStreamType* {.pure.} = enum Plain, SSE, Chunked + HttpProcessExitType* {.pure.} = enum + KeepAlive, Graceful, Immediate + HttpResponseState* {.pure.} = enum Empty, Prepared, Sending, Finished, Failed, Cancelled, Default @@ -663,62 +666,83 @@ proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion, # We ignore errors here, because we indicating error already. discard -proc sendErrorResponse(conn: HttpConnectionRef, reqFence: RequestFence, - respError: HttpProcessError): Future[bool] {.async.} = +proc sendErrorResponse( + conn: HttpConnectionRef, + reqFence: RequestFence, + respError: HttpProcessError + ): Future[HttpProcessExitType] {.async.} = let version = getResponseVersion(reqFence) try: if reqFence.isOk(): case respError.kind of HttpServerError.CriticalError: await conn.sendErrorResponse(version, respError.code, false) - false + HttpProcessExitType.Graceful of HttpServerError.RecoverableError: await conn.sendErrorResponse(version, respError.code, true) - true + HttpProcessExitType.Graceful of HttpServerError.CatchableError: await conn.sendErrorResponse(version, respError.code, false) - false + HttpProcessExitType.Graceful of HttpServerError.DisconnectError, HttpServerError.InterruptError, HttpServerError.TimeoutError: raiseAssert("Unexpected response error: " & $respError.kind) else: - false + HttpProcessExitType.Graceful except CancelledError: - false + HttpProcessExitType.Immediate + except CatchableError: + HttpProcessExitType.Immediate -proc sendDefaultResponse(conn: HttpConnectionRef, reqFence: RequestFence, - response: HttpResponseRef): Future[bool] {.async.} = +proc sendDefaultResponse( + conn: HttpConnectionRef, + reqFence: RequestFence, + response: HttpResponseRef + ): Future[HttpProcessExitType] {.async.} = let version = getResponseVersion(reqFence) keepConnection = - if isNil(response): - false + if isNil(response) or (HttpResponseFlags.KeepAlive notin response.flags): + HttpProcessExitType.Graceful else: - HttpResponseFlags.KeepAlive in response.flags + HttpProcessExitType.KeepAlive + + template toBool(hpet: HttpProcessExitType): bool = + case hpet + of HttpProcessExitType.KeepAlive: + true + of HttpProcessExitType.Immediate: + false + of HttpProcessExitType.Graceful: + false + try: if reqFence.isOk(): if isNil(response): - await conn.sendErrorResponse(version, Http404, keepConnection) + await conn.sendErrorResponse(version, Http404, keepConnection.toBool()) keepConnection else: case response.state of HttpResponseState.Empty: # Response was ignored, so we respond with not found. - await conn.sendErrorResponse(version, Http404, keepConnection) + await conn.sendErrorResponse(version, Http404, + keepConnection.toBool()) keepConnection of HttpResponseState.Prepared: # Response was prepared but not sent, so we can respond with some # error code - await conn.sendErrorResponse(HttpVersion11, Http409, keepConnection) + await conn.sendErrorResponse(HttpVersion11, Http409, + keepConnection.toBool()) keepConnection of HttpResponseState.Sending, HttpResponseState.Failed, HttpResponseState.Cancelled: # Just drop connection, because we dont know at what stage we are - false + HttpProcessExitType.Immediate of HttpResponseState.Default: # Response was ignored, so we respond with not found. - await conn.sendErrorResponse(version, Http404, keepConnection) + await conn.sendErrorResponse(version, Http404, + keepConnection.toBool()) keepConnection of HttpResponseState.Finished: keepConnection @@ -726,23 +750,25 @@ proc sendDefaultResponse(conn: HttpConnectionRef, reqFence: RequestFence, case reqFence.error.kind of HttpServerError.TimeoutError: await conn.sendErrorResponse(version, reqFence.error.code, false) - false + HttpProcessExitType.Graceful of HttpServerError.CriticalError: await conn.sendErrorResponse(version, reqFence.error.code, false) - false + HttpProcessExitType.Graceful of HttpServerError.RecoverableError: - await conn.sendErrorResponse(version, reqFence.error.code, true) - false + await conn.sendErrorResponse(version, reqFence.error.code, false) + HttpProcessExitType.Graceful of HttpServerError.CatchableError: await conn.sendErrorResponse(version, reqFence.error.code, false) - false + HttpProcessExitType.Graceful of HttpServerError.DisconnectError: # When `HttpServerFlags.NotifyDisconnect` is set. - false + HttpProcessExitType.Immediate of HttpServerError.InterruptError: raiseAssert("Unexpected request error: " & $reqFence.error.kind) except CancelledError: - false + HttpProcessExitType.Immediate + except CatchableError: + HttpProcessExitType.Immediate proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} = try: @@ -800,6 +826,10 @@ proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef, trackCounter(HttpServerUnsecureConnectionTrackerName) res +proc gracefulCloseWait*(conn: HttpConnectionRef) {.async.} = + await conn.transp.shutdownWait() + await conn.closeCb(conn) + proc closeWait*(conn: HttpConnectionRef): Future[void] = conn.closeCb(conn) @@ -942,15 +972,15 @@ proc getConnectionFence*(server: HttpServerRef, proc processRequest(server: HttpServerRef, connection: HttpConnectionRef, - connId: string): Future[bool] {.async.} = + connId: string): Future[HttpProcessExitType] {.async.} = let requestFence = await getRequestFence(server, connection) if requestFence.isErr(): case requestFence.error.kind of HttpServerError.InterruptError: - return false + return HttpProcessExitType.Immediate of HttpServerError.DisconnectError: if HttpServerFlags.NotifyDisconnect notin server.flags: - return false + return HttpProcessExitType.Immediate else: discard @@ -961,7 +991,7 @@ proc processRequest(server: HttpServerRef, let responseFence = await getResponseFence(connection, requestFence) if responseFence.isErr() and (responseFence.error.kind == HttpServerError.InterruptError): - return false + return HttpProcessExitType.Immediate if responseFence.isErr(): await connection.sendErrorResponse(requestFence, responseFence.error) @@ -985,12 +1015,20 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} = holder.connection = connection + var runLoop = HttpProcessExitType.KeepAlive + defer: server.connections.del(connectionId) - await connection.closeWait() + case runLoop + of HttpProcessExitType.KeepAlive: + # This could happened only on CancelledError. + await connection.closeWait() + of HttpProcessExitType.Immediate: + await connection.closeWait() + of HttpProcessExitType.Graceful: + await connection.gracefulCloseWait() - var runLoop = true - while runLoop: + while runLoop == HttpProcessExitType.KeepAlive: runLoop = await server.processRequest(connection, connectionId) proc acceptClientLoop(server: HttpServerRef) {.async.} = diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index a603ee4..9d5ac23 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -313,6 +313,7 @@ when defined(windows): getAcceptExSockAddrs*: WSAPROC_GETACCEPTEXSOCKADDRS transmitFile*: WSAPROC_TRANSMITFILE getQueuedCompletionStatusEx*: LPFN_GETQUEUEDCOMPLETIONSTATUSEX + disconnectEx*: WSAPROC_DISCONNECTEX flags: set[DispatcherFlag] PtrCustomOverlapped* = ptr CustomOverlapped @@ -393,6 +394,13 @@ when defined(windows): "dispatcher's TransmitFile()") loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) + block: + let res = getFunc(sock, funcPointer, WSAID_DISCONNECTEX) + if not(res): + raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " & + "dispatcher's DisconnectEx()") + loop.disconnectEx = cast[WSAPROC_DISCONNECTEX](funcPointer) + if closeFd(sock) != 0: raiseOsDefect(osLastError(), "initAPI(): Unable to close control socket") diff --git a/chronos/osdefs.nim b/chronos/osdefs.nim index 789da8c..a638056 100644 --- a/chronos/osdefs.nim +++ b/chronos/osdefs.nim @@ -237,6 +237,10 @@ when defined(windows): GUID(D1: 0xb5367df0'u32, D2: 0xcbac'u16, D3: 0x11cf'u16, D4: [0x95'u8, 0xca'u8, 0x00'u8, 0x80'u8, 0x5f'u8, 0x48'u8, 0xa1'u8, 0x92'u8]) + WSAID_DISCONNECTEX* = + GUID(D1: 0x7fda2e11'u32, D2: 0x8630'u16, D3: 0x436f'u16, + D4: [0xa0'u8, 0x31'u8, 0xf5'u8, 0x36'u8, + 0xa6'u8, 0xee'u8, 0xc1'u8, 0x57'u8]) GAA_FLAG_INCLUDE_PREFIX* = 0x0010'u32 @@ -497,6 +501,11 @@ when defined(windows): lpTransmitBuffers: pointer, dwReserved: DWORD): WINBOOL {. stdcall, gcsafe, raises: [].} + WSAPROC_DISCONNECTEX* = proc ( + hSocket: SocketHandle, lpOverlapped: POVERLAPPED, dwFlags: DWORD, + dwReserved: DWORD): WINBOOL {. + stdcall, gcsafe, raises: [].} + LPFN_GETQUEUEDCOMPLETIONSTATUSEX* = proc ( completionPort: HANDLE, lpPortEntries: ptr OVERLAPPED_ENTRY, ulCount: ULONG, ulEntriesRemoved: var ULONG, @@ -879,7 +888,7 @@ elif defined(macos) or defined(macosx): AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, - SIG_BLOCK, SIG_UNBLOCK, + SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -900,7 +909,7 @@ elif defined(macos) or defined(macosx): AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, - SIG_BLOCK, SIG_UNBLOCK, + SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -939,7 +948,7 @@ elif defined(linux): SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, - SOCK_DGRAM, + SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -962,7 +971,7 @@ elif defined(linux): SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, - SOCK_DGRAM, + SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -1081,6 +1090,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, + SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -1101,6 +1111,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, + SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index a4190da..257c475 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -2571,6 +2571,57 @@ proc closeWait*(transp: StreamTransport): Future[void] = transp.close() transp.join() +proc shutdownWait*(transp: StreamTransport): Future[void] = + ## Perform graceful shutdown of TCP connection backed by transport ``transp``. + doAssert(transp.kind == TransportKind.Socket) + let retFuture = newFuture[void]("stream.transport.shutdown") + transp.checkClosed(retFuture) + transp.checkWriteEof(retFuture) + + when defined(windows): + let loop = getThreadDispatcher() + proc continuation(udata: pointer) {.gcsafe.} = + let ovl = cast[RefCustomOverlapped](udata) + if not(retFuture.finished()): + if ovl.data.errCode == OSErrorCode(-1): + retFuture.complete() + else: + transp.state.excl({WriteEof}) + retFuture.fail(getTransportOsError(ovl.data.errCode)) + GC_unref(ovl) + + let povl = RefCustomOverlapped(data: CompletionData(cb: continuation)) + GC_ref(povl) + + let res = loop.disconnectEx(SocketHandle(transp.fd), + cast[POVERLAPPED](povl), 0'u32, 0'u32) + if res == FALSE: + let err = osLastError() + case err + of ERROR_IO_PENDING: + transp.state.incl({WriteEof}) + else: + GC_unref(povl) + retFuture.fail(getTransportOsError(err)) + else: + transp.state.incl({WriteEof}) + retFuture.complete() + + retFuture + else: + proc continuation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + retFuture.complete() + + let res = osdefs.shutdown(SocketHandle(transp.fd), SHUT_WR) + if res < 0: + let err = osLastError() + retFuture.fail(getTransportOsError(err)) + else: + transp.state.incl({WriteEof}) + callSoon(continuation, nil) + retFuture + proc closed*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if transport in closed state. ({ReadClosed, WriteClosed} * transp.state != {})