Fix freezes on *nix systems.

This commit is contained in:
cheatfate 2019-05-14 18:42:51 +03:00
parent 493583b7f4
commit a56ad7dfcb
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
4 changed files with 56 additions and 37 deletions

View File

@ -11,6 +11,7 @@ type
RpcHttpClient* = ref object of RpcClient
transp*: StreamTransport
loop: Future[void]
addresses: seq[TransportAddress]
options: HttpClientOptions
@ -115,7 +116,7 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
error = true
if error or not transp.validateResponse(header):
transp.close()
await transp.closeWait()
result = ""
return
@ -155,7 +156,7 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
error = true
if error:
transp.close()
await transp.closeWait()
result = ""
else:
result = cast[string](buffer)
@ -188,7 +189,7 @@ proc call*(client: RpcHttpClient, name: string,
if not res:
debug "Failed to send message to RPC server",
address = client.transp.remoteAddress(), msg_len = len(value)
client.transp.close()
await client.transp.closeWait()
raise newException(ValueError, "Transport error")
else:
debug "Message sent to RPC server", address = client.transp.remoteAddress(),
@ -207,19 +208,28 @@ template call*(client: RpcHttpClient, name: string,
proc processData(client: RpcHttpClient) {.async.} =
while true:
var value = await client.transp.recvData()
if value == "":
break
debug "Received response from RPC server",
address = client.transp.remoteAddress(),
msg_len = len(value)
trace "Message", msg = value
client.processMessage(value)
while true:
var value = await client.transp.recvData()
debug "Returned from recvData()", address = client.transp.remoteAddress()
if value == "":
debug "Empty response from RPC server",
address = client.transp.remoteAddress()
break
debug "Received response from RPC server",
address = client.transp.remoteAddress(),
msg_len = len(value)
trace "Message", msg = value
client.processMessage(value)
# async loop reconnection and waiting
client.transp = await connect(client.addresses[0])
# async loop reconnection and waiting
try:
client.transp = await connect(client.addresses[0])
except:
debug "Could not establish new connection to RPC server",
address = client.addresses[0]
break
proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} =
client.addresses = resolveTAddress(address, port)
client.transp = await connect(client.addresses[0])
asyncCheck processData(client)
client.loop = processData(client)

View File

@ -4,6 +4,7 @@ type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
address*: TransportAddress
loop*: Future[void]
const defaultMaxRequestLength* = 1024 * 128
@ -32,18 +33,19 @@ proc call*(self: RpcSocketClient, name: string,
proc processData(client: RpcSocketClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
await client.transport.closeWait()
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)
client.loop = processData(client)

View File

@ -104,7 +104,7 @@ proc processClient(server: StreamServer,
debug "Timeout expired while receiving headers",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http408)
transp.close()
await transp.closeWait()
break
else:
let hlen = hlenfut.read()
@ -115,24 +115,29 @@ proc processClient(server: StreamServer,
debug "Malformed header received",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http400)
transp.close()
await transp.closeWait()
break
except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize`
debug "Maximum size of headers limit reached",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http413)
transp.close()
await transp.closeWait()
break
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
await transp.closeWait()
break
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
transp.close()
await transp.closeWait()
break
except:
debug "Unknown exception", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
await transp.closeWait()
break
let vres = await validateRequest(transp, header)
@ -154,19 +159,19 @@ proc processClient(server: StreamServer,
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
let res = await transp.sendAnswer(header.version, Http413)
transp.close()
await transp.closeWait()
break
else:
blenfut.read()
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
await transp.closeWait()
break
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
transp.close()
await transp.closeWait()
break
let future = rpc.route(cast[string](buffer))
@ -177,30 +182,32 @@ proc processClient(server: StreamServer,
address = transp.remoteAddress()
let res = await transp.sendAnswer(header.version, Http503)
if not res:
transp.close()
await transp.closeWait()
break
else:
var data = future.read()
let res = await transp.sendAnswer(header.version, Http200, data)
info "RPC result has been sent", address = transp.remoteAddress()
if not res:
transp.close()
await transp.closeWait()
break
elif vres == ErrorFailure:
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
await transp.closeWait()
break
if header.version in {HttpVersion09, HttpVersion10}:
debug "Disconnecting client", address = transp.remoteAddress()
transp.close()
await transp.closeWait()
break
else:
if connection == "close":
debug "Disconnecting client", address = transp.remoteAddress()
transp.close()
await transp.closeWait()
break
info "Finished connection", address = transp.remoteAddress()
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*(server: RpcHttpServer, address: TransportAddress) =

View File

@ -20,7 +20,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async, gc
maxRequestLength = defaultMaxRequestLength
value = await transport.readLine(defaultMaxRequestLength)
if value == "":
transport.close
await transport.closeWait()
break
debug "Processing message", address = transport.remoteAddress(), line = value