From f19d66b35c073065a6886623f87e64ae9a27f31e Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Wed, 12 Jun 2019 16:44:19 +0300 Subject: [PATCH 1/3] Websocket client support and some fixes --- json_rpc.nimble | 4 +- json_rpc/client.nim | 62 +++++++++++++++++-------- json_rpc/clients/httpclient.nim | 69 ++++++++-------------------- json_rpc/clients/socketclient.nim | 6 ++- json_rpc/clients/websocketclient.nim | 54 ++++++++++++++++++++++ json_rpc/jsonmarshal.nim | 24 ++++++++-- json_rpc/rpcclient.nim | 4 +- tests/testhttp.nim | 2 +- 8 files changed, 148 insertions(+), 77 deletions(-) create mode 100644 json_rpc/clients/websocketclient.nim diff --git a/json_rpc.nimble b/json_rpc.nimble index c20807f..04902bc 100644 --- a/json_rpc.nimble +++ b/json_rpc.nimble @@ -11,7 +11,9 @@ requires "nim >= 0.17.3", "stint", "chronos", "httputils", - "chronicles" + "chronicles", + "byteutils", + "news" proc configForTests() = --hints: off diff --git a/json_rpc/client.nim b/json_rpc/client.nim index add3e67..464ba66 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -9,12 +9,14 @@ type RpcClient* = ref object of RootRef awaiting*: Table[ClientId, Future[Response]] nextId: ClientId + methodHandlers: Table[string, proc(j: JsonNode)] Response* = tuple[error: bool, result: JsonNode] proc initRpcClient*[T: RpcClient](client: var T) = client.awaiting = initTable[ClientId, Future[Response]]() client.nextId = 1 + client.methodHandlers = initTable[string, proc(j: JsonNode)]() proc getNextId*(client: RpcClient): ClientId = result = client.nextId @@ -23,6 +25,11 @@ proc getNextId*(client: RpcClient): ClientId = proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = %{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id} +method call*(client: RpcClient, name: string, + params: JsonNode): Future[Response] {.async, base.} = discard + +method close*(client: RpcClient) {.base.} = discard + template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) @@ -45,32 +52,41 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage*[T: RpcClient](self: T, line: string) = +proc processMessage*(self: RpcClient, line: string) = # Note: this doesn't use any transport code so doesn't need to be # differentiated. - let - node = parseJson(line) - id = checkGet(node, "id", JInt) + let node = parseJson(line) - if not self.awaiting.hasKey(id): - raise newException(ValueError, - "Cannot find message id \"" & node["id"].str & "\"") + if "id" in node: + let id = checkGet(node, "id", JInt) - let version = checkGet(node, "jsonrpc", JString) - if version != "2.0": - self.awaiting[id].asyncRaise(ValueError, - "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + if not self.awaiting.hasKey(id): + raise newException(ValueError, + "Cannot find message id \"" & node["id"].str & "\"") - let errorNode = node{"error"} - if errorNode.isNil or errorNode.kind == JNull: - var res = node{"result"} - if not res.isNil: - self.awaiting[id].complete((false, res)) - self.awaiting.del(id) - # TODO: actions on unable find result node + let version = checkGet(node, "jsonrpc", JString) + if version != "2.0": + self.awaiting[id].asyncRaise(ValueError, + "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + + let errorNode = node{"error"} + if errorNode.isNil or errorNode.kind == JNull: + var res = node{"result"} + if not res.isNil: + self.awaiting[id].complete((false, res)) + self.awaiting.del(id) + # TODO: actions on unable find result node + else: + self.awaiting[id].fail(newException(ValueError, $errorNode)) + self.awaiting.del(id) + elif "method" in node: + # This could be subscription notification + let name = node["method"].getStr() + let handler = self.methodHandlers.getOrDefault(name) + if not handler.isNil: + handler(node{"params"}) else: - self.awaiting[id].fail(newException(ValueError, $errorNode)) - self.awaiting.del(id) + raise newException(ValueError, "Invalid jsonrpc message: " & $node) # Signature processing @@ -173,6 +189,12 @@ proc processRpcSigs(clientType, parsedCode: NimNode): NimNode = var procDef = createRpcFromSig(clientType, line) result.add(procDef) +proc setMethodHandler*(cl: RpcClient, name: string, callback: proc(j: JsonNode)) = + cl.methodHandlers[name] = callback + +proc delMethodHandler*(cl: RpcClient, name: string) = + cl.methodHandlers.del(name) + macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped = ## Takes a file of forward declarations in Nim and builds them into RPC ## calls, based on their parameters. diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index cfd4b9e..595159e 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -10,7 +10,6 @@ type httpMethod: HttpMethod RpcHttpClient* = ref object of RpcClient - transp*: StreamTransport loop: Future[void] addresses: seq[TransportAddress] options: HttpClientOptions @@ -69,13 +68,11 @@ proc validateResponse*(transp: StreamTransport, if header["Transfer-Encoding"].toLowerAscii() == "chunked": debug "Chunked encoding is not supported", address = transp.remoteAddress() - result = false - return else: debug "Content body size could not be calculated", address = transp.remoteAddress() - result = false - return + result = false + return result = true @@ -116,7 +113,6 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error or not transp.validateResponse(header): - await transp.closeWait() result = "" return @@ -156,13 +152,12 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} = error = true if error: - await transp.closeWait() result = "" else: result = cast[string](buffer) proc init(opts: var HttpClientOptions) = - opts.httpMethod = MethodGet + opts.httpMethod = MethodPost proc newRpcHttpClient*(): RpcHttpClient = ## Creates a new HTTP client instance. @@ -176,60 +171,36 @@ proc httpMethod*(client: RpcHttpClient): HttpMethod = proc httpMethod*(client: RpcHttpClient, m: HttpMethod) = client.options.httpMethod = m -proc call*(client: RpcHttpClient, name: string, - params: JsonNode, httpMethod: HttpMethod): Future[Response] {.async.} = +method call*(client: RpcHttpClient, name: string, + params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = client.getNextId() - var value = $rpcCallNode(name, params, id) & "\c\l" - if isNil(client.transp) or client.transp.closed(): - raise newException(ValueError, - "Transport is not initialised or already closed") - let res = await client.transp.sendRequest(value, httpMethod) + let transp = await connect(client.addresses[0]) + var reqBody = $rpcCallNode(name, params, id) + echo "Sending (", client.httpMethod, "): ", reqBody + let res = await transp.sendRequest(reqBody, client.httpMethod) if not res: debug "Failed to send message to RPC server", - address = client.transp.remoteAddress(), msg_len = len(value) - await client.transp.closeWait() + address = transp.remoteAddress(), msg_len = len(reqBody) + transp.close() raise newException(ValueError, "Transport error") else: - debug "Message sent to RPC server", address = client.transp.remoteAddress(), - msg_len = len(value) - trace "Message", msg = value + debug "Message sent to RPC server", address = transp.remoteAddress(), + msg_len = len(reqBody) + trace "Message", msg = reqBody + + var value = await transp.recvData() + transp.close() + if value.len == 0: + raise newException(ValueError, "Empty response from server") # completed by processMessage. var newFut = newFuture[Response]() # add to awaiting responses client.awaiting[id] = newFut + client.processMessage(value) result = await newFut -template call*(client: RpcHttpClient, name: string, - params: JsonNode): untyped = - client.call(name, params, client.httpMethod) - -proc processData(client: RpcHttpClient) {.async.} = - while true: - while true: - var value = await client.transp.recvData() - debug "Returned from recvData()", address = client.transp.remoteAddress() - if value == "": - debug "Empty response from RPC server", - address = client.transp.remoteAddress() - break - debug "Received response from RPC server", - address = client.transp.remoteAddress(), - msg_len = len(value) - trace "Message", msg = value - client.processMessage(value) - - # async loop reconnection and waiting - try: - client.transp = await connect(client.addresses[0]) - except: - debug "Could not establish new connection to RPC server", - address = client.addresses[0] - break - proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} = client.addresses = resolveTAddress(address, port) - client.transp = await connect(client.addresses[0]) - client.loop = processData(client) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 8a801be..593bff3 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -13,7 +13,7 @@ proc newRpcSocketClient*: RpcSocketClient = new result result.initRpcClient() -proc call*(self: RpcSocketClient, name: string, +method call*(self: RpcSocketClient, name: string, params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = self.getNextId() @@ -49,3 +49,7 @@ proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = client.transport = await connect(addresses[0]) client.address = addresses[0] client.loop = processData(client) + +method close*(client: RpcSocketClient) = + # TODO: Stop the processData loop + client.transport.close() diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim new file mode 100644 index 0000000..b1d59d9 --- /dev/null +++ b/json_rpc/clients/websocketclient.nim @@ -0,0 +1,54 @@ +import ../client, chronos, tables, json + +const newsUseChronos = true +include news + +type + RpcWebSocketClient* = ref object of RpcClient + transport*: WebSocket + uri*: string + loop*: Future[void] + +proc newRpcWebSocketClient*: RpcWebSocketClient = + ## Creates a new client instance. + new result + result.initRpcClient() + +method call*(self: RpcWebSocketClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = self.getNextId() + var value = $rpcCallNode(name, params, id) & "\c\l" + if self.transport.isNil: + raise newException(ValueError, + "Transport is not initialised (missing a call to connect?)") + # echo "Sent msg: ", value + await self.transport.send(value) + + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut + +proc processData(client: RpcWebSocketClient) {.async.} = + while true: + while true: + var value = await client.transport.receivePacket() + if value == "": + # transmission ends + client.transport.close() + break + + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await newWebSocket(client.uri) + +proc connect*(client: RpcWebSocketClient, uri: string) {.async.} = + client.transport = await newWebSocket(uri) + client.uri = uri + client.loop = processData(client) + +method close*(client: RpcWebSocketClient) = + # TODO: Stop the processData loop + client.transport.close() diff --git a/json_rpc/jsonmarshal.nim b/json_rpc/jsonmarshal.nim index b0c5a48..82f80b6 100644 --- a/json_rpc/jsonmarshal.nim +++ b/json_rpc/jsonmarshal.nim @@ -1,4 +1,4 @@ -import macros, json, options, typetraits +import macros, json, options, strutils, typetraits, byteutils proc expect*(actual, expected: JsonNodeKind, argName: string) = if actual != expected: raise newException(ValueError, "Parameter [" & argName & "] expected " & $expected & " but got " & $actual) @@ -36,6 +36,13 @@ proc fromJson(n: JsonNode, argName: string, result: var ref int64) proc fromJson(n: JsonNode, argName: string, result: var ref int) proc fromJson[T](n: JsonNode, argName: string, result: var Option[T]) +proc parseStringToInt(s: string): int = + ## If s starts with '0x' parse as hexadecimal, otherwise parse as decimal. + if s.len > 2 and s[0] == '0' and s[1] in {'x', 'X'}: + result = parseHexInt(s) + else: + result = parseInt(s) + # This can't be forward declared: https://github.com/nim-lang/Nim/issues/7868 proc fromJson[T: enum](n: JsonNode, argName: string, result: var T) = n.kind.expect(JInt, argName) @@ -62,8 +69,11 @@ proc fromJson(n: JsonNode, argName: string, result: var bool) = result = n.getBool() proc fromJson(n: JsonNode, argName: string, result: var int) = - n.kind.expect(JInt, argName) - result = n.getInt() + if n.kind == JString: + result = n.getStr().parseStringToInt() + else: + n.kind.expect(JInt, argName) + result = n.getInt() proc fromJson[T: ref object](n: JsonNode, argName: string, result: var T) = n.kind.expect(JObject, argName) @@ -110,6 +120,14 @@ proc fromJson[T](n: JsonNode, argName: string, result: var seq[T]) = fromJson(n[i], argName, result[i]) proc fromJson[N, T](n: JsonNode, argName: string, result: var array[N, T]) = + when T is byte: + if n.kind == JString: + let s = n.getStr + if s.len >= result.len + 2 and # (2 for 0x prefix) + s[0] == '0' and s[1] in {'x', 'X'}: + hexToByteArray(n.getStr, result) + return + n.kind.expect(JArray, argName) if n.len > result.len: raise newException(ValueError, "Parameter \"" & argName & "\" item count is too big for array") for i in 0 ..< n.len: diff --git a/json_rpc/rpcclient.nim b/json_rpc/rpcclient.nim index 03224d5..5b1de04 100644 --- a/json_rpc/rpcclient.nim +++ b/json_rpc/rpcclient.nim @@ -1,3 +1,3 @@ import client -import clients/[socketclient, httpclient] -export client, socketclient, httpclient +import clients/[socketclient, httpclient, websocketclient] +export client, socketclient, httpclient, websocketclient diff --git a/tests/testhttp.nim b/tests/testhttp.nim index ec9f99c..d588ebc 100644 --- a/tests/testhttp.nim +++ b/tests/testhttp.nim @@ -66,7 +66,7 @@ proc continuousTest(address: string, port: Port): Future[int] {.async.} = var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]]) if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]": result += 1 - client.transp.close() + client.close() proc customMessage(address: TransportAddress, data: string, From ac2f6b9360499c819a5d510f173a7994b9cae391 Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Mon, 17 Jun 2019 18:44:17 +0300 Subject: [PATCH 2/3] Comments addressed --- json_rpc.nimble | 1 - json_rpc/jsonmarshal.nim | 24 +++--------------------- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/json_rpc.nimble b/json_rpc.nimble index 04902bc..464532a 100644 --- a/json_rpc.nimble +++ b/json_rpc.nimble @@ -12,7 +12,6 @@ requires "nim >= 0.17.3", "chronos", "httputils", "chronicles", - "byteutils", "news" proc configForTests() = diff --git a/json_rpc/jsonmarshal.nim b/json_rpc/jsonmarshal.nim index 82f80b6..b0c5a48 100644 --- a/json_rpc/jsonmarshal.nim +++ b/json_rpc/jsonmarshal.nim @@ -1,4 +1,4 @@ -import macros, json, options, strutils, typetraits, byteutils +import macros, json, options, typetraits proc expect*(actual, expected: JsonNodeKind, argName: string) = if actual != expected: raise newException(ValueError, "Parameter [" & argName & "] expected " & $expected & " but got " & $actual) @@ -36,13 +36,6 @@ proc fromJson(n: JsonNode, argName: string, result: var ref int64) proc fromJson(n: JsonNode, argName: string, result: var ref int) proc fromJson[T](n: JsonNode, argName: string, result: var Option[T]) -proc parseStringToInt(s: string): int = - ## If s starts with '0x' parse as hexadecimal, otherwise parse as decimal. - if s.len > 2 and s[0] == '0' and s[1] in {'x', 'X'}: - result = parseHexInt(s) - else: - result = parseInt(s) - # This can't be forward declared: https://github.com/nim-lang/Nim/issues/7868 proc fromJson[T: enum](n: JsonNode, argName: string, result: var T) = n.kind.expect(JInt, argName) @@ -69,11 +62,8 @@ proc fromJson(n: JsonNode, argName: string, result: var bool) = result = n.getBool() proc fromJson(n: JsonNode, argName: string, result: var int) = - if n.kind == JString: - result = n.getStr().parseStringToInt() - else: - n.kind.expect(JInt, argName) - result = n.getInt() + n.kind.expect(JInt, argName) + result = n.getInt() proc fromJson[T: ref object](n: JsonNode, argName: string, result: var T) = n.kind.expect(JObject, argName) @@ -120,14 +110,6 @@ proc fromJson[T](n: JsonNode, argName: string, result: var seq[T]) = fromJson(n[i], argName, result[i]) proc fromJson[N, T](n: JsonNode, argName: string, result: var array[N, T]) = - when T is byte: - if n.kind == JString: - let s = n.getStr - if s.len >= result.len + 2 and # (2 for 0x prefix) - s[0] == '0' and s[1] in {'x', 'X'}: - hexToByteArray(n.getStr, result) - return - n.kind.expect(JArray, argName) if n.len > result.len: raise newException(ValueError, "Parameter \"" & argName & "\" item count is too big for array") for i in 0 ..< n.len: From b583dfb029bac8035148236d3a79842a67adcad9 Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Mon, 17 Jun 2019 19:56:19 +0300 Subject: [PATCH 3/3] Use closeWait --- json_rpc/client.nim | 2 +- json_rpc/clients/httpclient.nim | 4 ++-- json_rpc/clients/socketclient.nim | 4 ++-- json_rpc/clients/websocketclient.nim | 2 +- tests/testhttp.nim | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 464ba66..59e9e32 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -28,7 +28,7 @@ proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = method call*(client: RpcClient, name: string, params: JsonNode): Future[Response] {.async, base.} = discard -method close*(client: RpcClient) {.base.} = discard +method close*(client: RpcClient) {.base, async.} = discard template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index 595159e..929a2ae 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -183,7 +183,7 @@ method call*(client: RpcHttpClient, name: string, if not res: debug "Failed to send message to RPC server", address = transp.remoteAddress(), msg_len = len(reqBody) - transp.close() + await transp.closeWait() raise newException(ValueError, "Transport error") else: debug "Message sent to RPC server", address = transp.remoteAddress(), @@ -191,7 +191,7 @@ method call*(client: RpcHttpClient, name: string, trace "Message", msg = reqBody var value = await transp.recvData() - transp.close() + await transp.closeWait() if value.len == 0: raise newException(ValueError, "Empty response from server") diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 593bff3..1a8064a 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -50,6 +50,6 @@ proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = client.address = addresses[0] client.loop = processData(client) -method close*(client: RpcSocketClient) = +method close*(client: RpcSocketClient) {.async.} = # TODO: Stop the processData loop - client.transport.close() + await client.transport.closeWait() diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index b1d59d9..714e147 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -49,6 +49,6 @@ proc connect*(client: RpcWebSocketClient, uri: string) {.async.} = client.uri = uri client.loop = processData(client) -method close*(client: RpcWebSocketClient) = +method close*(client: RpcWebSocketClient) {.async.} = # TODO: Stop the processData loop client.transport.close() diff --git a/tests/testhttp.nim b/tests/testhttp.nim index d588ebc..04c9dca 100644 --- a/tests/testhttp.nim +++ b/tests/testhttp.nim @@ -66,7 +66,7 @@ proc continuousTest(address: string, port: Port): Future[int] {.async.} = var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]]) if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]": result += 1 - client.close() + await client.close() proc customMessage(address: TransportAddress, data: string,