Add idle connection timeouts for HTTP client's connections pool. (#324)

* Add idle connection timeouts for HTTP client's connections pool.
Add timestamps and duration for both HTTP client requests/responses.
Add test.

* Add comments on `connectionFlag` decisions.

* Address review comments.
Adjust default idle connection timeout to 60 seconds.

* Increase timeout for test.

* Adjust timeout to lower value.

* Address review comments.
This commit is contained in:
Eugene Kabanov 2023-03-21 15:10:35 +02:00 committed by GitHub
parent 30c839bac7
commit 0688d2ef8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 246 additions and 70 deletions

View File

@ -22,6 +22,12 @@ const
## Timeout for connecting to host (12 sec) ## Timeout for connecting to host (12 sec)
HttpHeadersTimeout* = 120.seconds HttpHeadersTimeout* = 120.seconds
## Timeout for receiving response headers (120 sec) ## Timeout for receiving response headers (120 sec)
HttpConnectionIdleTimeout* = 60.seconds
## Time after which idle connections are removed from the HttpSession's
## connections pool (120 sec)
HttpConnectionCheckPeriod* = 10.seconds
## Period of time between idle connections checks in HttpSession's
## connection pool (10 sec)
HttpMaxRedirections* = 10 HttpMaxRedirections* = 10
## Maximum number of Location redirections. ## Maximum number of Location redirections.
HttpClientConnectionTrackerName* = "httpclient.connection" HttpClientConnectionTrackerName* = "httpclient.connection"
@ -100,6 +106,7 @@ type
error*: ref HttpError error*: ref HttpError
remoteHostname*: string remoteHostname*: string
flags*: set[HttpClientConnectionFlag] flags*: set[HttpClientConnectionFlag]
timestamp*: Moment
HttpClientConnectionRef* = ref HttpClientConnection HttpClientConnectionRef* = ref HttpClientConnection
@ -109,6 +116,9 @@ type
maxRedirections*: int maxRedirections*: int
connectTimeout*: Duration connectTimeout*: Duration
headersTimeout*: Duration headersTimeout*: Duration
idleTimeout: Duration
idlePeriod: Duration
watcherFut: Future[void]
connectionBufferSize*: int connectionBufferSize*: int
maxConnections*: int maxConnections*: int
connectionsCount*: int connectionsCount*: int
@ -140,6 +150,8 @@ type
buffer*: seq[byte] buffer*: seq[byte]
writer*: HttpBodyWriter writer*: HttpBodyWriter
redirectCount: int redirectCount: int
timestamp*: Moment
duration*: Duration
HttpClientRequestRef* = ref HttpClientRequest HttpClientRequestRef* = ref HttpClientRequest
@ -160,6 +172,8 @@ type
transferEncoding*: set[TransferEncodingFlags] transferEncoding*: set[TransferEncodingFlags]
contentLength*: uint64 contentLength*: uint64
contentType*: Opt[ContentTypeData] contentType*: Opt[ContentTypeData]
timestamp*: Moment
duration*: Duration
HttpClientResponseRef* = ref HttpClientResponse HttpClientResponseRef* = ref HttpClientResponse
@ -284,29 +298,75 @@ template checkClosed(reqresp: untyped): untyped =
reqresp.setError(e) reqresp.setError(e)
raise e raise e
template setTimestamp(conn: HttpClientConnectionRef,
moment: Moment): untyped =
if not(isNil(conn)):
conn.timestamp = moment
template setTimestamp(
reqresp: HttpClientRequestRef|HttpClientRequestRef
): untyped =
if not(isNil(reqresp)):
let timestamp = Moment.now()
reqresp.timestamp = timestamp
reqresp.connection.setTimestamp(timestamp)
template setTimestamp(resp: HttpClientResponseRef, moment: Moment): untyped =
if not(isNil(resp)):
resp.timestamp = moment
resp.connection.setTimestamp(moment)
template setDuration(
reqresp: HttpClientRequestRef|HttpClientResponseRef
): untyped =
if not(isNil(reqresp)):
let timestamp = Moment.now()
reqresp.duration = timestamp - reqresp.timestamp
reqresp.connection.setTimestamp(timestamp)
template isReady(conn: HttpClientConnectionRef): bool =
(conn.state == HttpClientConnectionState.Ready) and
(HttpClientConnectionFlag.KeepAlive in conn.flags) and
(HttpClientConnectionFlag.Request notin conn.flags) and
(HttpClientConnectionFlag.Response notin conn.flags)
template isIdle(conn: HttpClientConnectionRef, timestamp: Moment,
timeout: Duration): bool =
(timestamp - conn.timestamp) >= timeout
proc sessionWatcher(session: HttpSessionRef) {.async.}
proc new*(t: typedesc[HttpSessionRef], proc new*(t: typedesc[HttpSessionRef],
flags: HttpClientFlags = {}, flags: HttpClientFlags = {},
maxRedirections = HttpMaxRedirections, maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout, connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout, headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize, connectionBufferSize = DefaultStreamBufferSize,
maxConnections = -1): HttpSessionRef {. maxConnections = -1,
idleTimeout = HttpConnectionIdleTimeout,
idlePeriod = HttpConnectionCheckPeriod): HttpSessionRef {.
raises: [Defect] .} = raises: [Defect] .} =
## Create new HTTP session object. ## Create new HTTP session object.
## ##
## ``maxRedirections`` - maximum number of HTTP 3xx redirections ## ``maxRedirections`` - maximum number of HTTP 3xx redirections
## ``connectTimeout`` - timeout for ongoing HTTP connection ## ``connectTimeout`` - timeout for ongoing HTTP connection
## ``headersTimeout`` - timeout for receiving HTTP response headers ## ``headersTimeout`` - timeout for receiving HTTP response headers
## ``idleTimeout`` - timeout to consider HTTP connection as idle
## ``idlePeriod`` - period of time to check HTTP connections for inactivity
doAssert(maxRedirections >= 0, "maxRedirections should not be negative") doAssert(maxRedirections >= 0, "maxRedirections should not be negative")
HttpSessionRef( var res = HttpSessionRef(
flags: flags, flags: flags,
maxRedirections: maxRedirections, maxRedirections: maxRedirections,
connectTimeout: connectTimeout, connectTimeout: connectTimeout,
headersTimeout: headersTimeout, headersTimeout: headersTimeout,
connectionBufferSize: connectionBufferSize, connectionBufferSize: connectionBufferSize,
maxConnections: maxConnections, maxConnections: maxConnections,
connections: initTable[string, seq[HttpClientConnectionRef]]() idleTimeout: idleTimeout,
idlePeriod: idlePeriod,
connections: initTable[string, seq[HttpClientConnectionRef]](),
) )
res.watcherFut = sessionWatcher(res)
res
proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} = proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} =
var res: set[TLSFlags] var res: set[TLSFlags]
@ -583,52 +643,6 @@ proc connect(session: HttpSessionRef,
# If all attempts to connect to the remote host have failed. # If all attempts to connect to the remote host have failed.
raiseHttpConnectionError("Could not connect to remote host") raiseHttpConnectionError("Could not connect to remote host")
proc acquireConnection(
session: HttpSessionRef,
ha: HttpAddress,
flags: set[HttpClientRequestFlag]
): Future[HttpClientConnectionRef] {.async.} =
## Obtain connection from ``session`` or establish a new one.
if (HttpClientFlag.NewConnectionAlways in session.flags) or
(HttpClientRequestFlag.DedicatedConnection in flags):
var default: seq[HttpClientConnectionRef]
let res =
try:
await session.connect(ha).wait(session.connectTimeout)
except AsyncTimeoutError:
raiseHttpConnectionError("Connection timed out")
res[].state = HttpClientConnectionState.Acquired
session.connections.mgetOrPut(ha.id, default).add(res)
inc(session.connectionsCount)
return res
else:
let conn =
block:
let conns = session.connections.getOrDefault(ha.id)
if len(conns) > 0:
var res: HttpClientConnectionRef = nil
for item in conns:
if item.state == HttpClientConnectionState.Ready:
res = item
break
res
else:
nil
if not(isNil(conn)):
conn[].state = HttpClientConnectionState.Acquired
return conn
else:
var default: seq[HttpClientConnectionRef]
let res =
try:
await session.connect(ha).wait(session.connectTimeout)
except AsyncTimeoutError:
raiseHttpConnectionError("Connection timed out")
res[].state = HttpClientConnectionState.Acquired
session.connections.mgetOrPut(ha.id, default).add(res)
inc(session.connectionsCount)
return res
proc removeConnection(session: HttpSessionRef, proc removeConnection(session: HttpSessionRef,
conn: HttpClientConnectionRef) {.async.} = conn: HttpClientConnectionRef) {.async.} =
let removeHost = let removeHost =
@ -644,6 +658,35 @@ proc removeConnection(session: HttpSessionRef,
dec(session.connectionsCount) dec(session.connectionsCount)
await conn.closeWait() await conn.closeWait()
proc acquireConnection(
session: HttpSessionRef,
ha: HttpAddress,
flags: set[HttpClientRequestFlag]
): Future[HttpClientConnectionRef] {.async.} =
## Obtain connection from ``session`` or establish a new one.
var default: seq[HttpClientConnectionRef]
if (HttpClientFlag.NewConnectionAlways notin session.flags) and
(HttpClientRequestFlag.DedicatedConnection notin flags):
# Trying to reuse existing connection from our connection's pool.
let timestamp = Moment.now()
# We looking for non-idle connection at `Ready` state, all idle connections
# will be freed by sessionWatcher().
for connection in session.connections.getOrDefault(ha.id):
if connection.isReady() and
not(connection.isIdle(timestamp, session.idleTimeout)):
connection.state = HttpClientConnectionState.Acquired
return connection
let connection =
try:
await session.connect(ha).wait(session.connectTimeout)
except AsyncTimeoutError:
raiseHttpConnectionError("Connection timed out")
connection.state = HttpClientConnectionState.Acquired
session.connections.mgetOrPut(ha.id, default).add(connection)
inc(session.connectionsCount)
return connection
proc releaseConnection(session: HttpSessionRef, proc releaseConnection(session: HttpSessionRef,
connection: HttpClientConnectionRef) {.async.} = connection: HttpClientConnectionRef) {.async.} =
## Return connection back to the ``session``. ## Return connection back to the ``session``.
@ -676,7 +719,9 @@ proc releaseConnection(session: HttpSessionRef,
await session.removeConnection(connection) await session.removeConnection(connection)
else: else:
connection.state = HttpClientConnectionState.Ready connection.state = HttpClientConnectionState.Ready
connection.flags = {} connection.flags.excl({HttpClientConnectionFlag.Request,
HttpClientConnectionFlag.Response,
HttpClientConnectionFlag.NoBody})
proc releaseConnection(request: HttpClientRequestRef) {.async.} = proc releaseConnection(request: HttpClientRequestRef) {.async.} =
let let
@ -707,11 +752,55 @@ proc closeWait*(session: HttpSessionRef) {.async.} =
## ##
## This closes all the connections opened to remote servers. ## This closes all the connections opened to remote servers.
var pending: seq[Future[void]] var pending: seq[Future[void]]
for items in session.connections.values(): # Closing sessionWatcher to avoid race condition.
for item in items: await cancelAndWait(session.watcherFut)
pending.add(closeWait(item)) for connections in session.connections.values():
for conn in connections:
pending.add(closeWait(conn))
await allFutures(pending) await allFutures(pending)
proc sessionWatcher(session: HttpSessionRef) {.async.} =
while true:
let firstBreak =
try:
await sleepAsync(session.idlePeriod)
false
except CancelledError:
true
if firstBreak:
break
var idleConnections: seq[HttpClientConnectionRef]
let timestamp = Moment.now()
for _, connections in session.connections.mpairs():
connections.keepItIf(
if isNil(it):
false
else:
if it.isReady() and it.isIdle(timestamp, session.idleTimeout):
idleConnections.add(it)
false
else:
true
)
if len(idleConnections) > 0:
dec(session.connectionsCount, len(idleConnections))
var pending: seq[Future[void]]
let secondBreak =
try:
pending = idleConnections.mapIt(it.closeWait())
await allFutures(pending)
false
except CancelledError:
# We still want to close connections to avoid socket leaks.
await allFutures(pending)
true
if secondBreak:
break
proc closeWait*(request: HttpClientRequestRef) {.async.} = proc closeWait*(request: HttpClientRequestRef) {.async.} =
if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
request.state = HttpReqRespState.Closing request.state = HttpReqRespState.Closing
@ -791,14 +880,26 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte]
let connectionFlag = let connectionFlag =
block: block:
case resp.version case resp.version
of HttpVersion11, HttpVersion20: of HttpVersion11:
# Keeping a connection open is the default on HTTP/1.1 requests.
# https://www.rfc-editor.org/rfc/rfc2068.html#section-19.7.1
let header = toLowerAscii(headers.getString(ConnectionHeader)) let header = toLowerAscii(headers.getString(ConnectionHeader))
if header == "keep-alive": if header == "close":
true
else:
false false
else: else:
true
of HttpVersion10:
# This is the default on HTTP/1.0 requests.
false false
else:
# HTTP/2 does not use the Connection header field (Section 7.6.1 of
# [HTTP]) to indicate connection-specific header fields.
# https://httpwg.org/specs/rfc9113.html#rfc.section.8.2.2
#
# HTTP/3 does not use the Connection header field to indicate
# connection-specific fields;
# https://httpwg.org/specs/rfc9114.html#rfc.section.4.2
true
let contentType = let contentType =
block: block:
@ -836,22 +937,25 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte]
proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {. proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {.
async.} = async.} =
var buffer: array[HttpMaxHeadersSize, byte] var buffer: array[HttpMaxHeadersSize, byte]
let bytesRead = let timestamp = Moment.now()
try: req.connection.setTimestamp(timestamp)
await req.connection.reader.readUntil(addr buffer[0], let
len(buffer), HeadersMark).wait( bytesRead =
req.session.headersTimeout) try:
except CancelledError as exc: await req.connection.reader.readUntil(addr buffer[0],
raise exc len(buffer), HeadersMark).wait(
except AsyncTimeoutError: req.session.headersTimeout)
raiseHttpReadError("Reading response headers timed out") except AsyncTimeoutError:
except AsyncStreamError: raiseHttpReadError("Reading response headers timed out")
raiseHttpReadError("Could not read response headers") except AsyncStreamError:
raiseHttpReadError("Could not read response headers")
let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1)) let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1))
if response.isErr(): if response.isErr():
raiseHttpProtocolError(response.error()) raiseHttpProtocolError(response.error())
return response.get() let res = response.get()
res.setTimestamp(timestamp)
return res
proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, meth: HttpMethod = MethodGet, ha: HttpAddress, meth: HttpMethod = MethodGet,
@ -1029,6 +1133,7 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
let headers = request.prepareRequest() let headers = request.prepareRequest()
request.connection.state = HttpClientConnectionState.RequestHeadersSending request.connection.state = HttpClientConnectionState.RequestHeadersSending
request.state = HttpReqRespState.Open request.state = HttpReqRespState.Open
request.setTimestamp()
await request.connection.writer.write(headers) await request.connection.writer.write(headers)
request.connection.state = HttpClientConnectionState.RequestHeadersSent request.connection.state = HttpClientConnectionState.RequestHeadersSent
request.connection.state = HttpClientConnectionState.RequestBodySending request.connection.state = HttpClientConnectionState.RequestBodySending
@ -1036,10 +1141,13 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
await request.connection.writer.write(request.buffer) await request.connection.writer.write(request.buffer)
request.connection.state = HttpClientConnectionState.RequestBodySent request.connection.state = HttpClientConnectionState.RequestBodySent
request.state = HttpReqRespState.Finished request.state = HttpReqRespState.Finished
request.setDuration()
except CancelledError as exc: except CancelledError as exc:
request.setDuration()
request.setError(newHttpInterruptError()) request.setError(newHttpInterruptError())
raise exc raise exc
except AsyncStreamError: except AsyncStreamError:
request.setDuration()
let error = newHttpWriteError("Could not send request headers") let error = newHttpWriteError("Could not send request headers")
request.setError(error) request.setError(error)
raise error raise error
@ -1079,13 +1187,16 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
try: try:
let headers = request.prepareRequest() let headers = request.prepareRequest()
request.connection.state = HttpClientConnectionState.RequestHeadersSending request.connection.state = HttpClientConnectionState.RequestHeadersSending
request.setTimestamp()
await request.connection.writer.write(headers) await request.connection.writer.write(headers)
request.connection.state = HttpClientConnectionState.RequestHeadersSent request.connection.state = HttpClientConnectionState.RequestHeadersSent
except CancelledError as exc: except CancelledError as exc:
request.setDuration()
request.setError(newHttpInterruptError()) request.setError(newHttpInterruptError())
raise exc raise exc
except AsyncStreamError: except AsyncStreamError:
let error = newHttpWriteError("Could not send request headers") let error = newHttpWriteError("Could not send request headers")
request.setDuration()
request.setError(error) request.setError(error)
raise error raise error
@ -1123,6 +1234,7 @@ proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
"Body writer instance must be closed before finish(request) call") "Body writer instance must be closed before finish(request) call")
request.state = HttpReqRespState.Finished request.state = HttpReqRespState.Finished
request.connection.state = HttpClientConnectionState.RequestBodySent request.connection.state = HttpClientConnectionState.RequestBodySent
request.setDuration()
let response = let response =
try: try:
await request.getResponse() await request.getResponse()
@ -1190,6 +1302,7 @@ proc finish*(response: HttpClientResponseRef) {.async.} =
"Body reader instance must be closed before finish(response) call") "Body reader instance must be closed before finish(response) call")
response.connection.state = HttpClientConnectionState.ResponseBodyReceived response.connection.state = HttpClientConnectionState.ResponseBodyReceived
response.state = HttpReqRespState.Finished response.state = HttpReqRespState.Finished
response.setDuration()
proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {. proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {.
async.} = async.} =

