Add cross-platform shutdown() call and use it for HTTP server. (#420)

* Add cross-platform shutdown() call and use it for httpserver connection processing.

* Fix Posix compilation issues and warnings.
This commit is contained in:
Eugene Kabanov 2023-07-19 20:33:28 +03:00 committed by GitHub
parent d652c52142
commit e04c042e8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 143 additions and 35 deletions

View File

@ -54,6 +54,9 @@ type
HttpResponseStreamType* {.pure.} = enum
Plain, SSE, Chunked
HttpProcessExitType* {.pure.} = enum
KeepAlive, Graceful, Immediate
HttpResponseState* {.pure.} = enum
Empty, Prepared, Sending, Finished, Failed, Cancelled, Default
@ -663,62 +666,83 @@ proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion,
# We ignore errors here, because we indicating error already.
discard
proc sendErrorResponse(conn: HttpConnectionRef, reqFence: RequestFence,
respError: HttpProcessError): Future[bool] {.async.} =
proc sendErrorResponse(
conn: HttpConnectionRef,
reqFence: RequestFence,
respError: HttpProcessError
): Future[HttpProcessExitType] {.async.} =
let version = getResponseVersion(reqFence)
try:
if reqFence.isOk():
case respError.kind
of HttpServerError.CriticalError:
await conn.sendErrorResponse(version, respError.code, false)
false
HttpProcessExitType.Graceful
of HttpServerError.RecoverableError:
await conn.sendErrorResponse(version, respError.code, true)
true
HttpProcessExitType.Graceful
of HttpServerError.CatchableError:
await conn.sendErrorResponse(version, respError.code, false)
false
HttpProcessExitType.Graceful
of HttpServerError.DisconnectError,
HttpServerError.InterruptError,
HttpServerError.TimeoutError:
raiseAssert("Unexpected response error: " & $respError.kind)
else:
false
HttpProcessExitType.Graceful
except CancelledError:
false
HttpProcessExitType.Immediate
except CatchableError:
HttpProcessExitType.Immediate
proc sendDefaultResponse(conn: HttpConnectionRef, reqFence: RequestFence,
response: HttpResponseRef): Future[bool] {.async.} =
proc sendDefaultResponse(
conn: HttpConnectionRef,
reqFence: RequestFence,
response: HttpResponseRef
): Future[HttpProcessExitType] {.async.} =
let
version = getResponseVersion(reqFence)
keepConnection =
if isNil(response):
false
if isNil(response) or (HttpResponseFlags.KeepAlive notin response.flags):
HttpProcessExitType.Graceful
else:
HttpResponseFlags.KeepAlive in response.flags
HttpProcessExitType.KeepAlive
template toBool(hpet: HttpProcessExitType): bool =
case hpet
of HttpProcessExitType.KeepAlive:
true
of HttpProcessExitType.Immediate:
false
of HttpProcessExitType.Graceful:
false
try:
if reqFence.isOk():
if isNil(response):
await conn.sendErrorResponse(version, Http404, keepConnection)
await conn.sendErrorResponse(version, Http404, keepConnection.toBool())
keepConnection
else:
case response.state
of HttpResponseState.Empty:
# Response was ignored, so we respond with not found.
await conn.sendErrorResponse(version, Http404, keepConnection)
await conn.sendErrorResponse(version, Http404,
keepConnection.toBool())
keepConnection
of HttpResponseState.Prepared:
# Response was prepared but not sent, so we can respond with some
# error code
await conn.sendErrorResponse(HttpVersion11, Http409, keepConnection)
await conn.sendErrorResponse(HttpVersion11, Http409,
keepConnection.toBool())
keepConnection
of HttpResponseState.Sending, HttpResponseState.Failed,
HttpResponseState.Cancelled:
# Just drop connection, because we dont know at what stage we are
false
HttpProcessExitType.Immediate
of HttpResponseState.Default:
# Response was ignored, so we respond with not found.
await conn.sendErrorResponse(version, Http404, keepConnection)
await conn.sendErrorResponse(version, Http404,
keepConnection.toBool())
keepConnection
of HttpResponseState.Finished:
keepConnection
@ -726,23 +750,25 @@ proc sendDefaultResponse(conn: HttpConnectionRef, reqFence: RequestFence,
case reqFence.error.kind
of HttpServerError.TimeoutError:
await conn.sendErrorResponse(version, reqFence.error.code, false)
false
HttpProcessExitType.Graceful
of HttpServerError.CriticalError:
await conn.sendErrorResponse(version, reqFence.error.code, false)
false
HttpProcessExitType.Graceful
of HttpServerError.RecoverableError:
await conn.sendErrorResponse(version, reqFence.error.code, true)
false
await conn.sendErrorResponse(version, reqFence.error.code, false)
HttpProcessExitType.Graceful
of HttpServerError.CatchableError:
await conn.sendErrorResponse(version, reqFence.error.code, false)
false
HttpProcessExitType.Graceful
of HttpServerError.DisconnectError:
# When `HttpServerFlags.NotifyDisconnect` is set.
false
HttpProcessExitType.Immediate
of HttpServerError.InterruptError:
raiseAssert("Unexpected request error: " & $reqFence.error.kind)
except CancelledError:
false
HttpProcessExitType.Immediate
except CatchableError:
HttpProcessExitType.Immediate
proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} =
try:
@ -800,6 +826,10 @@ proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
trackCounter(HttpServerUnsecureConnectionTrackerName)
res
proc gracefulCloseWait*(conn: HttpConnectionRef) {.async.} =
await conn.transp.shutdownWait()
await conn.closeCb(conn)
proc closeWait*(conn: HttpConnectionRef): Future[void] =
conn.closeCb(conn)
@ -942,15 +972,15 @@ proc getConnectionFence*(server: HttpServerRef,
proc processRequest(server: HttpServerRef,
connection: HttpConnectionRef,
connId: string): Future[bool] {.async.} =
connId: string): Future[HttpProcessExitType] {.async.} =
let requestFence = await getRequestFence(server, connection)
if requestFence.isErr():
case requestFence.error.kind
of HttpServerError.InterruptError:
return false
return HttpProcessExitType.Immediate
of HttpServerError.DisconnectError:
if HttpServerFlags.NotifyDisconnect notin server.flags:
return false
return HttpProcessExitType.Immediate
else:
discard
@ -961,7 +991,7 @@ proc processRequest(server: HttpServerRef,
let responseFence = await getResponseFence(connection, requestFence)
if responseFence.isErr() and
(responseFence.error.kind == HttpServerError.InterruptError):
return false
return HttpProcessExitType.Immediate
if responseFence.isErr():
await connection.sendErrorResponse(requestFence, responseFence.error)
@ -985,12 +1015,20 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
holder.connection = connection
var runLoop = HttpProcessExitType.KeepAlive
defer:
server.connections.del(connectionId)
await connection.closeWait()
case runLoop
of HttpProcessExitType.KeepAlive:
# This could happened only on CancelledError.
await connection.closeWait()
of HttpProcessExitType.Immediate:
await connection.closeWait()
of HttpProcessExitType.Graceful:
await connection.gracefulCloseWait()
var runLoop = true
while runLoop:
while runLoop == HttpProcessExitType.KeepAlive:
runLoop = await server.processRequest(connection, connectionId)
proc acceptClientLoop(server: HttpServerRef) {.async.} =

View File

@ -313,6 +313,7 @@ when defined(windows):
getAcceptExSockAddrs*: WSAPROC_GETACCEPTEXSOCKADDRS
transmitFile*: WSAPROC_TRANSMITFILE
getQueuedCompletionStatusEx*: LPFN_GETQUEUEDCOMPLETIONSTATUSEX
disconnectEx*: WSAPROC_DISCONNECTEX
flags: set[DispatcherFlag]
PtrCustomOverlapped* = ptr CustomOverlapped
@ -393,6 +394,13 @@ when defined(windows):
"dispatcher's TransmitFile()")
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
block:
let res = getFunc(sock, funcPointer, WSAID_DISCONNECTEX)
if not(res):
raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " &
"dispatcher's DisconnectEx()")
loop.disconnectEx = cast[WSAPROC_DISCONNECTEX](funcPointer)
if closeFd(sock) != 0:
raiseOsDefect(osLastError(), "initAPI(): Unable to close control socket")

