From 0688d2ef8f7a4c26c0fb23055d8b6679ed61a89f Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Tue, 21 Mar 2023 15:10:35 +0200 Subject: [PATCH] Add idle connection timeouts for HTTP client's connections pool. (#324) * Add idle connection timeouts for HTTP client's connections pool. Add timestamps and duration for both HTTP client requests/responses. Add test. * Add comments on `connectionFlag` decisions. * Address review comments. Adjust default idle connection timeout to 60 seconds. * Increase timeout for test. * Adjust timeout to lower value. * Address review comments. --- chronos/apps/http/httpclient.nim | 253 ++++++++++++++++++++++--------- tests/testhttpclient.nim | 63 ++++++++ 2 files changed, 246 insertions(+), 70 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index f63b24c..2997d04 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -22,6 +22,12 @@ const ## Timeout for connecting to host (12 sec) HttpHeadersTimeout* = 120.seconds ## Timeout for receiving response headers (120 sec) + HttpConnectionIdleTimeout* = 60.seconds + ## Time after which idle connections are removed from the HttpSession's + ## connections pool (120 sec) + HttpConnectionCheckPeriod* = 10.seconds + ## Period of time between idle connections checks in HttpSession's + ## connection pool (10 sec) HttpMaxRedirections* = 10 ## Maximum number of Location redirections. HttpClientConnectionTrackerName* = "httpclient.connection" @@ -100,6 +106,7 @@ type error*: ref HttpError remoteHostname*: string flags*: set[HttpClientConnectionFlag] + timestamp*: Moment HttpClientConnectionRef* = ref HttpClientConnection @@ -109,6 +116,9 @@ type maxRedirections*: int connectTimeout*: Duration headersTimeout*: Duration + idleTimeout: Duration + idlePeriod: Duration + watcherFut: Future[void] connectionBufferSize*: int maxConnections*: int connectionsCount*: int @@ -140,6 +150,8 @@ type buffer*: seq[byte] writer*: HttpBodyWriter redirectCount: int + timestamp*: Moment + duration*: Duration HttpClientRequestRef* = ref HttpClientRequest @@ -160,6 +172,8 @@ type transferEncoding*: set[TransferEncodingFlags] contentLength*: uint64 contentType*: Opt[ContentTypeData] + timestamp*: Moment + duration*: Duration HttpClientResponseRef* = ref HttpClientResponse @@ -284,29 +298,75 @@ template checkClosed(reqresp: untyped): untyped = reqresp.setError(e) raise e +template setTimestamp(conn: HttpClientConnectionRef, + moment: Moment): untyped = + if not(isNil(conn)): + conn.timestamp = moment + +template setTimestamp( + reqresp: HttpClientRequestRef|HttpClientRequestRef + ): untyped = + if not(isNil(reqresp)): + let timestamp = Moment.now() + reqresp.timestamp = timestamp + reqresp.connection.setTimestamp(timestamp) + +template setTimestamp(resp: HttpClientResponseRef, moment: Moment): untyped = + if not(isNil(resp)): + resp.timestamp = moment + resp.connection.setTimestamp(moment) + +template setDuration( + reqresp: HttpClientRequestRef|HttpClientResponseRef + ): untyped = + if not(isNil(reqresp)): + let timestamp = Moment.now() + reqresp.duration = timestamp - reqresp.timestamp + reqresp.connection.setTimestamp(timestamp) + +template isReady(conn: HttpClientConnectionRef): bool = + (conn.state == HttpClientConnectionState.Ready) and + (HttpClientConnectionFlag.KeepAlive in conn.flags) and + (HttpClientConnectionFlag.Request notin conn.flags) and + (HttpClientConnectionFlag.Response notin conn.flags) + +template isIdle(conn: HttpClientConnectionRef, timestamp: Moment, + timeout: Duration): bool = + (timestamp - conn.timestamp) >= timeout + +proc sessionWatcher(session: HttpSessionRef) {.async.} + proc new*(t: typedesc[HttpSessionRef], flags: HttpClientFlags = {}, maxRedirections = HttpMaxRedirections, connectTimeout = HttpConnectTimeout, headersTimeout = HttpHeadersTimeout, connectionBufferSize = DefaultStreamBufferSize, - maxConnections = -1): HttpSessionRef {. + maxConnections = -1, + idleTimeout = HttpConnectionIdleTimeout, + idlePeriod = HttpConnectionCheckPeriod): HttpSessionRef {. raises: [Defect] .} = ## Create new HTTP session object. ## ## ``maxRedirections`` - maximum number of HTTP 3xx redirections ## ``connectTimeout`` - timeout for ongoing HTTP connection ## ``headersTimeout`` - timeout for receiving HTTP response headers + ## ``idleTimeout`` - timeout to consider HTTP connection as idle + ## ``idlePeriod`` - period of time to check HTTP connections for inactivity doAssert(maxRedirections >= 0, "maxRedirections should not be negative") - HttpSessionRef( + var res = HttpSessionRef( flags: flags, maxRedirections: maxRedirections, connectTimeout: connectTimeout, headersTimeout: headersTimeout, connectionBufferSize: connectionBufferSize, maxConnections: maxConnections, - connections: initTable[string, seq[HttpClientConnectionRef]]() + idleTimeout: idleTimeout, + idlePeriod: idlePeriod, + connections: initTable[string, seq[HttpClientConnectionRef]](), ) + res.watcherFut = sessionWatcher(res) + res proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} = var res: set[TLSFlags] @@ -583,52 +643,6 @@ proc connect(session: HttpSessionRef, # If all attempts to connect to the remote host have failed. raiseHttpConnectionError("Could not connect to remote host") -proc acquireConnection( - session: HttpSessionRef, - ha: HttpAddress, - flags: set[HttpClientRequestFlag] - ): Future[HttpClientConnectionRef] {.async.} = - ## Obtain connection from ``session`` or establish a new one. - if (HttpClientFlag.NewConnectionAlways in session.flags) or - (HttpClientRequestFlag.DedicatedConnection in flags): - var default: seq[HttpClientConnectionRef] - let res = - try: - await session.connect(ha).wait(session.connectTimeout) - except AsyncTimeoutError: - raiseHttpConnectionError("Connection timed out") - res[].state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(res) - inc(session.connectionsCount) - return res - else: - let conn = - block: - let conns = session.connections.getOrDefault(ha.id) - if len(conns) > 0: - var res: HttpClientConnectionRef = nil - for item in conns: - if item.state == HttpClientConnectionState.Ready: - res = item - break - res - else: - nil - if not(isNil(conn)): - conn[].state = HttpClientConnectionState.Acquired - return conn - else: - var default: seq[HttpClientConnectionRef] - let res = - try: - await session.connect(ha).wait(session.connectTimeout) - except AsyncTimeoutError: - raiseHttpConnectionError("Connection timed out") - res[].state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(res) - inc(session.connectionsCount) - return res - proc removeConnection(session: HttpSessionRef, conn: HttpClientConnectionRef) {.async.} = let removeHost = @@ -644,6 +658,35 @@ proc removeConnection(session: HttpSessionRef, dec(session.connectionsCount) await conn.closeWait() +proc acquireConnection( + session: HttpSessionRef, + ha: HttpAddress, + flags: set[HttpClientRequestFlag] + ): Future[HttpClientConnectionRef] {.async.} = + ## Obtain connection from ``session`` or establish a new one. + var default: seq[HttpClientConnectionRef] + if (HttpClientFlag.NewConnectionAlways notin session.flags) and + (HttpClientRequestFlag.DedicatedConnection notin flags): + # Trying to reuse existing connection from our connection's pool. + let timestamp = Moment.now() + # We looking for non-idle connection at `Ready` state, all idle connections + # will be freed by sessionWatcher(). + for connection in session.connections.getOrDefault(ha.id): + if connection.isReady() and + not(connection.isIdle(timestamp, session.idleTimeout)): + connection.state = HttpClientConnectionState.Acquired + return connection + + let connection = + try: + await session.connect(ha).wait(session.connectTimeout) + except AsyncTimeoutError: + raiseHttpConnectionError("Connection timed out") + connection.state = HttpClientConnectionState.Acquired + session.connections.mgetOrPut(ha.id, default).add(connection) + inc(session.connectionsCount) + return connection + proc releaseConnection(session: HttpSessionRef, connection: HttpClientConnectionRef) {.async.} = ## Return connection back to the ``session``. @@ -676,7 +719,9 @@ proc releaseConnection(session: HttpSessionRef, await session.removeConnection(connection) else: connection.state = HttpClientConnectionState.Ready - connection.flags = {} + connection.flags.excl({HttpClientConnectionFlag.Request, + HttpClientConnectionFlag.Response, + HttpClientConnectionFlag.NoBody}) proc releaseConnection(request: HttpClientRequestRef) {.async.} = let @@ -707,11 +752,55 @@ proc closeWait*(session: HttpSessionRef) {.async.} = ## ## This closes all the connections opened to remote servers. var pending: seq[Future[void]] - for items in session.connections.values(): - for item in items: - pending.add(closeWait(item)) + # Closing sessionWatcher to avoid race condition. + await cancelAndWait(session.watcherFut) + for connections in session.connections.values(): + for conn in connections: + pending.add(closeWait(conn)) await allFutures(pending) +proc sessionWatcher(session: HttpSessionRef) {.async.} = + while true: + let firstBreak = + try: + await sleepAsync(session.idlePeriod) + false + except CancelledError: + true + + if firstBreak: + break + + var idleConnections: seq[HttpClientConnectionRef] + let timestamp = Moment.now() + for _, connections in session.connections.mpairs(): + connections.keepItIf( + if isNil(it): + false + else: + if it.isReady() and it.isIdle(timestamp, session.idleTimeout): + idleConnections.add(it) + false + else: + true + ) + + if len(idleConnections) > 0: + dec(session.connectionsCount, len(idleConnections)) + var pending: seq[Future[void]] + let secondBreak = + try: + pending = idleConnections.mapIt(it.closeWait()) + await allFutures(pending) + false + except CancelledError: + # We still want to close connections to avoid socket leaks. + await allFutures(pending) + true + + if secondBreak: + break + proc closeWait*(request: HttpClientRequestRef) {.async.} = if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: request.state = HttpReqRespState.Closing @@ -791,14 +880,26 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] let connectionFlag = block: case resp.version - of HttpVersion11, HttpVersion20: + of HttpVersion11: + # Keeping a connection open is the default on HTTP/1.1 requests. + # https://www.rfc-editor.org/rfc/rfc2068.html#section-19.7.1 let header = toLowerAscii(headers.getString(ConnectionHeader)) - if header == "keep-alive": - true - else: + if header == "close": false - else: + else: + true + of HttpVersion10: + # This is the default on HTTP/1.0 requests. false + else: + # HTTP/2 does not use the Connection header field (Section 7.6.1 of + # [HTTP]) to indicate connection-specific header fields. + # https://httpwg.org/specs/rfc9113.html#rfc.section.8.2.2 + # + # HTTP/3 does not use the Connection header field to indicate + # connection-specific fields; + # https://httpwg.org/specs/rfc9114.html#rfc.section.4.2 + true let contentType = block: @@ -836,22 +937,25 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {. async.} = var buffer: array[HttpMaxHeadersSize, byte] - let bytesRead = - try: - await req.connection.reader.readUntil(addr buffer[0], - len(buffer), HeadersMark).wait( - req.session.headersTimeout) - except CancelledError as exc: - raise exc - except AsyncTimeoutError: - raiseHttpReadError("Reading response headers timed out") - except AsyncStreamError: - raiseHttpReadError("Could not read response headers") + let timestamp = Moment.now() + req.connection.setTimestamp(timestamp) + let + bytesRead = + try: + await req.connection.reader.readUntil(addr buffer[0], + len(buffer), HeadersMark).wait( + req.session.headersTimeout) + except AsyncTimeoutError: + raiseHttpReadError("Reading response headers timed out") + except AsyncStreamError: + raiseHttpReadError("Could not read response headers") let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1)) if response.isErr(): raiseHttpProtocolError(response.error()) - return response.get() + let res = response.get() + res.setTimestamp(timestamp) + return res proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, ha: HttpAddress, meth: HttpMethod = MethodGet, @@ -1029,6 +1133,7 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. let headers = request.prepareRequest() request.connection.state = HttpClientConnectionState.RequestHeadersSending request.state = HttpReqRespState.Open + request.setTimestamp() await request.connection.writer.write(headers) request.connection.state = HttpClientConnectionState.RequestHeadersSent request.connection.state = HttpClientConnectionState.RequestBodySending @@ -1036,10 +1141,13 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. await request.connection.writer.write(request.buffer) request.connection.state = HttpClientConnectionState.RequestBodySent request.state = HttpReqRespState.Finished + request.setDuration() except CancelledError as exc: + request.setDuration() request.setError(newHttpInterruptError()) raise exc except AsyncStreamError: + request.setDuration() let error = newHttpWriteError("Could not send request headers") request.setError(error) raise error @@ -1079,13 +1187,16 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {. try: let headers = request.prepareRequest() request.connection.state = HttpClientConnectionState.RequestHeadersSending + request.setTimestamp() await request.connection.writer.write(headers) request.connection.state = HttpClientConnectionState.RequestHeadersSent except CancelledError as exc: + request.setDuration() request.setError(newHttpInterruptError()) raise exc except AsyncStreamError: let error = newHttpWriteError("Could not send request headers") + request.setDuration() request.setError(error) raise error @@ -1123,6 +1234,7 @@ proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. "Body writer instance must be closed before finish(request) call") request.state = HttpReqRespState.Finished request.connection.state = HttpClientConnectionState.RequestBodySent + request.setDuration() let response = try: await request.getResponse() @@ -1190,6 +1302,7 @@ proc finish*(response: HttpClientResponseRef) {.async.} = "Body reader instance must be closed before finish(response) call") response.connection.state = HttpClientConnectionState.ResponseBodyReceived response.state = HttpReqRespState.Finished + response.setDuration() proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {. async.} = diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index ec55ddf..15f77d3 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -863,6 +863,65 @@ suite "HTTP client testing suite": return true + proc testIdleConnection(address: TransportAddress): Future[bool] {. + async.} = + let + ha = getAddress(address, HttpClientScheme.NonSecure, "/test") + + proc test( + session: HttpSessionRef, + a: HttpAddress + ): Future[TestResponseTuple] {.async.} = + + var + data: HttpResponseTuple + request = HttpClientRequestRef.new(session, a, version = HttpVersion11) + try: + data = await request.fetch() + finally: + await request.closeWait() + return (data.status, data.data.bytesToString(), 0) + + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + case request.uri.path + of "/test": + return await request.respond(Http200, "ok") + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + var server = createServer(address, process, false) + server.start() + let session = HttpSessionRef.new(idleTimeout = 1.seconds, + idlePeriod = 200.milliseconds) + try: + var f1 = test(session, ha) + var f2 = test(session, ha) + await allFutures(f1, f2) + check: + f1.finished() + f1.done() + f2.finished() + f2.done() + f1.read() == (200, "ok", 0) + f2.read() == (200, "ok", 0) + session.connectionsCount == 2 + + await sleepAsync(1500.milliseconds) + let resp = await test(session, ha) + check: + resp == (200, "ok", 0) + session.connectionsCount == 1 + finally: + await session.closeWait() + await server.stop() + await server.closeWait() + + return true + test "HTTP all request methods test": let address = initTAddress("127.0.0.1:30080") check waitFor(testMethods(address, false)) == 18 @@ -934,6 +993,10 @@ suite "HTTP client testing suite": let address = initTAddress("127.0.0.1:30080") check waitFor(testConnectionManagement(address)) == true + test "HTTP client idle connection test": + let address = initTAddress("127.0.0.1:30080") + check waitFor(testIdleConnection(address)) == true + test "Leaks test": proc getTrackerLeaks(tracker: string): bool = let tracker = getTracker(tracker)