diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 2997d04..90539b9 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -183,7 +183,8 @@ type NoInet4Resolution, ## Do not resolve server hostname to IPv4 addresses NoInet6Resolution, ## Do not resolve server hostname to IPv6 addresses NoAutomaticRedirect, ## Do not handle HTTP redirection automatically - NewConnectionAlways ## Always create new connection to HTTP server + NewConnectionAlways, ## Always create new connection to HTTP server + Http11Pipeline ## Enable HTTP/1.1 pipelining HttpClientFlags* = set[HttpClientFlag] @@ -365,7 +366,11 @@ proc new*(t: typedesc[HttpSessionRef], idlePeriod: idlePeriod, connections: initTable[string, seq[HttpClientConnectionRef]](), ) - res.watcherFut = sessionWatcher(res) + res.watcherFut = + if HttpClientFlag.Http11Pipeline in flags: + sessionWatcher(res) + else: + newFuture[void]("session.watcher.placeholder") res proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} = @@ -658,6 +663,12 @@ proc removeConnection(session: HttpSessionRef, dec(session.connectionsCount) await conn.closeWait() +func connectionPoolEnabled(session: HttpSessionRef, + flags: set[HttpClientRequestFlag]): bool = + (HttpClientFlag.NewConnectionAlways notin session.flags) and + (HttpClientRequestFlag.DedicatedConnection notin flags) and + (HttpClientFlag.Http11Pipeline in session.flags) + proc acquireConnection( session: HttpSessionRef, ha: HttpAddress, @@ -665,8 +676,7 @@ proc acquireConnection( ): Future[HttpClientConnectionRef] {.async.} = ## Obtain connection from ``session`` or establish a new one. var default: seq[HttpClientConnectionRef] - if (HttpClientFlag.NewConnectionAlways notin session.flags) and - (HttpClientRequestFlag.DedicatedConnection notin flags): + if session.connectionPoolEnabled(flags): # Trying to reuse existing connection from our connection's pool. let timestamp = Moment.now() # We looking for non-idle connection at `Ready` state, all idle connections @@ -691,29 +701,32 @@ proc releaseConnection(session: HttpSessionRef, connection: HttpClientConnectionRef) {.async.} = ## Return connection back to the ``session``. let removeConnection = - case connection.state - of HttpClientConnectionState.ResponseBodyReceived: - if HttpClientConnectionFlag.KeepAlive in connection.flags: - # HTTP response body has been received and "Connection: keep-alive" is - # present in response headers. - false - else: - # HTTP response body has been received, but "Connection: keep-alive" is - # not present or not supported. - true - of HttpClientConnectionState.ResponseHeadersReceived: - if (HttpClientConnectionFlag.NoBody in connection.flags) and - (HttpClientConnectionFlag.KeepAlive in connection.flags): - # HTTP response headers received with an empty response body and - # "Connection: keep-alive" is present in response headers. - false - else: - # HTTP response body is not received or "Connection: keep-alive" is not - # present or not supported. - true - else: - # Connection not in proper state. + if HttpClientFlag.Http11Pipeline notin session.flags: true + else: + case connection.state + of HttpClientConnectionState.ResponseBodyReceived: + if HttpClientConnectionFlag.KeepAlive in connection.flags: + # HTTP response body has been received and "Connection: keep-alive" is + # present in response headers. + false + else: + # HTTP response body has been received, but "Connection: keep-alive" + # is not present or not supported. + true + of HttpClientConnectionState.ResponseHeadersReceived: + if (HttpClientConnectionFlag.NoBody in connection.flags) and + (HttpClientConnectionFlag.KeepAlive in connection.flags): + # HTTP response headers received with an empty response body and + # "Connection: keep-alive" is present in response headers. + false + else: + # HTTP response body is not received or "Connection: keep-alive" is + # not present or not supported. + true + else: + # Connection not in proper state. + true if removeConnection: await session.removeConnection(connection) @@ -753,7 +766,8 @@ proc closeWait*(session: HttpSessionRef) {.async.} = ## This closes all the connections opened to remote servers. var pending: seq[Future[void]] # Closing sessionWatcher to avoid race condition. - await cancelAndWait(session.watcherFut) + if not(isNil(session.watcherFut)): + await cancelAndWait(session.watcherFut) for connections in session.connections.values(): for conn in connections: pending.add(closeWait(conn)) @@ -924,11 +938,15 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] res.connection.state = HttpClientConnectionState.ResponseHeadersReceived if nobodyFlag: res.connection.flags.incl(HttpClientConnectionFlag.NoBody) - let newConnectionAlways = - HttpClientFlag.NewConnectionAlways in request.session.flags - let closeConnection = - HttpClientRequestFlag.CloseConnection in request.flags - if connectionFlag and not(newConnectionAlways) and not(closeConnection): + let + newConnectionAlways = + HttpClientFlag.NewConnectionAlways in request.session.flags + httpPipeline = + HttpClientFlag.Http11Pipeline in request.session.flags + closeConnection = + HttpClientRequestFlag.CloseConnection in request.flags + if connectionFlag and not(newConnectionAlways) and not(closeConnection) and + httpPipeline: res.connection.flags.incl(HttpClientConnectionFlag.KeepAlive) res.connection.flags.incl(HttpClientConnectionFlag.Response) trackHttpClientResponse(res) @@ -1049,7 +1067,8 @@ proc prepareRequest(request: HttpClientRequestRef): string {. discard request.headers.hasKeyOrPut(HostHeader, request.address.hostname) # We set `Connection` to value according to flags if its not set. if ConnectionHeader notin request.headers: - if HttpClientRequestFlag.CloseConnection in request.flags: + if (HttpClientFlag.Http11Pipeline notin request.session.flags) or + (HttpClientRequestFlag.CloseConnection in request.flags): request.headers.add(ConnectionHeader, "close") else: request.headers.add(ConnectionHeader, "keep-alive") diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index 16d1dc7..ccd8eef 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -831,21 +831,30 @@ suite "HTTP client testing suite": d8 == @[(200, "ok", 0), (200, "ok", 0)] let - n1 = await test1(keepHa, HttpVersion11, {}, {}) - n2 = await test2(keepHa, keepHa, HttpVersion11, {}, {}) - n3 = await test1(dropHa, HttpVersion11, {}, {}) - n4 = await test2(dropHa, dropHa, HttpVersion11, {}, {}) + n1 = await test1(keepHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {}) + n2 = await test2(keepHa, keepHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {}) + n3 = await test1(dropHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {}) + n4 = await test2(dropHa, dropHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {}) n5 = await test1(keepHa, HttpVersion11, - {HttpClientFlag.NewConnectionAlways}, {}) - n6 = await test1(keepHa, HttpVersion11, {}, + {HttpClientFlag.NewConnectionAlways, + HttpClientFlag.Http11Pipeline}, {}) + n6 = await test1(keepHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {HttpClientRequestFlag.DedicatedConnection}) - n7 = await test1(keepHa, HttpVersion11, {}, + n7 = await test1(keepHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {HttpClientRequestFlag.DedicatedConnection, HttpClientRequestFlag.CloseConnection}) - n8 = await test1(keepHa, HttpVersion11, {}, + n8 = await test1(keepHa, HttpVersion11, + {HttpClientFlag.Http11Pipeline}, {HttpClientRequestFlag.CloseConnection}) n9 = await test1(keepHa, HttpVersion11, - {HttpClientFlag.NewConnectionAlways}, + {HttpClientFlag.NewConnectionAlways, + HttpClientFlag.Http11Pipeline}, {HttpClientRequestFlag.CloseConnection}) check: n1 == (200, "ok", 1) @@ -895,7 +904,8 @@ suite "HTTP client testing suite": var server = createServer(address, process, false) server.start() - let session = HttpSessionRef.new(idleTimeout = 1.seconds, + let session = HttpSessionRef.new({HttpClientFlag.Http11Pipeline}, + idleTimeout = 1.seconds, idlePeriod = 200.milliseconds) try: var f1 = test(session, ha) @@ -922,6 +932,75 @@ suite "HTTP client testing suite": return true + proc testNoPipeline(address: TransportAddress): Future[bool] {. + async.} = + let + ha = getAddress(address, HttpClientScheme.NonSecure, "/test") + hb = getAddress(address, HttpClientScheme.NonSecure, "/keep-test") + + proc test( + session: HttpSessionRef, + a: HttpAddress + ): Future[TestResponseTuple] {.async.} = + + var + data: HttpResponseTuple + request = HttpClientRequestRef.new(session, a, version = HttpVersion11) + try: + data = await request.fetch() + finally: + await request.closeWait() + return (data.status, data.data.bytesToString(), 0) + + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + case request.uri.path + of "/test": + return await request.respond(Http200, "ok") + of "/keep-test": + let headers = HttpTable.init([("Connection", "keep-alive")]) + return await request.respond(Http200, "not-alive", headers) + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + var server = createServer(address, process, false) + server.start() + let session = HttpSessionRef.new(idleTimeout = 100.seconds, + idlePeriod = 10.milliseconds) + try: + var f1 = test(session, ha) + var f2 = test(session, ha) + await allFutures(f1, f2) + check: + f1.finished() + f1.done() + f2.finished() + f2.done() + f1.read() == (200, "ok", 0) + f2.read() == (200, "ok", 0) + session.connectionsCount == 0 + + await sleepAsync(100.milliseconds) + block: + let resp = await test(session, ha) + check: + resp == (200, "ok", 0) + session.connectionsCount == 0 + block: + let resp = await test(session, hb) + check: + resp == (200, "not-alive", 0) + session.connectionsCount == 0 + finally: + await session.closeWait() + await server.stop() + await server.closeWait() + + return true + test "HTTP all request methods test": let address = initTAddress("127.0.0.1:30080") check waitFor(testMethods(address, false)) == 18 @@ -997,6 +1076,10 @@ suite "HTTP client testing suite": let address = initTAddress("127.0.0.1:30080") check waitFor(testIdleConnection(address)) == true + test "HTTP client no-pipeline test": + let address = initTAddress("127.0.0.1:30080") + check waitFor(testNoPipeline(address)) == true + test "Leaks test": proc getTrackerLeaks(tracker: string): bool = let tracker = getTracker(tracker)