View File

@ -237,6 +237,10 @@ when defined(windows):
GUID(D1: 0xb5367df0'u32, D2: 0xcbac'u16, D3: 0x11cf'u16,
D4: [0x95'u8, 0xca'u8, 0x00'u8, 0x80'u8,
0x5f'u8, 0x48'u8, 0xa1'u8, 0x92'u8])
WSAID_DISCONNECTEX* =
GUID(D1: 0x7fda2e11'u32, D2: 0x8630'u16, D3: 0x436f'u16,
D4: [0xa0'u8, 0x31'u8, 0xf5'u8, 0x36'u8,
0xa6'u8, 0xee'u8, 0xc1'u8, 0x57'u8])
GAA_FLAG_INCLUDE_PREFIX* = 0x0010'u32
@ -497,6 +501,11 @@ when defined(windows):
lpTransmitBuffers: pointer, dwReserved: DWORD): WINBOOL {.
stdcall, gcsafe, raises: [].}
WSAPROC_DISCONNECTEX* = proc (
hSocket: SocketHandle, lpOverlapped: POVERLAPPED, dwFlags: DWORD,
dwReserved: DWORD): WINBOOL {.
stdcall, gcsafe, raises: [].}
LPFN_GETQUEUEDCOMPLETIONSTATUSEX* = proc (
completionPort: HANDLE, lpPortEntries: ptr OVERLAPPED_ENTRY,
ulCount: ULONG, ulEntriesRemoved: var ULONG,
@ -879,7 +888,7 @@ elif defined(macos) or defined(macosx):
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -900,7 +909,7 @@ elif defined(macos) or defined(macosx):
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -939,7 +948,7 @@ elif defined(linux):
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SOCK_DGRAM,
SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -962,7 +971,7 @@ elif defined(linux):
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SOCK_DGRAM,
SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -1081,6 +1090,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -1101,6 +1111,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,

View File

@ -2571,6 +2571,57 @@ proc closeWait*(transp: StreamTransport): Future[void] =
transp.close()
transp.join()
proc shutdownWait*(transp: StreamTransport): Future[void] =
## Perform graceful shutdown of TCP connection backed by transport ``transp``.
doAssert(transp.kind == TransportKind.Socket)
let retFuture = newFuture[void]("stream.transport.shutdown")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
when defined(windows):
let loop = getThreadDispatcher()
proc continuation(udata: pointer) {.gcsafe.} =
let ovl = cast[RefCustomOverlapped](udata)
if not(retFuture.finished()):
if ovl.data.errCode == OSErrorCode(-1):
retFuture.complete()
else:
transp.state.excl({WriteEof})
retFuture.fail(getTransportOsError(ovl.data.errCode))
GC_unref(ovl)
let povl = RefCustomOverlapped(data: CompletionData(cb: continuation))
GC_ref(povl)
let res = loop.disconnectEx(SocketHandle(transp.fd),
cast[POVERLAPPED](povl), 0'u32, 0'u32)
if res == FALSE:
let err = osLastError()
case err
of ERROR_IO_PENDING:
transp.state.incl({WriteEof})
else:
GC_unref(povl)
retFuture.fail(getTransportOsError(err))
else:
transp.state.incl({WriteEof})
retFuture.complete()
retFuture
else:
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
let res = osdefs.shutdown(SocketHandle(transp.fd), SHUT_WR)
if res < 0:
let err = osLastError()
retFuture.fail(getTransportOsError(err))
else:
transp.state.incl({WriteEof})
callSoon(continuation, nil)
retFuture
proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
({ReadClosed, WriteClosed} * transp.state != {})