Attempt to fix connection state. (#248)

* Attempt to fix connection state.

* Add actual values dump on asserts.

* Total rework of connection's states.
This commit is contained in:
Eugene Kabanov 2021-12-03 13:11:39 +02:00 committed by GitHub
parent 1c17d4c094
commit 36e5f6fc89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 199 additions and 184 deletions

View File

@ -52,25 +52,13 @@ type
NonSecure, ## Non-secure connection
Secure ## Secure TLS connection
HttpClientRequestState* {.pure.} = enum
Closed, ## Request has been closed
Closing, ## Connection is closing
Created, ## Request created
Connecting, ## Connecting to remote host
HeadersSending, ## Sending request headers
HeadersSent, ## Request headers has been sent
BodySending, ## Sending request body
BodySent, ## Request body has been sent
ResponseReceived, ## Request's response headers received
Error ## Error happens
HttpClientResponseState* {.pure.} = enum
Closed, ## Response has been closed
Closing, ## Response is closing
HeadersReceived, ## Response headers received
BodyReceiving, ## Response body receiving
BodyReceived, ## Response body received
Error ## Error happens
HttpReqRespState* {.pure.} = enum
Closed, ## Request/response has been closed
Closing, ## Request/response is closing
Ready, ## Request/response is ready
Open, ## Request/response started
Finished, ## Request/response has been sent/received
Error ## Request/response in error state
HttpClientBodyFlag* {.pure.} = enum
Sized, ## `Content-Length` present
@ -80,6 +68,12 @@ type
HttpClientRequestFlag* {.pure.} = enum
CloseConnection, ## Send `Connection: close` in request
HttpClientConnectionFlag* {.pure.} = enum
Request, ## Connection has pending request
Response, ## Connection has pending response
KeepAlive, ## Connection should be kept alive
NoBody ## Connection response do not have body
HttpHeaderTuple* = tuple
key: string
value: string
@ -103,6 +97,7 @@ type
state*: HttpClientConnectionState
error*: ref HttpError
remoteHostname*: string
flags*: set[HttpClientConnectionFlag]
HttpClientConnectionRef* = ref HttpClientConnection
@ -129,9 +124,9 @@ type
addresses*: seq[TransportAddress]
HttpClientRequest* = object
state: HttpReqRespState
meth*: HttpMethod
address*: HttpAddress
state: HttpClientRequestState
version*: HttpVersion
headers*: HttpTable
bodyFlag: HttpClientBodyFlag
@ -146,7 +141,7 @@ type
HttpClientRequestRef* = ref HttpClientRequest
HttpClientResponse* = object
state: HttpClientResponseState
state: HttpReqRespState
requestMethod*: HttpMethod
address*: HttpAddress
status*: int
@ -178,6 +173,12 @@ type
opened*: int64
closed*: int64
# HttpClientRequestRef valid states are:
# Ready -> Open -> (Finished, Error) -> (Closing, Closed)
#
# HttpClientResponseRef valid states are
# Open -> (Finished, Error) -> (Closing, Closed)
proc setupHttpClientConnectionTracker(): HttpClientTracker {.
gcsafe, raises: [Defect].}
proc setupHttpClientRequestTracker(): HttpClientTracker {.
@ -493,41 +494,10 @@ proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef,
trackHttpClientConnection(res)
res
proc setState(request: HttpClientRequestRef, state: HttpClientRequestState) {.
raises: [Defect] .} =
request.state = state
case state
of HttpClientRequestState.HeadersSending:
request.connection.state = HttpClientConnectionState.RequestHeadersSending
of HttpClientRequestState.HeadersSent:
request.connection.state = HttpClientConnectionState.RequestHeadersSent
of HttpClientRequestState.BodySending:
request.connection.state = HttpClientConnectionState.RequestBodySending
of HttpClientRequestState.BodySent:
request.connection.state = HttpClientConnectionState.RequestBodySent
of HttpClientRequestState.ResponseReceived:
request.connection.state = HttpClientConnectionState.ResponseHeadersReceived
else:
discard
proc setState(response: HttpClientResponseRef,
state: HttpClientResponseState) {.raises: [Defect] .} =
response.state = state
case state
of HttpClientResponseState.HeadersReceived:
response.connection.state =
HttpClientConnectionState.ResponseHeadersReceived
of HttpClientResponseState.BodyReceiving:
response.connection.state = HttpClientConnectionState.ResponseBodyReceiving
of HttpClientResponseState.BodyReceived:
response.connection.state = HttpClientConnectionState.ResponseBodyReceived
else:
discard
proc setError(request: HttpClientRequestRef, error: ref HttpError) {.
raises: [Defect] .} =
request.error = error
request.setState(HttpClientRequestState.Error)
request.state = HttpReqRespState.Error
if not(isNil(request.connection)):
request.connection.state = HttpClientConnectionState.Error
request.connection.error = error
@ -535,7 +505,7 @@ proc setError(request: HttpClientRequestRef, error: ref HttpError) {.
proc setError(response: HttpClientResponseRef, error: ref HttpError) {.
raises: [Defect] .} =
response.error = error
response.setState(HttpClientResponseState.Error)
response.state = HttpReqRespState.Error
if not(isNil(response.connection)):
response.connection.state = HttpClientConnectionState.Error
response.connection.error = error
@ -647,38 +617,62 @@ proc removeConnection(session: HttpSessionRef,
await conn.closeWait()
proc releaseConnection(session: HttpSessionRef,
conn: HttpClientConnectionRef,
forceRemove = false) {.async.} =
connection: HttpClientConnectionRef) {.async.} =
## Return connection back to the ``session``.
##
## If connection not in ``Ready`` state it will be closed and removed from
## the ``session``.
if forceRemove:
await session.removeConnection(conn)
else:
if conn.state != HttpClientConnectionState.Ready:
await session.removeConnection(conn)
proc needKeepConnection(request: HttpClientRequestRef): bool =
## Returns ``true`` if the request's corresponding connection should be kept
## alive, and ``false`` if this connection should be dropped.
case request.state
of HttpClientRequestState.ResponseReceived:
case request.version
of HttpVersion11, HttpVersion20:
let connection = toLowerAscii(request.headers.getString(ConnectionHeader))
if connection == "keep-alive":
true
else:
# If `Connection` header is missing or its value not equal to
# `keep-alive`.
let removeConnection =
case connection.state
of HttpClientConnectionState.ResponseBodyReceived:
if HttpClientConnectionFlag.KeepAlive in connection.flags:
# HTTP response body has been received and "Connection: keep-alive" is
# present in response headers.
false
else:
# HTTP response body has been received, but "Connection: keep-alive" is
# not present or not supported.
true
of HttpClientConnectionState.ResponseHeadersReceived:
if (HttpClientConnectionFlag.NoBody in connection.flags) and
(HttpClientConnectionFlag.KeepAlive in connection.flags):
# HTTP response headers received with an empty response body and
# "Connection: keep-alive" is present in response headers.
false
else:
# HTTP response body is not received or "Connection: keep-alive" is not
# present or not supported.
true
else:
# Versions prior HTTP 1.1 do not support request pipelining.
false
# Connection not in proper state.
true
if removeConnection:
await session.removeConnection(connection)
else:
# Request not in proper state.
false
connection.state = HttpClientConnectionState.Ready
connection.flags = {}
proc releaseConnection(request: HttpClientRequestRef) {.async.} =
let
session = request.session
connection = request.connection
if not(isNil(connection)):
request.connection = nil
request.session = nil
connection.flags.excl(HttpClientConnectionFlag.Request)
if HttpClientConnectionFlag.Response notin connection.flags:
await session.releaseConnection(connection)
proc releaseConnection(response: HttpClientResponseRef) {.async.} =
let
session = response.session
connection = response.connection
if not(isNil(connection)):
response.connection = nil
response.session = nil
connection.flags.excl(HttpClientConnectionFlag.Response)
if HttpClientConnectionFlag.Request notin connection.flags:
await session.releaseConnection(connection)
proc closeWait*(session: HttpSessionRef) {.async.} =
## Closes HTTP session object.
@ -691,40 +685,33 @@ proc closeWait*(session: HttpSessionRef) {.async.} =
await allFutures(pending)
proc closeWait*(request: HttpClientRequestRef) {.async.} =
if request.state notin {HttpClientRequestState.Closing,
HttpClientRequestState.Closed}:
request.setState(HttpClientRequestState.Closing)
if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
request.state = HttpReqRespState.Closing
if not(isNil(request.writer)):
if not(request.writer.closed()):
await request.writer.closeWait()
request.writer = nil
if not(isNil(request.connection)):
if request.needKeepConnection():
await request.session.releaseConnection(request.connection, false)
else:
await request.session.releaseConnection(request.connection, true)
request.connection = nil
await request.releaseConnection()
request.session = nil
request.error = nil
request.setState(HttpClientRequestState.Closed)
request.state = HttpReqRespState.Closed
untrackHttpClientRequest(request)
proc closeWait*(response: HttpClientResponseRef) {.async.} =
if response.state notin {HttpClientResponseState.Closing,
HttpClientResponseState.Closed}:
response.setState(HttpClientResponseState.Closing)
if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
response.state = HttpReqRespState.Closing
if not(isNil(response.reader)):
if not(response.reader.closed()):
await response.reader.closeWait()
response.reader = nil
await response.releaseConnection()
response.session = nil
response.error = nil
response.setState(HttpClientResponseState.Closed)
response.state = HttpReqRespState.Closed
untrackHttpClientResponse(response)
proc prepareResponse(request: HttpClientRequestRef,
data: openarray[byte]): HttpResult[HttpClientResponseRef] {.
raises: [Defect] .} =
proc prepareResponse(request: HttpClientRequestRef, data: openarray[byte]
): HttpResult[HttpClientResponseRef] {.raises: [Defect] .} =
## Process response headers.
let resp = parseResponse(data, false)
if resp.failed():
@ -762,25 +749,43 @@ proc prepareResponse(request: HttpClientRequestRef,
res.get()
# Preprocessing "Content-Length" header.
let (contentLength, bodyFlag) =
let (contentLength, bodyFlag, nobodyFlag) =
if ContentLengthHeader in headers:
let length = headers.getInt(ContentLengthHeader)
(length, HttpClientBodyFlag.Sized)
(length, HttpClientBodyFlag.Sized, length == 0)
else:
if TransferEncodingFlags.Chunked in transferEncoding:
(0'u64, HttpClientBodyFlag.Chunked)
(0'u64, HttpClientBodyFlag.Chunked, false)
else:
(0'u64, HttpClientBodyFlag.Custom)
(0'u64, HttpClientBodyFlag.Custom, false)
# Preprocessing "Connection" header.
let connectionFlag =
block:
case resp.version
of HttpVersion11, HttpVersion20:
let header = toLowerAscii(headers.getString(ConnectionHeader))
if header == "keep-alive":
true
else:
false
else:
false
let res = HttpClientResponseRef(
state: HttpClientResponseState.HeadersReceived, status: resp.code,
state: HttpReqRespState.Open, status: resp.code,
address: request.address, requestMethod: request.meth,
reason: resp.reason(data), version: resp.version, session: request.session,
connection: request.connection, headers: headers,
contentEncoding: contentEncoding, transferEncoding: transferEncoding,
contentLength: contentLength, bodyFlag: bodyFlag
)
request.setState(HttpClientRequestState.ResponseReceived)
res.connection.state = HttpClientConnectionState.ResponseHeadersReceived
if nobodyFlag:
res.connection.flags.incl(HttpClientConnectionFlag.NoBody)
if connectionFlag:
res.connection.flags.incl(HttpClientConnectionFlag.KeepAlive)
res.connection.flags.incl(HttpClientConnectionFlag.Response)
trackHttpClientResponse(res)
ok(res)
@ -798,10 +803,11 @@ proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {.
raiseHttpReadError("Reading response headers timed out")
except AsyncStreamError:
raiseHttpReadError("Could not read response headers")
let resp = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1))
if resp.isErr():
raiseHttpProtocolError(resp.error())
return resp.get()
let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1))
if response.isErr():
raiseHttpProtocolError(response.error())
return response.get()
proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
ha: HttpAddress, meth: HttpMethod = MethodGet,
@ -811,7 +817,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
body: openarray[byte] = []): HttpClientRequestRef {.
raises: [Defect].} =
let res = HttpClientRequestRef(
state: HttpClientRequestState.Created, session: session, meth: meth,
state: HttpReqRespState.Ready, session: session, meth: meth,
version: version, flags: flags, headers: HttpTable.init(headers),
address: ha, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body
)
@ -827,7 +833,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
raises: [Defect].} =
let address = ? session.getAddress(parseUri(url))
let res = HttpClientRequestRef(
state: HttpClientRequestState.Created, session: session, meth: meth,
state: HttpReqRespState.Ready, session: session, meth: meth,
version: version, flags: flags, headers: HttpTable.init(headers),
address: address, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body
)
@ -960,9 +966,9 @@ proc prepareRequest(request: HttpClientRequestRef): string {.
proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
async.} =
doAssert(request.state == HttpClientRequestState.Created)
request.setState(HttpClientRequestState.Connecting)
request.connection =
doAssert(request.state == HttpReqRespState.Ready,
"Request's state is " & $request.state)
let connection =
try:
await request.session.acquireConnection(request.address)
except CancelledError as exc:
@ -972,16 +978,19 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
request.setError(exc)
raise exc
let headers = request.prepareRequest()
request.connection = connection
try:
request.setState(HttpClientRequestState.HeadersSending)
let headers = request.prepareRequest()
request.connection.state = HttpClientConnectionState.RequestHeadersSending
request.state = HttpReqRespState.Open
await request.connection.writer.write(headers)
request.setState(HttpClientRequestState.HeadersSent)
request.setState(HttpClientRequestState.BodySending)
request.connection.state = HttpClientConnectionState.RequestHeadersSent
request.connection.state = HttpClientConnectionState.RequestBodySending
if len(request.buffer) > 0:
await request.connection.writer.write(request.buffer)
request.setState(HttpClientRequestState.BodySent)
request.connection.state = HttpClientConnectionState.RequestBodySent
request.state = HttpReqRespState.Finished
except CancelledError as exc:
request.setError(newHttpInterruptError())
raise exc
@ -1005,11 +1014,11 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
async.} =
## Start sending request's headers and return `HttpBodyWriter`, which can be
## used to send request's body.
doAssert(request.state == HttpClientRequestState.Created)
doAssert(request.state == HttpReqRespState.Ready,
"Request's state is " & $request.state)
doAssert(len(request.buffer) == 0,
"Request should not have static body content (len(buffer) == 0)")
request.setState(HttpClientRequestState.Connecting)
request.connection =
let connection =
try:
await request.session.acquireConnection(request.address)
except CancelledError as exc:
@ -1019,12 +1028,13 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
request.setError(exc)
raise exc
let headers = request.prepareRequest()
request.connection = connection
try:
request.setState(HttpClientRequestState.HeadersSending)
let headers = request.prepareRequest()
request.connection.state = HttpClientConnectionState.RequestHeadersSending
await request.connection.writer.write(headers)
request.setState(HttpClientRequestState.HeadersSent)
request.connection.state = HttpClientConnectionState.RequestHeadersSent
except CancelledError as exc:
request.setError(newHttpInterruptError())
raise exc
@ -1048,18 +1058,25 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
newHttpBodyWriter([writer])
request.writer = writer
request.setState(HttpClientRequestState.BodySending)
request.state = HttpReqRespState.Open
request.connection.state = HttpClientConnectionState.RequestBodySending
return writer
proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
async.} =
## Finish sending request and receive response.
doAssert(request.state == HttpClientRequestState.BodySending)
doAssert(not(isNil(request.connection)),
"Request missing connection instance")
doAssert(request.state == HttpReqRespState.Open,
"Request's state is " & $request.state)
doAssert(request.connection.state ==
HttpClientConnectionState.RequestBodySending)
doAssert(request.writer.closed())
request.setState(HttpClientRequestState.BodySent)
let resp =
HttpClientConnectionState.RequestBodySending,
"Connection's state is " & $request.connection.state)
doAssert(request.writer.closed(),
"Body writer instance must be closed before finish(request) call")
request.state = HttpReqRespState.Finished
request.connection.state = HttpClientConnectionState.RequestBodySent
let response =
try:
await request.getResponse()
except CancelledError as exc:
@ -1068,7 +1085,7 @@ proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
except HttpError as exc:
request.setError(exc)
raise exc
return resp
return response
proc getNewLocation*(resp: HttpClientResponseRef): HttpResult[HttpAddress] =
## Returns new address according to response's `Location` header value.
@ -1081,47 +1098,55 @@ proc getNewLocation*(resp: HttpClientResponseRef): HttpResult[HttpAddress] =
else:
err("Location header is missing")
proc getBodyReader*(resp: HttpClientResponseRef): HttpBodyReader =
proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader =
## Returns stream's reader instance which can be used to read response's body.
##
## Streams which was obtained using this procedure must be closed to avoid
## leaks.
doAssert(resp.state in {
HttpClientResponseState.HeadersReceived,
HttpClientResponseState.BodyReceiving})
doAssert(resp.connection.state in {
HttpClientConnectionState.ResponseHeadersReceived,
HttpClientConnectionState.ResponseBodyReceiving})
if isNil(resp.reader):
doAssert(not(isNil(response.connection)),
"Response missing connection instance")
doAssert(response.state == HttpReqRespState.Open,
"Response's state is " & $response.state)
doAssert(response.connection.state in
{HttpClientConnectionState.ResponseHeadersReceived,
HttpClientConnectionState.ResponseBodyReceiving},
"Connection state is " & $response.connection.state)
if isNil(response.reader):
let reader =
case resp.bodyFlag
case response.bodyFlag
of HttpClientBodyFlag.Sized:
let bstream = newBoundedStreamReader(resp.connection.reader,
resp.contentLength)
let bstream = newBoundedStreamReader(response.connection.reader,
response.contentLength)
newHttpBodyReader(bstream)
of HttpClientBodyFlag.Chunked:
newHttpBodyReader(newChunkedStreamReader(resp.connection.reader))
newHttpBodyReader(newChunkedStreamReader(response.connection.reader))
of HttpClientBodyFlag.Custom:
newHttpBodyReader(newAsyncStreamReader(resp.connection.reader))
resp.setState(HttpClientResponseState.BodyReceiving)
resp.reader = reader
resp.reader
newHttpBodyReader(newAsyncStreamReader(response.connection.reader))
response.connection.state = HttpClientConnectionState.ResponseBodyReceiving
response.reader = reader
response.reader
proc finish*(resp: HttpClientResponseRef) {.async.} =
proc finish*(response: HttpClientResponseRef) {.async.} =
## Finish receiving response.
doAssert(resp.state == HttpClientResponseState.BodyReceiving)
doAssert(resp.connection.state ==
HttpClientConnectionState.ResponseBodyReceiving)
doAssert(resp.reader.closed())
resp.setState(HttpClientResponseState.BodyReceived)
resp.connection.state = HttpClientConnectionState.Ready
##
## Because ``finish()`` returns nothing, this operation become NOP for
## response which is not in ``Open`` state.
if response.state == HttpReqRespState.Open:
doAssert(not(isNil(response.connection)),
"Response missing connection instance")
doAssert(response.connection.state ==
HttpClientConnectionState.ResponseBodyReceiving,
"Connection state is " & $response.connection.state)
doAssert(response.reader.closed(),
"Body reader instance must be closed before finish(response) call")
response.connection.state = HttpClientConnectionState.ResponseBodyReceived
response.state = HttpReqRespState.Finished
proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {.
async.} =
## Read all bytes from response ``response``.
doAssert(response.state == HttpClientResponseState.HeadersReceived)
doAssert(response.connection.state ==
HttpClientConnectionState.ResponseHeadersReceived)
##
## Note: This procedure performs automatic finishing for ``response``.
var reader = response.getBodyReader()
try:
let data = await reader.read()
@ -1145,9 +1170,8 @@ proc getBodyBytes*(response: HttpClientResponseRef,
nbytes: int): Future[seq[byte]] {.async.} =
## Read all bytes (nbytes <= 0) or exactly `nbytes` bytes from response
## ``response``.
doAssert(response.state == HttpClientResponseState.HeadersReceived)
doAssert(response.connection.state ==
HttpClientConnectionState.ResponseHeadersReceived)
##
## Note: This procedure performs automatic finishing for ``response``.
var reader = response.getBodyReader()
try:
let data = await reader.read(nbytes)
@ -1169,9 +1193,8 @@ proc getBodyBytes*(response: HttpClientResponseRef,
proc consumeBody*(response: HttpClientResponseRef): Future[int] {.async.} =
## Consume/discard response and return number of bytes consumed.
doAssert(response.state == HttpClientResponseState.HeadersReceived)
doAssert(response.connection.state ==
HttpClientConnectionState.ResponseHeadersReceived)
##
## Note: This procedure performs automatic finishing for ``response``.
var reader = response.getBodyReader()
try:
let res = await reader.consume()
@ -1236,12 +1259,10 @@ proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {.
response = nil
return (status, buffer)
except HttpError as exc:
if not(isNil(response)):
await response.closeWait()
if not(isNil(response)): await response.closeWait()
raise exc
except CancelledError as exc:
if not(isNil(response)):
await response.closeWait()
if not(isNil(response)): await response.closeWait()
raise exc
proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
@ -1280,33 +1301,27 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
else:
raiseHttpRedirectError("Location header missing")
discard await response.consumeBody()
await request.closeWait()
request = nil
await response.closeWait()
response = nil
await request.closeWait()
request = nil
request = redirect
redirect = nil
else:
let data = await response.getBodyBytes()
let code = response.status
await request.closeWait()
request = nil
await response.closeWait()
response = nil
await request.closeWait()
request = nil
return (code, data)
except CancelledError as exc:
if not(isNil(request)):
await closeWait(request)
if not(isNil(redirect)):
await closeWait(redirect)
if not(isNil(response)):
await closeWait(response)
if not(isNil(response)): await closeWait(response)
if not(isNil(request)): await closeWait(request)
if not(isNil(redirect)): await closeWait(redirect)
raise exc
except HttpError as exc:
if not(isNil(request)):
await closeWait(request)
if not(isNil(redirect)):
await closeWait(redirect)
if not(isNil(response)):
await closeWait(response)
if not(isNil(response)): await closeWait(response)
if not(isNil(request)): await closeWait(request)
if not(isNil(redirect)): await closeWait(redirect)
raise exc