From 36e5f6fc89f7db5f77fd30a7e64f64d36b38b651 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 3 Dec 2021 13:11:39 +0200 Subject: [PATCH] Attempt to fix connection state. (#248) * Attempt to fix connection state. * Add actual values dump on asserts. * Total rework of connection's states. --- chronos/apps/http/httpclient.nim | 383 ++++++++++++++++--------------- 1 file changed, 199 insertions(+), 184 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 5ac285b..b768a85 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -52,25 +52,13 @@ type NonSecure, ## Non-secure connection Secure ## Secure TLS connection - HttpClientRequestState* {.pure.} = enum - Closed, ## Request has been closed - Closing, ## Connection is closing - Created, ## Request created - Connecting, ## Connecting to remote host - HeadersSending, ## Sending request headers - HeadersSent, ## Request headers has been sent - BodySending, ## Sending request body - BodySent, ## Request body has been sent - ResponseReceived, ## Request's response headers received - Error ## Error happens - - HttpClientResponseState* {.pure.} = enum - Closed, ## Response has been closed - Closing, ## Response is closing - HeadersReceived, ## Response headers received - BodyReceiving, ## Response body receiving - BodyReceived, ## Response body received - Error ## Error happens + HttpReqRespState* {.pure.} = enum + Closed, ## Request/response has been closed + Closing, ## Request/response is closing + Ready, ## Request/response is ready + Open, ## Request/response started + Finished, ## Request/response has been sent/received + Error ## Request/response in error state HttpClientBodyFlag* {.pure.} = enum Sized, ## `Content-Length` present @@ -80,6 +68,12 @@ type HttpClientRequestFlag* {.pure.} = enum CloseConnection, ## Send `Connection: close` in request + HttpClientConnectionFlag* {.pure.} = enum + Request, ## Connection has pending request + Response, ## Connection has pending response + KeepAlive, ## Connection should be kept alive + NoBody ## Connection response do not have body + HttpHeaderTuple* = tuple key: string value: string @@ -103,6 +97,7 @@ type state*: HttpClientConnectionState error*: ref HttpError remoteHostname*: string + flags*: set[HttpClientConnectionFlag] HttpClientConnectionRef* = ref HttpClientConnection @@ -129,9 +124,9 @@ type addresses*: seq[TransportAddress] HttpClientRequest* = object + state: HttpReqRespState meth*: HttpMethod address*: HttpAddress - state: HttpClientRequestState version*: HttpVersion headers*: HttpTable bodyFlag: HttpClientBodyFlag @@ -146,7 +141,7 @@ type HttpClientRequestRef* = ref HttpClientRequest HttpClientResponse* = object - state: HttpClientResponseState + state: HttpReqRespState requestMethod*: HttpMethod address*: HttpAddress status*: int @@ -178,6 +173,12 @@ type opened*: int64 closed*: int64 +# HttpClientRequestRef valid states are: +# Ready -> Open -> (Finished, Error) -> (Closing, Closed) +# +# HttpClientResponseRef valid states are +# Open -> (Finished, Error) -> (Closing, Closed) + proc setupHttpClientConnectionTracker(): HttpClientTracker {. gcsafe, raises: [Defect].} proc setupHttpClientRequestTracker(): HttpClientTracker {. @@ -493,41 +494,10 @@ proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef, trackHttpClientConnection(res) res -proc setState(request: HttpClientRequestRef, state: HttpClientRequestState) {. - raises: [Defect] .} = - request.state = state - case state - of HttpClientRequestState.HeadersSending: - request.connection.state = HttpClientConnectionState.RequestHeadersSending - of HttpClientRequestState.HeadersSent: - request.connection.state = HttpClientConnectionState.RequestHeadersSent - of HttpClientRequestState.BodySending: - request.connection.state = HttpClientConnectionState.RequestBodySending - of HttpClientRequestState.BodySent: - request.connection.state = HttpClientConnectionState.RequestBodySent - of HttpClientRequestState.ResponseReceived: - request.connection.state = HttpClientConnectionState.ResponseHeadersReceived - else: - discard - -proc setState(response: HttpClientResponseRef, - state: HttpClientResponseState) {.raises: [Defect] .} = - response.state = state - case state - of HttpClientResponseState.HeadersReceived: - response.connection.state = - HttpClientConnectionState.ResponseHeadersReceived - of HttpClientResponseState.BodyReceiving: - response.connection.state = HttpClientConnectionState.ResponseBodyReceiving - of HttpClientResponseState.BodyReceived: - response.connection.state = HttpClientConnectionState.ResponseBodyReceived - else: - discard - proc setError(request: HttpClientRequestRef, error: ref HttpError) {. raises: [Defect] .} = request.error = error - request.setState(HttpClientRequestState.Error) + request.state = HttpReqRespState.Error if not(isNil(request.connection)): request.connection.state = HttpClientConnectionState.Error request.connection.error = error @@ -535,7 +505,7 @@ proc setError(request: HttpClientRequestRef, error: ref HttpError) {. proc setError(response: HttpClientResponseRef, error: ref HttpError) {. raises: [Defect] .} = response.error = error - response.setState(HttpClientResponseState.Error) + response.state = HttpReqRespState.Error if not(isNil(response.connection)): response.connection.state = HttpClientConnectionState.Error response.connection.error = error @@ -647,38 +617,62 @@ proc removeConnection(session: HttpSessionRef, await conn.closeWait() proc releaseConnection(session: HttpSessionRef, - conn: HttpClientConnectionRef, - forceRemove = false) {.async.} = + connection: HttpClientConnectionRef) {.async.} = ## Return connection back to the ``session``. - ## - ## If connection not in ``Ready`` state it will be closed and removed from - ## the ``session``. - if forceRemove: - await session.removeConnection(conn) - else: - if conn.state != HttpClientConnectionState.Ready: - await session.removeConnection(conn) - -proc needKeepConnection(request: HttpClientRequestRef): bool = - ## Returns ``true`` if the request's corresponding connection should be kept - ## alive, and ``false`` if this connection should be dropped. - case request.state - of HttpClientRequestState.ResponseReceived: - case request.version - of HttpVersion11, HttpVersion20: - let connection = toLowerAscii(request.headers.getString(ConnectionHeader)) - if connection == "keep-alive": - true - else: - # If `Connection` header is missing or its value not equal to - # `keep-alive`. + let removeConnection = + case connection.state + of HttpClientConnectionState.ResponseBodyReceived: + if HttpClientConnectionFlag.KeepAlive in connection.flags: + # HTTP response body has been received and "Connection: keep-alive" is + # present in response headers. false + else: + # HTTP response body has been received, but "Connection: keep-alive" is + # not present or not supported. + true + of HttpClientConnectionState.ResponseHeadersReceived: + if (HttpClientConnectionFlag.NoBody in connection.flags) and + (HttpClientConnectionFlag.KeepAlive in connection.flags): + # HTTP response headers received with an empty response body and + # "Connection: keep-alive" is present in response headers. + false + else: + # HTTP response body is not received or "Connection: keep-alive" is not + # present or not supported. + true else: - # Versions prior HTTP 1.1 do not support request pipelining. - false + # Connection not in proper state. + true + + if removeConnection: + await session.removeConnection(connection) else: - # Request not in proper state. - false + connection.state = HttpClientConnectionState.Ready + connection.flags = {} + +proc releaseConnection(request: HttpClientRequestRef) {.async.} = + let + session = request.session + connection = request.connection + + if not(isNil(connection)): + request.connection = nil + request.session = nil + connection.flags.excl(HttpClientConnectionFlag.Request) + if HttpClientConnectionFlag.Response notin connection.flags: + await session.releaseConnection(connection) + +proc releaseConnection(response: HttpClientResponseRef) {.async.} = + let + session = response.session + connection = response.connection + + if not(isNil(connection)): + response.connection = nil + response.session = nil + connection.flags.excl(HttpClientConnectionFlag.Response) + if HttpClientConnectionFlag.Request notin connection.flags: + await session.releaseConnection(connection) proc closeWait*(session: HttpSessionRef) {.async.} = ## Closes HTTP session object. @@ -691,40 +685,33 @@ proc closeWait*(session: HttpSessionRef) {.async.} = await allFutures(pending) proc closeWait*(request: HttpClientRequestRef) {.async.} = - if request.state notin {HttpClientRequestState.Closing, - HttpClientRequestState.Closed}: - request.setState(HttpClientRequestState.Closing) + if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: + request.state = HttpReqRespState.Closing if not(isNil(request.writer)): if not(request.writer.closed()): await request.writer.closeWait() request.writer = nil - if not(isNil(request.connection)): - if request.needKeepConnection(): - await request.session.releaseConnection(request.connection, false) - else: - await request.session.releaseConnection(request.connection, true) - request.connection = nil + await request.releaseConnection() request.session = nil request.error = nil - request.setState(HttpClientRequestState.Closed) + request.state = HttpReqRespState.Closed untrackHttpClientRequest(request) proc closeWait*(response: HttpClientResponseRef) {.async.} = - if response.state notin {HttpClientResponseState.Closing, - HttpClientResponseState.Closed}: - response.setState(HttpClientResponseState.Closing) + if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: + response.state = HttpReqRespState.Closing if not(isNil(response.reader)): if not(response.reader.closed()): await response.reader.closeWait() response.reader = nil + await response.releaseConnection() response.session = nil response.error = nil - response.setState(HttpClientResponseState.Closed) + response.state = HttpReqRespState.Closed untrackHttpClientResponse(response) -proc prepareResponse(request: HttpClientRequestRef, - data: openarray[byte]): HttpResult[HttpClientResponseRef] {. - raises: [Defect] .} = +proc prepareResponse(request: HttpClientRequestRef, data: openarray[byte] + ): HttpResult[HttpClientResponseRef] {.raises: [Defect] .} = ## Process response headers. let resp = parseResponse(data, false) if resp.failed(): @@ -762,25 +749,43 @@ proc prepareResponse(request: HttpClientRequestRef, res.get() # Preprocessing "Content-Length" header. - let (contentLength, bodyFlag) = + let (contentLength, bodyFlag, nobodyFlag) = if ContentLengthHeader in headers: let length = headers.getInt(ContentLengthHeader) - (length, HttpClientBodyFlag.Sized) + (length, HttpClientBodyFlag.Sized, length == 0) else: if TransferEncodingFlags.Chunked in transferEncoding: - (0'u64, HttpClientBodyFlag.Chunked) + (0'u64, HttpClientBodyFlag.Chunked, false) else: - (0'u64, HttpClientBodyFlag.Custom) + (0'u64, HttpClientBodyFlag.Custom, false) + + # Preprocessing "Connection" header. + let connectionFlag = + block: + case resp.version + of HttpVersion11, HttpVersion20: + let header = toLowerAscii(headers.getString(ConnectionHeader)) + if header == "keep-alive": + true + else: + false + else: + false let res = HttpClientResponseRef( - state: HttpClientResponseState.HeadersReceived, status: resp.code, + state: HttpReqRespState.Open, status: resp.code, address: request.address, requestMethod: request.meth, reason: resp.reason(data), version: resp.version, session: request.session, connection: request.connection, headers: headers, contentEncoding: contentEncoding, transferEncoding: transferEncoding, contentLength: contentLength, bodyFlag: bodyFlag ) - request.setState(HttpClientRequestState.ResponseReceived) + res.connection.state = HttpClientConnectionState.ResponseHeadersReceived + if nobodyFlag: + res.connection.flags.incl(HttpClientConnectionFlag.NoBody) + if connectionFlag: + res.connection.flags.incl(HttpClientConnectionFlag.KeepAlive) + res.connection.flags.incl(HttpClientConnectionFlag.Response) trackHttpClientResponse(res) ok(res) @@ -798,10 +803,11 @@ proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {. raiseHttpReadError("Reading response headers timed out") except AsyncStreamError: raiseHttpReadError("Could not read response headers") - let resp = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1)) - if resp.isErr(): - raiseHttpProtocolError(resp.error()) - return resp.get() + + let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1)) + if response.isErr(): + raiseHttpProtocolError(response.error()) + return response.get() proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, ha: HttpAddress, meth: HttpMethod = MethodGet, @@ -811,7 +817,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, body: openarray[byte] = []): HttpClientRequestRef {. raises: [Defect].} = let res = HttpClientRequestRef( - state: HttpClientRequestState.Created, session: session, meth: meth, + state: HttpReqRespState.Ready, session: session, meth: meth, version: version, flags: flags, headers: HttpTable.init(headers), address: ha, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body ) @@ -827,7 +833,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, raises: [Defect].} = let address = ? session.getAddress(parseUri(url)) let res = HttpClientRequestRef( - state: HttpClientRequestState.Created, session: session, meth: meth, + state: HttpReqRespState.Ready, session: session, meth: meth, version: version, flags: flags, headers: HttpTable.init(headers), address: address, bodyFlag: HttpClientBodyFlag.Custom, buffer: @body ) @@ -960,9 +966,9 @@ proc prepareRequest(request: HttpClientRequestRef): string {. proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. async.} = - doAssert(request.state == HttpClientRequestState.Created) - request.setState(HttpClientRequestState.Connecting) - request.connection = + doAssert(request.state == HttpReqRespState.Ready, + "Request's state is " & $request.state) + let connection = try: await request.session.acquireConnection(request.address) except CancelledError as exc: @@ -972,16 +978,19 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. request.setError(exc) raise exc - let headers = request.prepareRequest() + request.connection = connection try: - request.setState(HttpClientRequestState.HeadersSending) + let headers = request.prepareRequest() + request.connection.state = HttpClientConnectionState.RequestHeadersSending + request.state = HttpReqRespState.Open await request.connection.writer.write(headers) - request.setState(HttpClientRequestState.HeadersSent) - request.setState(HttpClientRequestState.BodySending) + request.connection.state = HttpClientConnectionState.RequestHeadersSent + request.connection.state = HttpClientConnectionState.RequestBodySending if len(request.buffer) > 0: await request.connection.writer.write(request.buffer) - request.setState(HttpClientRequestState.BodySent) + request.connection.state = HttpClientConnectionState.RequestBodySent + request.state = HttpReqRespState.Finished except CancelledError as exc: request.setError(newHttpInterruptError()) raise exc @@ -1005,11 +1014,11 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {. async.} = ## Start sending request's headers and return `HttpBodyWriter`, which can be ## used to send request's body. - doAssert(request.state == HttpClientRequestState.Created) + doAssert(request.state == HttpReqRespState.Ready, + "Request's state is " & $request.state) doAssert(len(request.buffer) == 0, "Request should not have static body content (len(buffer) == 0)") - request.setState(HttpClientRequestState.Connecting) - request.connection = + let connection = try: await request.session.acquireConnection(request.address) except CancelledError as exc: @@ -1019,12 +1028,13 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {. request.setError(exc) raise exc - let headers = request.prepareRequest() + request.connection = connection try: - request.setState(HttpClientRequestState.HeadersSending) + let headers = request.prepareRequest() + request.connection.state = HttpClientConnectionState.RequestHeadersSending await request.connection.writer.write(headers) - request.setState(HttpClientRequestState.HeadersSent) + request.connection.state = HttpClientConnectionState.RequestHeadersSent except CancelledError as exc: request.setError(newHttpInterruptError()) raise exc @@ -1048,18 +1058,25 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {. newHttpBodyWriter([writer]) request.writer = writer - request.setState(HttpClientRequestState.BodySending) + request.state = HttpReqRespState.Open + request.connection.state = HttpClientConnectionState.RequestBodySending return writer proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. async.} = ## Finish sending request and receive response. - doAssert(request.state == HttpClientRequestState.BodySending) + doAssert(not(isNil(request.connection)), + "Request missing connection instance") + doAssert(request.state == HttpReqRespState.Open, + "Request's state is " & $request.state) doAssert(request.connection.state == - HttpClientConnectionState.RequestBodySending) - doAssert(request.writer.closed()) - request.setState(HttpClientRequestState.BodySent) - let resp = + HttpClientConnectionState.RequestBodySending, + "Connection's state is " & $request.connection.state) + doAssert(request.writer.closed(), + "Body writer instance must be closed before finish(request) call") + request.state = HttpReqRespState.Finished + request.connection.state = HttpClientConnectionState.RequestBodySent + let response = try: await request.getResponse() except CancelledError as exc: @@ -1068,7 +1085,7 @@ proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. except HttpError as exc: request.setError(exc) raise exc - return resp + return response proc getNewLocation*(resp: HttpClientResponseRef): HttpResult[HttpAddress] = ## Returns new address according to response's `Location` header value. @@ -1081,47 +1098,55 @@ proc getNewLocation*(resp: HttpClientResponseRef): HttpResult[HttpAddress] = else: err("Location header is missing") -proc getBodyReader*(resp: HttpClientResponseRef): HttpBodyReader = +proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader = ## Returns stream's reader instance which can be used to read response's body. ## ## Streams which was obtained using this procedure must be closed to avoid ## leaks. - doAssert(resp.state in { - HttpClientResponseState.HeadersReceived, - HttpClientResponseState.BodyReceiving}) - doAssert(resp.connection.state in { - HttpClientConnectionState.ResponseHeadersReceived, - HttpClientConnectionState.ResponseBodyReceiving}) - if isNil(resp.reader): + doAssert(not(isNil(response.connection)), + "Response missing connection instance") + doAssert(response.state == HttpReqRespState.Open, + "Response's state is " & $response.state) + doAssert(response.connection.state in + {HttpClientConnectionState.ResponseHeadersReceived, + HttpClientConnectionState.ResponseBodyReceiving}, + "Connection state is " & $response.connection.state) + if isNil(response.reader): let reader = - case resp.bodyFlag + case response.bodyFlag of HttpClientBodyFlag.Sized: - let bstream = newBoundedStreamReader(resp.connection.reader, - resp.contentLength) + let bstream = newBoundedStreamReader(response.connection.reader, + response.contentLength) newHttpBodyReader(bstream) of HttpClientBodyFlag.Chunked: - newHttpBodyReader(newChunkedStreamReader(resp.connection.reader)) + newHttpBodyReader(newChunkedStreamReader(response.connection.reader)) of HttpClientBodyFlag.Custom: - newHttpBodyReader(newAsyncStreamReader(resp.connection.reader)) - resp.setState(HttpClientResponseState.BodyReceiving) - resp.reader = reader - resp.reader + newHttpBodyReader(newAsyncStreamReader(response.connection.reader)) + response.connection.state = HttpClientConnectionState.ResponseBodyReceiving + response.reader = reader + response.reader -proc finish*(resp: HttpClientResponseRef) {.async.} = +proc finish*(response: HttpClientResponseRef) {.async.} = ## Finish receiving response. - doAssert(resp.state == HttpClientResponseState.BodyReceiving) - doAssert(resp.connection.state == - HttpClientConnectionState.ResponseBodyReceiving) - doAssert(resp.reader.closed()) - resp.setState(HttpClientResponseState.BodyReceived) - resp.connection.state = HttpClientConnectionState.Ready + ## + ## Because ``finish()`` returns nothing, this operation become NOP for + ## response which is not in ``Open`` state. + if response.state == HttpReqRespState.Open: + doAssert(not(isNil(response.connection)), + "Response missing connection instance") + doAssert(response.connection.state == + HttpClientConnectionState.ResponseBodyReceiving, + "Connection state is " & $response.connection.state) + doAssert(response.reader.closed(), + "Body reader instance must be closed before finish(response) call") + response.connection.state = HttpClientConnectionState.ResponseBodyReceived + response.state = HttpReqRespState.Finished proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {. async.} = ## Read all bytes from response ``response``. - doAssert(response.state == HttpClientResponseState.HeadersReceived) - doAssert(response.connection.state == - HttpClientConnectionState.ResponseHeadersReceived) + ## + ## Note: This procedure performs automatic finishing for ``response``. var reader = response.getBodyReader() try: let data = await reader.read() @@ -1145,9 +1170,8 @@ proc getBodyBytes*(response: HttpClientResponseRef, nbytes: int): Future[seq[byte]] {.async.} = ## Read all bytes (nbytes <= 0) or exactly `nbytes` bytes from response ## ``response``. - doAssert(response.state == HttpClientResponseState.HeadersReceived) - doAssert(response.connection.state == - HttpClientConnectionState.ResponseHeadersReceived) + ## + ## Note: This procedure performs automatic finishing for ``response``. var reader = response.getBodyReader() try: let data = await reader.read(nbytes) @@ -1169,9 +1193,8 @@ proc getBodyBytes*(response: HttpClientResponseRef, proc consumeBody*(response: HttpClientResponseRef): Future[int] {.async.} = ## Consume/discard response and return number of bytes consumed. - doAssert(response.state == HttpClientResponseState.HeadersReceived) - doAssert(response.connection.state == - HttpClientConnectionState.ResponseHeadersReceived) + ## + ## Note: This procedure performs automatic finishing for ``response``. var reader = response.getBodyReader() try: let res = await reader.consume() @@ -1236,12 +1259,10 @@ proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {. response = nil return (status, buffer) except HttpError as exc: - if not(isNil(response)): - await response.closeWait() + if not(isNil(response)): await response.closeWait() raise exc except CancelledError as exc: - if not(isNil(response)): - await response.closeWait() + if not(isNil(response)): await response.closeWait() raise exc proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {. @@ -1280,33 +1301,27 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {. else: raiseHttpRedirectError("Location header missing") discard await response.consumeBody() - await request.closeWait() - request = nil await response.closeWait() response = nil + await request.closeWait() + request = nil request = redirect redirect = nil else: let data = await response.getBodyBytes() let code = response.status - await request.closeWait() - request = nil await response.closeWait() response = nil + await request.closeWait() + request = nil return (code, data) except CancelledError as exc: - if not(isNil(request)): - await closeWait(request) - if not(isNil(redirect)): - await closeWait(redirect) - if not(isNil(response)): - await closeWait(response) + if not(isNil(response)): await closeWait(response) + if not(isNil(request)): await closeWait(request) + if not(isNil(redirect)): await closeWait(redirect) raise exc except HttpError as exc: - if not(isNil(request)): - await closeWait(request) - if not(isNil(redirect)): - await closeWait(redirect) - if not(isNil(response)): - await closeWait(response) + if not(isNil(response)): await closeWait(response) + if not(isNil(request)): await closeWait(request) + if not(isNil(redirect)): await closeWait(redirect) raise exc