diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index 2c5b7d2..cfd4b9e 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -11,6 +11,7 @@ type RpcHttpClient* = ref object of RpcClient transp*: StreamTransport + loop: Future[void] addresses: seq[TransportAddress] options: HttpClientOptions @@ -115,7 +116,7 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error or not transp.validateResponse(header): - transp.close() + await transp.closeWait() result = "" return @@ -155,7 +156,7 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error: - transp.close() + await transp.closeWait() result = "" else: result = cast[string](buffer) @@ -188,7 +189,7 @@ proc call*(client: RpcHttpClient, name: string, if not res: debug "Failed to send message to RPC server", address = client.transp.remoteAddress(), msg_len = len(value) - client.transp.close() + await client.transp.closeWait() raise newException(ValueError, "Transport error") else: debug "Message sent to RPC server", address = client.transp.remoteAddress(), @@ -207,19 +208,28 @@ template call*(client: RpcHttpClient, name: string, proc processData(client: RpcHttpClient) {.async.} = while true: - var value = await client.transp.recvData() - if value == "": - break - debug "Received response from RPC server", - address = client.transp.remoteAddress(), - msg_len = len(value) - trace "Message", msg = value - client.processMessage(value) + while true: + var value = await client.transp.recvData() + debug "Returned from recvData()", address = client.transp.remoteAddress() + if value == "": + debug "Empty response from RPC server", + address = client.transp.remoteAddress() + break + debug "Received response from RPC server", + address = client.transp.remoteAddress(), + msg_len = len(value) + trace "Message", msg = value + client.processMessage(value) - # async loop reconnection and waiting - client.transp = await connect(client.addresses[0]) + # async loop reconnection and waiting + try: + client.transp = await connect(client.addresses[0]) + except: + debug "Could not establish new connection to RPC server", + address = client.addresses[0] + break proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} = client.addresses = resolveTAddress(address, port) client.transp = await connect(client.addresses[0]) - asyncCheck processData(client) + client.loop = processData(client) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index d9cd7df..8a801be 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -4,6 +4,7 @@ type RpcSocketClient* = ref object of RpcClient transport*: StreamTransport address*: TransportAddress + loop*: Future[void] const defaultMaxRequestLength* = 1024 * 128 @@ -32,18 +33,19 @@ proc call*(self: RpcSocketClient, name: string, proc processData(client: RpcSocketClient) {.async.} = while true: - var value = await client.transport.readLine(defaultMaxRequestLength) - if value == "": - # transmission ends - client.transport.close - break + while true: + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": + # transmission ends + await client.transport.closeWait() + break - client.processMessage(value) - # async loop reconnection and waiting - client.transport = await connect(client.address) + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await connect(client.address) proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = let addresses = resolveTAddress(address, port) client.transport = await connect(addresses[0]) client.address = addresses[0] - asyncCheck processData(client) + client.loop = processData(client) diff --git a/json_rpc/servers/httpserver.nim b/json_rpc/servers/httpserver.nim index 2dfaa34..18e8f80 100644 --- a/json_rpc/servers/httpserver.nim +++ b/json_rpc/servers/httpserver.nim @@ -104,7 +104,7 @@ proc processClient(server: StreamServer, debug "Timeout expired while receiving headers", address = transp.remoteAddress() let res = await transp.sendAnswer(HttpVersion11, Http408) - transp.close() + await transp.closeWait() break else: let hlen = hlenfut.read() @@ -115,24 +115,29 @@ proc processClient(server: StreamServer, debug "Malformed header received", address = transp.remoteAddress() let res = await transp.sendAnswer(HttpVersion11, Http400) - transp.close() + await transp.closeWait() break except TransportLimitError: # size of headers exceeds `MaxHttpHeadersSize` debug "Maximum size of headers limit reached", address = transp.remoteAddress() let res = await transp.sendAnswer(HttpVersion11, Http413) - transp.close() + await transp.closeWait() break except TransportIncompleteError: # remote peer disconnected debug "Remote peer disconnected", address = transp.remoteAddress() - transp.close() + await transp.closeWait() break except TransportOsError: debug "Problems with networking", address = transp.remoteAddress(), error = getCurrentExceptionMsg() - transp.close() + await transp.closeWait() + break + except: + debug "Unknown exception", address = transp.remoteAddress(), + error = getCurrentExceptionMsg() + await transp.closeWait() break let vres = await validateRequest(transp, header) @@ -154,19 +159,19 @@ proc processClient(server: StreamServer, debug "Timeout expired while receiving request body", address = transp.remoteAddress() let res = await transp.sendAnswer(header.version, Http413) - transp.close() + await transp.closeWait() break else: blenfut.read() except TransportIncompleteError: # remote peer disconnected debug "Remote peer disconnected", address = transp.remoteAddress() - transp.close() + await transp.closeWait() break except TransportOsError: debug "Problems with networking", address = transp.remoteAddress(), error = getCurrentExceptionMsg() - transp.close() + await transp.closeWait() break let future = rpc.route(cast[string](buffer)) @@ -177,30 +182,32 @@ proc processClient(server: StreamServer, address = transp.remoteAddress() let res = await transp.sendAnswer(header.version, Http503) if not res: - transp.close() + await transp.closeWait() break else: var data = future.read() let res = await transp.sendAnswer(header.version, Http200, data) info "RPC result has been sent", address = transp.remoteAddress() if not res: - transp.close() + await transp.closeWait() break elif vres == ErrorFailure: debug "Remote peer disconnected", address = transp.remoteAddress() - transp.close() + await transp.closeWait() break if header.version in {HttpVersion09, HttpVersion10}: debug "Disconnecting client", address = transp.remoteAddress() - transp.close() + await transp.closeWait() break else: if connection == "close": debug "Disconnecting client", address = transp.remoteAddress() - transp.close() + await transp.closeWait() break + info "Finished connection", address = transp.remoteAddress() + # Utility functions for setting up servers using stream transport addresses proc addStreamServer*(server: RpcHttpServer, address: TransportAddress) = diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 1151205..b966ae2 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -20,7 +20,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async, gc maxRequestLength = defaultMaxRequestLength value = await transport.readLine(defaultMaxRequestLength) if value == "": - transport.close + await transport.closeWait() break debug "Processing message", address = transport.remoteAddress(), line = value