Add one more state `Closing` to help avoid race condition while in `closeWait` of request, response, connection. (#198)
This commit is contained in:
parent
252e5d0d50
commit
e6fd38fd49
|
@ -32,6 +32,7 @@ const
|
||||||
type
|
type
|
||||||
HttpClientConnectionState* {.pure.} = enum
|
HttpClientConnectionState* {.pure.} = enum
|
||||||
Closed ## Connection has been closed
|
Closed ## Connection has been closed
|
||||||
|
Closing, ## Connection is closing
|
||||||
Resolving, ## Resolving remote hostname
|
Resolving, ## Resolving remote hostname
|
||||||
Connecting, ## Connecting to remote server
|
Connecting, ## Connecting to remote server
|
||||||
Ready, ## Connected to remote server
|
Ready, ## Connected to remote server
|
||||||
|
@ -51,6 +52,7 @@ type
|
||||||
|
|
||||||
HttpClientRequestState* {.pure.} = enum
|
HttpClientRequestState* {.pure.} = enum
|
||||||
Closed, ## Request has been closed
|
Closed, ## Request has been closed
|
||||||
|
Closing, ## Connection is closing
|
||||||
Created, ## Request created
|
Created, ## Request created
|
||||||
Connecting, ## Connecting to remote host
|
Connecting, ## Connecting to remote host
|
||||||
HeadersSending, ## Sending request headers
|
HeadersSending, ## Sending request headers
|
||||||
|
@ -62,6 +64,7 @@ type
|
||||||
|
|
||||||
HttpClientResponseState* {.pure.} = enum
|
HttpClientResponseState* {.pure.} = enum
|
||||||
Closed, ## Response has been closed
|
Closed, ## Response has been closed
|
||||||
|
Closing, ## Response is closing
|
||||||
HeadersReceived, ## Response headers received
|
HeadersReceived, ## Response headers received
|
||||||
BodyReceiving, ## Response body receiving
|
BodyReceiving, ## Response body receiving
|
||||||
BodyReceived, ## Response body received
|
BodyReceived, ## Response body received
|
||||||
|
@ -528,7 +531,9 @@ proc setError(response: HttpClientResponseRef, error: ref HttpError) {.
|
||||||
|
|
||||||
proc closeWait(conn: HttpClientConnectionRef) {.async.} =
|
proc closeWait(conn: HttpClientConnectionRef) {.async.} =
|
||||||
## Close HttpClientConnectionRef instance ``conn`` and free all the resources.
|
## Close HttpClientConnectionRef instance ``conn`` and free all the resources.
|
||||||
if conn.state != HttpClientConnectionState.Closed:
|
if conn.state notin {HttpClientConnectionState.Closing,
|
||||||
|
HttpClientConnectionState.Closed}:
|
||||||
|
conn.state = HttpClientConnectionState.Closing
|
||||||
await allFutures(conn.reader.closeWait(), conn.writer.closeWait())
|
await allFutures(conn.reader.closeWait(), conn.writer.closeWait())
|
||||||
case conn.kind
|
case conn.kind
|
||||||
of HttpClientScheme.Secure:
|
of HttpClientScheme.Secure:
|
||||||
|
@ -568,6 +573,7 @@ proc connect(session: HttpSessionRef,
|
||||||
raise exc
|
raise exc
|
||||||
except AsyncStreamError:
|
except AsyncStreamError:
|
||||||
await res.closeWait()
|
await res.closeWait()
|
||||||
|
res.state = HttpClientConnectionState.Error
|
||||||
of HttpClientScheme.Nonsecure:
|
of HttpClientScheme.Nonsecure:
|
||||||
res.state = HttpClientConnectionState.Ready
|
res.state = HttpClientConnectionState.Ready
|
||||||
res
|
res
|
||||||
|
@ -632,7 +638,9 @@ proc closeWait*(session: HttpSessionRef) {.async.} =
|
||||||
await allFutures(pending)
|
await allFutures(pending)
|
||||||
|
|
||||||
proc closeWait*(request: HttpClientRequestRef) {.async.} =
|
proc closeWait*(request: HttpClientRequestRef) {.async.} =
|
||||||
if request.state != HttpClientRequestState.Closed:
|
if request.state notin {HttpClientRequestState.Closing,
|
||||||
|
HttpClientRequestState.Closed}:
|
||||||
|
request.setState(HttpClientRequestState.Closing)
|
||||||
if not(isNil(request.writer)):
|
if not(isNil(request.writer)):
|
||||||
if not(request.writer.closed()):
|
if not(request.writer.closed()):
|
||||||
await request.writer.closeWait()
|
await request.writer.closeWait()
|
||||||
|
@ -647,7 +655,9 @@ proc closeWait*(request: HttpClientRequestRef) {.async.} =
|
||||||
untrackHttpClientRequest(request)
|
untrackHttpClientRequest(request)
|
||||||
|
|
||||||
proc closeWait*(response: HttpClientResponseRef) {.async.} =
|
proc closeWait*(response: HttpClientResponseRef) {.async.} =
|
||||||
if response.state != HttpClientResponseState.Closed:
|
if response.state notin {HttpClientResponseState.Closing,
|
||||||
|
HttpClientResponseState.Closed}:
|
||||||
|
response.setState(HttpClientResponseState.Closing)
|
||||||
if not(isNil(response.reader)):
|
if not(isNil(response.reader)):
|
||||||
if not(response.reader.closed()):
|
if not(response.reader.closed()):
|
||||||
await response.reader.closeWait()
|
await response.reader.closeWait()
|
||||||
|
@ -1200,18 +1210,18 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
|
||||||
raiseHttpRedirectError("Location header with an empty value")
|
raiseHttpRedirectError("Location header with an empty value")
|
||||||
else:
|
else:
|
||||||
raiseHttpRedirectError("Location header missing")
|
raiseHttpRedirectError("Location header missing")
|
||||||
|
discard await response.consumeBody()
|
||||||
await request.closeWait()
|
await request.closeWait()
|
||||||
request = nil
|
request = nil
|
||||||
discard await response.consumeBody()
|
|
||||||
await response.closeWait()
|
await response.closeWait()
|
||||||
response = nil
|
response = nil
|
||||||
request = redirect
|
request = redirect
|
||||||
redirect = nil
|
redirect = nil
|
||||||
else:
|
else:
|
||||||
await request.closeWait()
|
|
||||||
request = nil
|
|
||||||
let data = await response.getBodyBytes()
|
let data = await response.getBodyBytes()
|
||||||
let code = response.status
|
let code = response.status
|
||||||
|
await request.closeWait()
|
||||||
|
request = nil
|
||||||
await response.closeWait()
|
await response.closeWait()
|
||||||
response = nil
|
response = nil
|
||||||
return (code, data)
|
return (code, data)
|
||||||
|
|
Loading…
Reference in New Issue