Add flag to completely disable HTTP/1.1 pipelining support. (#387)
* Add flag for HTTP/1.1 pipeline disable. * Switch from NoHttp11Pipeline to Http11Pipeline.
This commit is contained in:
parent
8563c93673
commit
8aa8ee8822
|
@ -183,7 +183,8 @@ type
|
|||
NoInet4Resolution, ## Do not resolve server hostname to IPv4 addresses
|
||||
NoInet6Resolution, ## Do not resolve server hostname to IPv6 addresses
|
||||
NoAutomaticRedirect, ## Do not handle HTTP redirection automatically
|
||||
NewConnectionAlways ## Always create new connection to HTTP server
|
||||
NewConnectionAlways, ## Always create new connection to HTTP server
|
||||
Http11Pipeline ## Enable HTTP/1.1 pipelining
|
||||
|
||||
HttpClientFlags* = set[HttpClientFlag]
|
||||
|
||||
|
@ -365,7 +366,11 @@ proc new*(t: typedesc[HttpSessionRef],
|
|||
idlePeriod: idlePeriod,
|
||||
connections: initTable[string, seq[HttpClientConnectionRef]](),
|
||||
)
|
||||
res.watcherFut = sessionWatcher(res)
|
||||
res.watcherFut =
|
||||
if HttpClientFlag.Http11Pipeline in flags:
|
||||
sessionWatcher(res)
|
||||
else:
|
||||
newFuture[void]("session.watcher.placeholder")
|
||||
res
|
||||
|
||||
proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} =
|
||||
|
@ -658,6 +663,12 @@ proc removeConnection(session: HttpSessionRef,
|
|||
dec(session.connectionsCount)
|
||||
await conn.closeWait()
|
||||
|
||||
func connectionPoolEnabled(session: HttpSessionRef,
|
||||
flags: set[HttpClientRequestFlag]): bool =
|
||||
(HttpClientFlag.NewConnectionAlways notin session.flags) and
|
||||
(HttpClientRequestFlag.DedicatedConnection notin flags) and
|
||||
(HttpClientFlag.Http11Pipeline in session.flags)
|
||||
|
||||
proc acquireConnection(
|
||||
session: HttpSessionRef,
|
||||
ha: HttpAddress,
|
||||
|
@ -665,8 +676,7 @@ proc acquireConnection(
|
|||
): Future[HttpClientConnectionRef] {.async.} =
|
||||
## Obtain connection from ``session`` or establish a new one.
|
||||
var default: seq[HttpClientConnectionRef]
|
||||
if (HttpClientFlag.NewConnectionAlways notin session.flags) and
|
||||
(HttpClientRequestFlag.DedicatedConnection notin flags):
|
||||
if session.connectionPoolEnabled(flags):
|
||||
# Trying to reuse existing connection from our connection's pool.
|
||||
let timestamp = Moment.now()
|
||||
# We looking for non-idle connection at `Ready` state, all idle connections
|
||||
|
@ -691,29 +701,32 @@ proc releaseConnection(session: HttpSessionRef,
|
|||
connection: HttpClientConnectionRef) {.async.} =
|
||||
## Return connection back to the ``session``.
|
||||
let removeConnection =
|
||||
case connection.state
|
||||
of HttpClientConnectionState.ResponseBodyReceived:
|
||||
if HttpClientConnectionFlag.KeepAlive in connection.flags:
|
||||
# HTTP response body has been received and "Connection: keep-alive" is
|
||||
# present in response headers.
|
||||
false
|
||||
else:
|
||||
# HTTP response body has been received, but "Connection: keep-alive" is
|
||||
# not present or not supported.
|
||||
true
|
||||
of HttpClientConnectionState.ResponseHeadersReceived:
|
||||
if (HttpClientConnectionFlag.NoBody in connection.flags) and
|
||||
(HttpClientConnectionFlag.KeepAlive in connection.flags):
|
||||
# HTTP response headers received with an empty response body and
|
||||
# "Connection: keep-alive" is present in response headers.
|
||||
false
|
||||
else:
|
||||
# HTTP response body is not received or "Connection: keep-alive" is not
|
||||
# present or not supported.
|
||||
true
|
||||
else:
|
||||
# Connection not in proper state.
|
||||
if HttpClientFlag.Http11Pipeline notin session.flags:
|
||||
true
|
||||
else:
|
||||
case connection.state
|
||||
of HttpClientConnectionState.ResponseBodyReceived:
|
||||
if HttpClientConnectionFlag.KeepAlive in connection.flags:
|
||||
# HTTP response body has been received and "Connection: keep-alive" is
|
||||
# present in response headers.
|
||||
false
|
||||
else:
|
||||
# HTTP response body has been received, but "Connection: keep-alive"
|
||||
# is not present or not supported.
|
||||
true
|
||||
of HttpClientConnectionState.ResponseHeadersReceived:
|
||||
if (HttpClientConnectionFlag.NoBody in connection.flags) and
|
||||
(HttpClientConnectionFlag.KeepAlive in connection.flags):
|
||||
# HTTP response headers received with an empty response body and
|
||||
# "Connection: keep-alive" is present in response headers.
|
||||
false
|
||||
else:
|
||||
# HTTP response body is not received or "Connection: keep-alive" is
|
||||
# not present or not supported.
|
||||
true
|
||||
else:
|
||||
# Connection not in proper state.
|
||||
true
|
||||
|
||||
if removeConnection:
|
||||
await session.removeConnection(connection)
|
||||
|
@ -753,7 +766,8 @@ proc closeWait*(session: HttpSessionRef) {.async.} =
|
|||
## This closes all the connections opened to remote servers.
|
||||
var pending: seq[Future[void]]
|
||||
# Closing sessionWatcher to avoid race condition.
|
||||
await cancelAndWait(session.watcherFut)
|
||||
if not(isNil(session.watcherFut)):
|
||||
await cancelAndWait(session.watcherFut)
|
||||
for connections in session.connections.values():
|
||||
for conn in connections:
|
||||
pending.add(closeWait(conn))
|
||||
|
@ -924,11 +938,15 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte]
|
|||
res.connection.state = HttpClientConnectionState.ResponseHeadersReceived
|
||||
if nobodyFlag:
|
||||
res.connection.flags.incl(HttpClientConnectionFlag.NoBody)
|
||||
let newConnectionAlways =
|
||||
HttpClientFlag.NewConnectionAlways in request.session.flags
|
||||
let closeConnection =
|
||||
HttpClientRequestFlag.CloseConnection in request.flags
|
||||
if connectionFlag and not(newConnectionAlways) and not(closeConnection):
|
||||
let
|
||||
newConnectionAlways =
|
||||
HttpClientFlag.NewConnectionAlways in request.session.flags
|
||||
httpPipeline =
|
||||
HttpClientFlag.Http11Pipeline in request.session.flags
|
||||
closeConnection =
|
||||
HttpClientRequestFlag.CloseConnection in request.flags
|
||||
if connectionFlag and not(newConnectionAlways) and not(closeConnection) and
|
||||
httpPipeline:
|
||||
res.connection.flags.incl(HttpClientConnectionFlag.KeepAlive)
|
||||
res.connection.flags.incl(HttpClientConnectionFlag.Response)
|
||||
trackHttpClientResponse(res)
|
||||
|
@ -1049,7 +1067,8 @@ proc prepareRequest(request: HttpClientRequestRef): string {.
|
|||
discard request.headers.hasKeyOrPut(HostHeader, request.address.hostname)
|
||||
# We set `Connection` to value according to flags if its not set.
|
||||
if ConnectionHeader notin request.headers:
|
||||
if HttpClientRequestFlag.CloseConnection in request.flags:
|
||||
if (HttpClientFlag.Http11Pipeline notin request.session.flags) or
|
||||
(HttpClientRequestFlag.CloseConnection in request.flags):
|
||||
request.headers.add(ConnectionHeader, "close")
|
||||
else:
|
||||
request.headers.add(ConnectionHeader, "keep-alive")
|
||||
|
|
|
@ -831,21 +831,30 @@ suite "HTTP client testing suite":
|
|||
d8 == @[(200, "ok", 0), (200, "ok", 0)]
|
||||
|
||||
let
|
||||
n1 = await test1(keepHa, HttpVersion11, {}, {})
|
||||
n2 = await test2(keepHa, keepHa, HttpVersion11, {}, {})
|
||||
n3 = await test1(dropHa, HttpVersion11, {}, {})
|
||||
n4 = await test2(dropHa, dropHa, HttpVersion11, {}, {})
|
||||
n1 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline}, {})
|
||||
n2 = await test2(keepHa, keepHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline}, {})
|
||||
n3 = await test1(dropHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline}, {})
|
||||
n4 = await test2(dropHa, dropHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline}, {})
|
||||
n5 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.NewConnectionAlways}, {})
|
||||
n6 = await test1(keepHa, HttpVersion11, {},
|
||||
{HttpClientFlag.NewConnectionAlways,
|
||||
HttpClientFlag.Http11Pipeline}, {})
|
||||
n6 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline},
|
||||
{HttpClientRequestFlag.DedicatedConnection})
|
||||
n7 = await test1(keepHa, HttpVersion11, {},
|
||||
n7 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline},
|
||||
{HttpClientRequestFlag.DedicatedConnection,
|
||||
HttpClientRequestFlag.CloseConnection})
|
||||
n8 = await test1(keepHa, HttpVersion11, {},
|
||||
n8 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.Http11Pipeline},
|
||||
{HttpClientRequestFlag.CloseConnection})
|
||||
n9 = await test1(keepHa, HttpVersion11,
|
||||
{HttpClientFlag.NewConnectionAlways},
|
||||
{HttpClientFlag.NewConnectionAlways,
|
||||
HttpClientFlag.Http11Pipeline},
|
||||
{HttpClientRequestFlag.CloseConnection})
|
||||
check:
|
||||
n1 == (200, "ok", 1)
|
||||
|
@ -895,7 +904,8 @@ suite "HTTP client testing suite":
|
|||
|
||||
var server = createServer(address, process, false)
|
||||
server.start()
|
||||
let session = HttpSessionRef.new(idleTimeout = 1.seconds,
|
||||
let session = HttpSessionRef.new({HttpClientFlag.Http11Pipeline},
|
||||
idleTimeout = 1.seconds,
|
||||
idlePeriod = 200.milliseconds)
|
||||
try:
|
||||
var f1 = test(session, ha)
|
||||
|
@ -922,6 +932,75 @@ suite "HTTP client testing suite":
|
|||
|
||||
return true
|
||||
|
||||
proc testNoPipeline(address: TransportAddress): Future[bool] {.
|
||||
async.} =
|
||||
let
|
||||
ha = getAddress(address, HttpClientScheme.NonSecure, "/test")
|
||||
hb = getAddress(address, HttpClientScheme.NonSecure, "/keep-test")
|
||||
|
||||
proc test(
|
||||
session: HttpSessionRef,
|
||||
a: HttpAddress
|
||||
): Future[TestResponseTuple] {.async.} =
|
||||
|
||||
var
|
||||
data: HttpResponseTuple
|
||||
request = HttpClientRequestRef.new(session, a, version = HttpVersion11)
|
||||
try:
|
||||
data = await request.fetch()
|
||||
finally:
|
||||
await request.closeWait()
|
||||
return (data.status, data.data.bytesToString(), 0)
|
||||
|
||||
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
|
||||
if r.isOk():
|
||||
let request = r.get()
|
||||
case request.uri.path
|
||||
of "/test":
|
||||
return await request.respond(Http200, "ok")
|
||||
of "/keep-test":
|
||||
let headers = HttpTable.init([("Connection", "keep-alive")])
|
||||
return await request.respond(Http200, "not-alive", headers)
|
||||
else:
|
||||
return await request.respond(Http404, "Page not found")
|
||||
else:
|
||||
return dumbResponse()
|
||||
|
||||
var server = createServer(address, process, false)
|
||||
server.start()
|
||||
let session = HttpSessionRef.new(idleTimeout = 100.seconds,
|
||||
idlePeriod = 10.milliseconds)
|
||||
try:
|
||||
var f1 = test(session, ha)
|
||||
var f2 = test(session, ha)
|
||||
await allFutures(f1, f2)
|
||||
check:
|
||||
f1.finished()
|
||||
f1.done()
|
||||
f2.finished()
|
||||
f2.done()
|
||||
f1.read() == (200, "ok", 0)
|
||||
f2.read() == (200, "ok", 0)
|
||||
session.connectionsCount == 0
|
||||
|
||||
await sleepAsync(100.milliseconds)
|
||||
block:
|
||||
let resp = await test(session, ha)
|
||||
check:
|
||||
resp == (200, "ok", 0)
|
||||
session.connectionsCount == 0
|
||||
block:
|
||||
let resp = await test(session, hb)
|
||||
check:
|
||||
resp == (200, "not-alive", 0)
|
||||
session.connectionsCount == 0
|
||||
finally:
|
||||
await session.closeWait()
|
||||
await server.stop()
|
||||
await server.closeWait()
|
||||
|
||||
return true
|
||||
|
||||
test "HTTP all request methods test":
|
||||
let address = initTAddress("127.0.0.1:30080")
|
||||
check waitFor(testMethods(address, false)) == 18
|
||||
|
@ -997,6 +1076,10 @@ suite "HTTP client testing suite":
|
|||
let address = initTAddress("127.0.0.1:30080")
|
||||
check waitFor(testIdleConnection(address)) == true
|
||||
|
||||
test "HTTP client no-pipeline test":
|
||||
let address = initTAddress("127.0.0.1:30080")
|
||||
check waitFor(testNoPipeline(address)) == true
|
||||
|
||||
test "Leaks test":
|
||||
proc getTrackerLeaks(tracker: string): bool =
|
||||
let tracker = getTracker(tracker)
|
||||
|
|
Loading…
Reference in New Issue