diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index 15c6caa..d672336 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -161,7 +161,7 @@ method call*(client: RpcHttpClient, name: string, let msgRes = client.processMessage(resText) if msgRes.isErr: # Need to clean up in case the answer was invalid - debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error + error "Failed to process POST Response for JSON-RPC", msg = msgRes.error let exc = newException(JsonRpcError, msgRes.error) newFut.fail(exc) client.awaiting.del(id) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 29207b7..86269f1 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -18,6 +18,9 @@ import export client +logScope: + topics = "JSONRPC-SOCKET-CLIENT" + type RpcSocketClient* = ref object of RpcClient transport*: StreamTransport @@ -74,22 +77,47 @@ method callBatch*(client: RpcSocketClient, return await client.batchFut -proc processData(client: RpcSocketClient) {.async.} = +proc processData(client: RpcSocketClient) {.async: (raises: []).} = while true: + var localException: ref JsonRpcError while true: - var value = await client.transport.readLine(defaultMaxRequestLength) - if value == "": - # transmission ends + try: + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": + # transmission ends + await client.transport.closeWait() + break + + let res = client.processMessage(value) + if res.isErr: + error "Error when processing RPC message", msg=res.error + localException = newException(JsonRpcError, res.error) + break + except TransportError as exc: + localException = newException(JsonRpcError, exc.msg) + await client.transport.closeWait() + break + except CancelledError as exc: + localException = newException(JsonRpcError, exc.msg) await client.transport.closeWait() break - let res = client.processMessage(value) - if res.isErr: - error "error when processing message", msg=res.error - raise newException(JsonRpcError, res.error) + if localException.isNil.not: + for _,fut in client.awaiting: + fut.fail(localException) + if client.batchFut.isNil.not and not client.batchFut.completed(): + client.batchFut.fail(localException) # async loop reconnection and waiting - client.transport = await connect(client.address) + try: + info "Reconnect to server", address=client.address + client.transport = await connect(client.address) + except TransportError as exc: + error "Error when reconnecting to server", msg=exc.msg + break + except CancelledError as exc: + error "Error when reconnecting to server", msg=exc.msg + break proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = let addresses = resolveTAddress(address, port) diff --git a/json_rpc/clients/websocketclientimpl.nim b/json_rpc/clients/websocketclientimpl.nim index d53af38..cef9384 100644 --- a/json_rpc/clients/websocketclientimpl.nim +++ b/json_rpc/clients/websocketclientimpl.nim @@ -74,6 +74,13 @@ method callBatch*(client: RpcWebSocketClient, proc processData(client: RpcWebSocketClient) {.async.} = var error: ref CatchableError + template processError() = + for k, v in client.awaiting: + v.fail(error) + if client.batchFut.isNil.not and not client.batchFut.completed(): + client.batchFut.fail(error) + client.awaiting.clear() + let ws = client.transport try: while ws.readyState != ReadyState.Closed: @@ -85,7 +92,9 @@ proc processData(client: RpcWebSocketClient) {.async.} = let res = client.processMessage(string.fromBytes(value)) if res.isErr: - raise newException(JsonRpcError, res.error) + error "Error when processing RPC message", msg=res.error + error = newException(JsonRpcError, res.error) + processError() except CatchableError as e: error = e @@ -97,9 +106,7 @@ proc processData(client: RpcWebSocketClient) {.async.} = if client.awaiting.len != 0: if error.isNil: error = newException(IOError, "Transport was closed while waiting for response") - for k, v in client.awaiting: - v.fail(error) - client.awaiting.clear() + processError() if not client.onDisconnect.isNil: client.onDisconnect() diff --git a/tests/test_client_hook.nim b/tests/test_client_hook.nim index 6a97273..85e4868 100644 --- a/tests/test_client_hook.nim +++ b/tests/test_client_hook.nim @@ -8,7 +8,10 @@ # those terms. import + std/importutils, unittest2, + chronicles, + websock/websock, ../json_rpc/rpcclient, ../json_rpc/rpcserver @@ -48,7 +51,7 @@ proc setupClientHook(client: RpcClient): Shadow = return err(exc.msg) shadow -suite "test callsigs": +suite "test client features": var server = newRpcHttpServer(["127.0.0.1:0"]) server.installHandlers() var client = newRpcHttpClient() @@ -74,3 +77,226 @@ suite "test callsigs": waitFor server.stop() waitFor server.closeWait() + + +type + TestSocketServer = ref object of RpcSocketServer + getData: proc(): string {.gcsafe, raises: [].} + +proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} = + ## Process transport data to the RPC server + try: + var rpc = getUserData[TestSocketServer](server) + while true: + var + value = await transport.readLine(router.defaultMaxRequestLength) + if value == "": + await transport.closeWait() + break + + let res = rpc.getData() + discard await transport.write(res & "\r\n") + except TransportError as ex: + error "Transport closed during processing client", msg=ex.msg + except CatchableError as ex: + error "Error occured during processing client", msg=ex.msg + +proc addStreamServer(server: TestSocketServer, address: TransportAddress) = + privateAccess(RpcSocketServer) + try: + info "Starting JSON-RPC socket server", address = $address + var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server) + server.servers.add(transportServer) + except CatchableError as exc: + error "Failed to create server", address = $address, message = exc.msg + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, "Unable to create server!") + +proc new(T: type TestSocketServer, getData: proc(): string {.gcsafe, raises: [].}): T = + T( + router: RpcRouter.init(), + getData: getData, + ) + + +suite "test rpc socket client": + let server = TestSocketServer.new(proc(): string {.gcsafe, raises: [].} = + return """{"jsonrpc":"2.0","result":10}""" + ) + let serverAddress = initTAddress("127.0.0.1:0") + server.addStreamServer(serverAddress) + + var client = newRpcSocketClient() + server.start() + waitFor client.connect(server.localAddress()[0]) + + test "missing id in server response": + expect JsonRpcError: + let res = waitFor client.get_Banana(11) + discard res + + server.stop() + waitFor server.closeWait() + + +type + TestHttpServer = ref object of RpcHttpServer + getData: proc(): string {.gcsafe, raises: [].} + +proc processClientRpc(rpcServer: TestHttpServer): HttpProcessCallback2 = + return proc (req: RequestFence): Future[HttpResponseRef] + {.async: (raises: [CancelledError]).} = + if not req.isOk(): + return defaultResponse() + + let + request = req.get() + headers = HttpTable.init([("Content-Type", + "application/json; charset=utf-8")]) + try: + let data = rpcServer.getData() + let res = await request.respond(Http200, data, headers) + trace "JSON-RPC result has been sent" + return res + except CancelledError as exc: + raise exc + except CatchableError as exc: + debug "Internal error while processing JSON-RPC call" + return defaultResponse(exc) + +proc addHttpServer( + rpcServer: TestHttpServer, + address: TransportAddress, + socketFlags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, + serverUri = Uri(), + serverIdent = "", + maxConnections: int = -1, + bufferSize: int = 4096, + backlogSize: int = 100, + httpHeadersTimeout = 10.seconds, + maxHeadersSize: int = 8192, + maxRequestBodySize: int = 1_048_576) = + let server = HttpServerRef.new( + address, + processClientRpc(rpcServer), + {}, + socketFlags, + serverUri, "nim-json-rpc", maxConnections, backlogSize, + bufferSize, httpHeadersTimeout, maxHeadersSize, maxRequestBodySize + ).valueOr: + error "Failed to create server", address = $address, + message = error + raise newException(RpcBindError, "Unable to create server: " & $error) + info "Starting JSON-RPC HTTP server", url = "http://" & $address + + privateAccess(RpcHttpServer) + rpcServer.httpServers.add server + +proc new(T: type TestHttpServer, getData: proc(): string {.gcsafe, raises: [].}): T = + T( + router: RpcRouter.init(), + maxChunkSize: 8192, + getData: getData, + ) + +suite "test rpc http client": + let server = TestHttpServer.new(proc(): string {.gcsafe, raises: [].} = + return """{"jsonrpc":"2.0","result":10}""" + ) + let serverAddress = initTAddress("127.0.0.1:0") + server.addHttpServer(serverAddress) + + var client = newRpcHttpClient() + server.start() + waitFor client.connect("http://" & $server.localAddress()[0]) + + test "missing id in server response": + expect JsonRpcError: + let res = waitFor client.get_Banana(11) + discard res + + waitFor server.stop() + waitFor server.closeWait() + + +type + TestWsServer = ref object of RpcWebSocketServer + getData: proc(): string {.gcsafe, raises: [].} + +proc handleRequest(rpc: TestWsServer, request: websock.HttpRequest) + {.async: (raises: [CancelledError]).} = + try: + let server = rpc.wsserver + let ws = await server.handleRequest(request) + if ws.readyState != ReadyState.Open: + error "Failed to open websocket connection" + return + + trace "Websocket handshake completed" + while ws.readyState != ReadyState.Closed: + let recvData = await ws.recvMsg() + trace "Client message: ", size = recvData.len, binary = ws.binary + + if ws.readyState == ReadyState.Closed: + # if session already terminated by peer, + # no need to send response + break + + if recvData.len == 0: + await ws.close( + reason = "cannot process zero length message" + ) + break + + let data = rpc.getData() + + trace "RPC result has been sent", address = $request.uri + await ws.send(data) + + except WebSocketError as exc: + error "WebSocket error:", exception = exc.msg + + except CancelledError as exc: + raise exc + + except CatchableError as exc: + error "Something error", msg=exc.msg + +proc newWsServer(address: TransportAddress, getData: proc(): string {.gcsafe, raises: [].}): TestWsServer = + + let flags = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr} + var server = new(TestWsServer) + proc processCallback(request: websock.HttpRequest): Future[void] = + handleRequest(server, request) + + privateAccess(RpcWebSocketServer) + + server.getData = getData + server.wsserver = WSServer.new(rng = HmacDrbgContext.new()) + server.server = websock.HttpServer.create( + address, + processCallback, + flags + ) + + server + +suite "test ws http client": + let serverAddress = initTAddress("127.0.0.1:0") + let server = newWsServer(serverAddress, proc(): string {.gcsafe, raises: [].} = + return """{"jsonrpc":"2.0","result":10}""" + ) + + var client = newRpcWebSocketClient() + server.start() + waitFor client.connect("ws://" & $server.localAddress()) + + test "missing id in server response": + expect JsonRpcError: + let res = waitFor client.get_Banana(11) + discard res + + server.stop() + waitFor server.closeWait()