View File

@ -863,6 +863,65 @@ suite "HTTP client testing suite":
return true return true
proc testIdleConnection(address: TransportAddress): Future[bool] {.
async.} =
let
ha = getAddress(address, HttpClientScheme.NonSecure, "/test")
proc test(
session: HttpSessionRef,
a: HttpAddress
): Future[TestResponseTuple] {.async.} =
var
data: HttpResponseTuple
request = HttpClientRequestRef.new(session, a, version = HttpVersion11)
try:
data = await request.fetch()
finally:
await request.closeWait()
return (data.status, data.data.bytesToString(), 0)
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isOk():
let request = r.get()
case request.uri.path
of "/test":
return await request.respond(Http200, "ok")
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
var server = createServer(address, process, false)
server.start()
let session = HttpSessionRef.new(idleTimeout = 1.seconds,
idlePeriod = 200.milliseconds)
try:
var f1 = test(session, ha)
var f2 = test(session, ha)
await allFutures(f1, f2)
check:
f1.finished()
f1.done()
f2.finished()
f2.done()
f1.read() == (200, "ok", 0)
f2.read() == (200, "ok", 0)
session.connectionsCount == 2
await sleepAsync(1500.milliseconds)
let resp = await test(session, ha)
check:
resp == (200, "ok", 0)
session.connectionsCount == 1
finally:
await session.closeWait()
await server.stop()
await server.closeWait()
return true
test "HTTP all request methods test": test "HTTP all request methods test":
let address = initTAddress("127.0.0.1:30080") let address = initTAddress("127.0.0.1:30080")
check waitFor(testMethods(address, false)) == 18 check waitFor(testMethods(address, false)) == 18
@ -934,6 +993,10 @@ suite "HTTP client testing suite":
let address = initTAddress("127.0.0.1:30080") let address = initTAddress("127.0.0.1:30080")
check waitFor(testConnectionManagement(address)) == true check waitFor(testConnectionManagement(address)) == true
test "HTTP client idle connection test":
let address = initTAddress("127.0.0.1:30080")
check waitFor(testIdleConnection(address)) == true
test "Leaks test": test "Leaks test":
proc getTrackerLeaks(tracker: string): bool = proc getTrackerLeaks(tracker: string): bool =
let tracker = getTracker(tracker) let tracker = getTracker(tracker)