diff --git a/json_rpc.nimble b/json_rpc.nimble index c20807f..464532a 100644 --- a/json_rpc.nimble +++ b/json_rpc.nimble @@ -11,7 +11,8 @@ requires "nim >= 0.17.3", "stint", "chronos", "httputils", - "chronicles" + "chronicles", + "news" proc configForTests() = --hints: off diff --git a/json_rpc/client.nim b/json_rpc/client.nim index add3e67..59e9e32 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -9,12 +9,14 @@ type RpcClient* = ref object of RootRef awaiting*: Table[ClientId, Future[Response]] nextId: ClientId + methodHandlers: Table[string, proc(j: JsonNode)] Response* = tuple[error: bool, result: JsonNode] proc initRpcClient*[T: RpcClient](client: var T) = client.awaiting = initTable[ClientId, Future[Response]]() client.nextId = 1 + client.methodHandlers = initTable[string, proc(j: JsonNode)]() proc getNextId*(client: RpcClient): ClientId = result = client.nextId @@ -23,6 +25,11 @@ proc getNextId*(client: RpcClient): ClientId = proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = %{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id} +method call*(client: RpcClient, name: string, + params: JsonNode): Future[Response] {.async, base.} = discard + +method close*(client: RpcClient) {.base, async.} = discard + template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) @@ -45,32 +52,41 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage*[T: RpcClient](self: T, line: string) = +proc processMessage*(self: RpcClient, line: string) = # Note: this doesn't use any transport code so doesn't need to be # differentiated. - let - node = parseJson(line) - id = checkGet(node, "id", JInt) + let node = parseJson(line) - if not self.awaiting.hasKey(id): - raise newException(ValueError, - "Cannot find message id \"" & node["id"].str & "\"") + if "id" in node: + let id = checkGet(node, "id", JInt) - let version = checkGet(node, "jsonrpc", JString) - if version != "2.0": - self.awaiting[id].asyncRaise(ValueError, - "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + if not self.awaiting.hasKey(id): + raise newException(ValueError, + "Cannot find message id \"" & node["id"].str & "\"") - let errorNode = node{"error"} - if errorNode.isNil or errorNode.kind == JNull: - var res = node{"result"} - if not res.isNil: - self.awaiting[id].complete((false, res)) - self.awaiting.del(id) - # TODO: actions on unable find result node + let version = checkGet(node, "jsonrpc", JString) + if version != "2.0": + self.awaiting[id].asyncRaise(ValueError, + "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + + let errorNode = node{"error"} + if errorNode.isNil or errorNode.kind == JNull: + var res = node{"result"} + if not res.isNil: + self.awaiting[id].complete((false, res)) + self.awaiting.del(id) + # TODO: actions on unable find result node + else: + self.awaiting[id].fail(newException(ValueError, $errorNode)) + self.awaiting.del(id) + elif "method" in node: + # This could be subscription notification + let name = node["method"].getStr() + let handler = self.methodHandlers.getOrDefault(name) + if not handler.isNil: + handler(node{"params"}) else: - self.awaiting[id].fail(newException(ValueError, $errorNode)) - self.awaiting.del(id) + raise newException(ValueError, "Invalid jsonrpc message: " & $node) # Signature processing @@ -173,6 +189,12 @@ proc processRpcSigs(clientType, parsedCode: NimNode): NimNode = var procDef = createRpcFromSig(clientType, line) result.add(procDef) +proc setMethodHandler*(cl: RpcClient, name: string, callback: proc(j: JsonNode)) = + cl.methodHandlers[name] = callback + +proc delMethodHandler*(cl: RpcClient, name: string) = + cl.methodHandlers.del(name) + macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped = ## Takes a file of forward declarations in Nim and builds them into RPC ## calls, based on their parameters. diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index cfd4b9e..929a2ae 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -10,7 +10,6 @@ type httpMethod: HttpMethod RpcHttpClient* = ref object of RpcClient - transp*: StreamTransport loop: Future[void] addresses: seq[TransportAddress] options: HttpClientOptions @@ -69,13 +68,11 @@ proc validateResponse*(transp: StreamTransport, if header["Transfer-Encoding"].toLowerAscii() == "chunked": debug "Chunked encoding is not supported", address = transp.remoteAddress() - result = false - return else: debug "Content body size could not be calculated", address = transp.remoteAddress() - result = false - return + result = false + return result = true @@ -116,7 +113,6 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error or not transp.validateResponse(header): - await transp.closeWait() result = "" return @@ -156,13 +152,12 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error: - await transp.closeWait() result = "" else: result = cast[string](buffer) proc init(opts: var HttpClientOptions) = - opts.httpMethod = MethodGet + opts.httpMethod = MethodPost proc newRpcHttpClient*(): RpcHttpClient = ## Creates a new HTTP client instance. @@ -176,60 +171,36 @@ proc httpMethod*(client: RpcHttpClient): HttpMethod = proc httpMethod*(client: RpcHttpClient, m: HttpMethod) = client.options.httpMethod = m -proc call*(client: RpcHttpClient, name: string, - params: JsonNode, httpMethod: HttpMethod): Future[Response] {.async.} = +method call*(client: RpcHttpClient, name: string, + params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = client.getNextId() - var value = $rpcCallNode(name, params, id) & "\c\l" - if isNil(client.transp) or client.transp.closed(): - raise newException(ValueError, - "Transport is not initialised or already closed") - let res = await client.transp.sendRequest(value, httpMethod) + let transp = await connect(client.addresses[0]) + var reqBody = $rpcCallNode(name, params, id) + echo "Sending (", client.httpMethod, "): ", reqBody + let res = await transp.sendRequest(reqBody, client.httpMethod) if not res: debug "Failed to send message to RPC server", - address = client.transp.remoteAddress(), msg_len = len(value) - await client.transp.closeWait() + address = transp.remoteAddress(), msg_len = len(reqBody) + await transp.closeWait() raise newException(ValueError, "Transport error") else: - debug "Message sent to RPC server", address = client.transp.remoteAddress(), - msg_len = len(value) - trace "Message", msg = value + debug "Message sent to RPC server", address = transp.remoteAddress(), + msg_len = len(reqBody) + trace "Message", msg = reqBody + + var value = await transp.recvData() + await transp.closeWait() + if value.len == 0: + raise newException(ValueError, "Empty response from server") # completed by processMessage. var newFut = newFuture[Response]() # add to awaiting responses client.awaiting[id] = newFut + client.processMessage(value) result = await newFut -template call*(client: RpcHttpClient, name: string, - params: JsonNode): untyped = - client.call(name, params, client.httpMethod) - -proc processData(client: RpcHttpClient) {.async.} = - while true: - while true: - var value = await client.transp.recvData() - debug "Returned from recvData()", address = client.transp.remoteAddress() - if value == "": - debug "Empty response from RPC server", - address = client.transp.remoteAddress() - break - debug "Received response from RPC server", - address = client.transp.remoteAddress(), - msg_len = len(value) - trace "Message", msg = value - client.processMessage(value) - - # async loop reconnection and waiting - try: - client.transp = await connect(client.addresses[0]) - except: - debug "Could not establish new connection to RPC server", - address = client.addresses[0] - break - proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} = client.addresses = resolveTAddress(address, port) - client.transp = await connect(client.addresses[0]) - client.loop = processData(client) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 8a801be..1a8064a 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -13,7 +13,7 @@ proc newRpcSocketClient*: RpcSocketClient = new result result.initRpcClient() -proc call*(self: RpcSocketClient, name: string, +method call*(self: RpcSocketClient, name: string, params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = self.getNextId() @@ -49,3 +49,7 @@ proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = client.transport = await connect(addresses[0]) client.address = addresses[0] client.loop = processData(client) + +method close*(client: RpcSocketClient) {.async.} = + # TODO: Stop the processData loop + await client.transport.closeWait() diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim new file mode 100644 index 0000000..714e147 --- /dev/null +++ b/json_rpc/clients/websocketclient.nim @@ -0,0 +1,54 @@ +import ../client, chronos, tables, json + +const newsUseChronos = true +include news + +type + RpcWebSocketClient* = ref object of RpcClient + transport*: WebSocket + uri*: string + loop*: Future[void] + +proc newRpcWebSocketClient*: RpcWebSocketClient = + ## Creates a new client instance. + new result + result.initRpcClient() + +method call*(self: RpcWebSocketClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = self.getNextId() + var value = $rpcCallNode(name, params, id) & "\c\l" + if self.transport.isNil: + raise newException(ValueError, + "Transport is not initialised (missing a call to connect?)") + # echo "Sent msg: ", value + await self.transport.send(value) + + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut + +proc processData(client: RpcWebSocketClient) {.async.} = + while true: + while true: + var value = await client.transport.receivePacket() + if value == "": + # transmission ends + client.transport.close() + break + + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await newWebSocket(client.uri) + +proc connect*(client: RpcWebSocketClient, uri: string) {.async.} = + client.transport = await newWebSocket(uri) + client.uri = uri + client.loop = processData(client) + +method close*(client: RpcWebSocketClient) {.async.} = + # TODO: Stop the processData loop + client.transport.close() diff --git a/json_rpc/rpcclient.nim b/json_rpc/rpcclient.nim index 03224d5..5b1de04 100644 --- a/json_rpc/rpcclient.nim +++ b/json_rpc/rpcclient.nim @@ -1,3 +1,3 @@ import client -import clients/[socketclient, httpclient] -export client, socketclient, httpclient +import clients/[socketclient, httpclient, websocketclient] +export client, socketclient, httpclient, websocketclient diff --git a/tests/testhttp.nim b/tests/testhttp.nim index ec9f99c..04c9dca 100644 --- a/tests/testhttp.nim +++ b/tests/testhttp.nim @@ -66,7 +66,7 @@ proc continuousTest(address: string, port: Port): Future[int] {.async.} = var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]]) if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]": result += 1 - client.transp.close() + await client.close() proc customMessage(address: TransportAddress, data: string,