Fix connections management in http client. (#246)

* Proper connection lifetime handling mechanism.
HttpClientResponse do not close connection anymore.

* Add unique HTTP client connection identifier.
This commit is contained in:
Eugene Kabanov 2021-12-01 12:23:39 +02:00 committed by GitHub
parent 37c62af579
commit 1c17d4c094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 9 deletions

View File

@ -89,6 +89,7 @@ type
data: seq[byte] data: seq[byte]
HttpClientConnection* = object of RootObj HttpClientConnection* = object of RootObj
id*: uint64
case kind*: HttpClientScheme case kind*: HttpClientScheme
of HttpClientScheme.NonSecure: of HttpClientScheme.NonSecure:
discard discard
@ -107,6 +108,7 @@ type
HttpSessionRef* = ref object HttpSessionRef* = ref object
connections*: Table[string, seq[HttpClientConnectionRef]] connections*: Table[string, seq[HttpClientConnectionRef]]
counter*: uint64
maxRedirections*: int maxRedirections*: int
connectTimeout*: Duration connectTimeout*: Duration
headersTimeout*: Duration headersTimeout*: Duration
@ -452,11 +454,16 @@ proc redirect*(session: HttpSessionRef,
addresses: srcaddr.addresses addresses: srcaddr.addresses
)) ))
proc getUniqueConnectionId(session: HttpSessionRef): uint64 =
inc(session.counter)
session.counter
proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef, proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef,
ha: HttpAddress, transp: StreamTransport): HttpClientConnectionRef = ha: HttpAddress, transp: StreamTransport): HttpClientConnectionRef =
case ha.scheme case ha.scheme
of HttpClientScheme.NonSecure: of HttpClientScheme.NonSecure:
let res = HttpClientConnectionRef( let res = HttpClientConnectionRef(
id: session.getUniqueConnectionId(),
kind: HttpClientScheme.NonSecure, kind: HttpClientScheme.NonSecure,
transp: transp, transp: transp,
reader: newAsyncStreamReader(transp), reader: newAsyncStreamReader(transp),
@ -472,6 +479,7 @@ proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef,
let tls = newTLSClientAsyncStream(treader, twriter, ha.hostname, let tls = newTLSClientAsyncStream(treader, twriter, ha.hostname,
flags = session.flags.getTLSFlags()) flags = session.flags.getTLSFlags())
let res = HttpClientConnectionRef( let res = HttpClientConnectionRef(
id: session.getUniqueConnectionId(),
kind: HttpClientScheme.Secure, kind: HttpClientScheme.Secure,
transp: transp, transp: transp,
treader: treader, treader: treader,
@ -639,13 +647,38 @@ proc removeConnection(session: HttpSessionRef,
await conn.closeWait() await conn.closeWait()
proc releaseConnection(session: HttpSessionRef, proc releaseConnection(session: HttpSessionRef,
conn: HttpClientConnectionRef) {.async.} = conn: HttpClientConnectionRef,
forceRemove = false) {.async.} =
## Return connection back to the ``session``. ## Return connection back to the ``session``.
## ##
## If connection not in ``Ready`` state it will be closed and removed from ## If connection not in ``Ready`` state it will be closed and removed from
## the ``session``. ## the ``session``.
if conn.state != HttpClientConnectionState.Ready: if forceRemove:
await session.removeConnection(conn) await session.removeConnection(conn)
else:
if conn.state != HttpClientConnectionState.Ready:
await session.removeConnection(conn)
proc needKeepConnection(request: HttpClientRequestRef): bool =
## Returns ``true`` if the request's corresponding connection should be kept
## alive, and ``false`` if this connection should be dropped.
case request.state
of HttpClientRequestState.ResponseReceived:
case request.version
of HttpVersion11, HttpVersion20:
let connection = toLowerAscii(request.headers.getString(ConnectionHeader))
if connection == "keep-alive":
true
else:
# If `Connection` header is missing or its value not equal to
# `keep-alive`.
false
else:
# Versions prior HTTP 1.1 do not support request pipelining.
false
else:
# Request not in proper state.
false
proc closeWait*(session: HttpSessionRef) {.async.} = proc closeWait*(session: HttpSessionRef) {.async.} =
## Closes HTTP session object. ## Closes HTTP session object.
@ -665,10 +698,12 @@ proc closeWait*(request: HttpClientRequestRef) {.async.} =
if not(request.writer.closed()): if not(request.writer.closed()):
await request.writer.closeWait() await request.writer.closeWait()
request.writer = nil request.writer = nil
if request.state != HttpClientRequestState.ResponseReceived: if not(isNil(request.connection)):
if not(isNil(request.connection)): if request.needKeepConnection():
await request.session.releaseConnection(request.connection) await request.session.releaseConnection(request.connection, false)
request.connection = nil else:
await request.session.releaseConnection(request.connection, true)
request.connection = nil
request.session = nil request.session = nil
request.error = nil request.error = nil
request.setState(HttpClientRequestState.Closed) request.setState(HttpClientRequestState.Closed)
@ -682,9 +717,6 @@ proc closeWait*(response: HttpClientResponseRef) {.async.} =
if not(response.reader.closed()): if not(response.reader.closed()):
await response.reader.closeWait() await response.reader.closeWait()
response.reader = nil response.reader = nil
if not(isNil(response.connection)):
await response.session.releaseConnection(response.connection)
response.connection = nil
response.session = nil response.session = nil
response.error = nil response.error = nil
response.setState(HttpClientResponseState.Closed) response.setState(HttpClientResponseState.Closed)