From 1ca18476000a9308e33e6745275d6a530fcd9e00 Mon Sep 17 00:00:00 2001 From: coffeepots Date: Thu, 12 Jul 2018 18:36:40 +0100 Subject: [PATCH] Make client usable with different transports --- json_rpc/client.nim | 89 +++++-------------- json_rpc/clients/socketclient.nim | 52 +++++++++++ .../socket.nim => servers/socketserver.nim} | 0 rpcclient.nim | 6 +- rpcserver.nim | 4 +- tests/testethcalls.nim | 2 +- 6 files changed, 83 insertions(+), 70 deletions(-) create mode 100644 json_rpc/clients/socketclient.nim rename json_rpc/{transports/socket.nim => servers/socketserver.nim} (100%) diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 102717a..dcfb11a 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -5,43 +5,26 @@ import jsonmarshal export asyncdispatch2 type - RpcClient*[T, A] = ref object - awaiting: Table[string, Future[Response]] - transport: T - address: A - nextId: int64 + ClientId* = int64 + RpcClient* = ref object of RootRef# + awaiting*: Table[ClientId, Future[Response]] + nextId: ClientId Response* = tuple[error: bool, result: JsonNode] -const defaultMaxRequestLength* = 1024 * 128 +proc initRpcClient*[T: RpcClient](client: var T) = + client.awaiting = initTable[ClientId, Future[Response]]() + client.nextId = 1 -proc newRpcClient*[T, A]: RpcClient[T, A] = - ## Creates a new ``RpcClient`` instance. - result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) +proc getNextId*(client: RpcClient): ClientId = + result = client.nextId + client.nextId.inc -proc call*(self: RpcClient, name: string, - params: JsonNode): Future[Response] {.async.} = - ## Remotely calls the specified RPC method. - let id = $self.nextId - self.nextId.inc - var - value = - $ %{"jsonrpc": %"2.0", - "method": %name, - "params": params, - "id": %id} & "\c\l" - if self.transport.isNil: - var connectStr = "" - raise newException(ValueError, "Transport is not initialised (missing a call to connect?)") - let res = await self.transport.write(value) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(value)) - - # completed by processMessage. - var newFut = newFuture[Response]() - # add to awaiting responses - self.awaiting[id] = newFut - result = await newFut +proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = + %{"jsonrpc": %"2.0", + "method": %path, + "params": params, + "id": %id} template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) @@ -65,11 +48,11 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage(self: RpcClient, line: string) = +proc processMessage*[T: RpcClient](self: T, line: string) = # Note: this doesn't use any transport code so doesn't need to be differentiated. let - node = parseJson(line) # TODO: Check errors - id = checkGet(node, "id", JString) + node = parseJson(line) + id = checkGet(node, "id", JInt) if not self.awaiting.hasKey(id): raise newException(ValueError, @@ -91,30 +74,6 @@ proc processMessage(self: RpcClient, line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc processData(client: RpcClient) {.async.} = - while true: - var value = await client.transport.readLine(defaultMaxRequestLength) - if value == "": - # transmission ends - client.transport.close - break - - client.processMessage(value) - # async loop reconnection and waiting - client.transport = await connect(client.address) - -type RpcSocketClient* = RpcClient[StreamTransport, TransportAddress] - -proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = - let addresses = resolveTAddress(address, port) - client.transport = await connect(addresses[0]) - client.address = addresses[0] - asyncCheck processData(client) - -proc newRpcSocketClient*(): RpcSocketClient = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcClient[StreamTransport, TransportAddress]() - # Signature processing proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = @@ -140,7 +99,7 @@ proc toJsonArray(parameters: NimNode): NimNode = items.add(nnkPrefix.newTree(ident"%", curParam)) result = nnkPrefix.newTree(bindSym("%", brForceOpen), items) -proc createRpcFromSig*(rpcDecl: NimNode): NimNode = +proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode = # Each input parameter in the rpc signature is converted # to json with `%`. # Return types are then converted back to native Nim types. @@ -161,7 +120,7 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = customReturnType = returnType != iJsonNode # insert rpc client as first parameter - parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", + parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident($clientType), newEmptyNode())) let @@ -208,17 +167,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = when defined(nimDumpRpcs): echo pathStr, ":\n", result.repr -proc processRpcSigs(parsedCode: NimNode): NimNode = +proc processRpcSigs(clientType, parsedCode: NimNode): NimNode = result = newStmtList() for line in parsedCode: if line.kind == nnkProcDef: - var procDef = createRpcFromSig(line) + var procDef = createRpcFromSig(clientType, line) result.add(procDef) -macro createRpcSigs*(filePath: static[string]): untyped = +macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped = ## Takes a file of forward declarations in Nim and builds them into RPC ## calls, based on their parameters. ## Inputs are marshalled to json, and results are put into the signature's ## Nim type. - result = processRpcSigs(staticRead($filePath).parseStmt()) + result = processRpcSigs(clientType, staticRead($filePath).parseStmt()) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim new file mode 100644 index 0000000..236dd95 --- /dev/null +++ b/json_rpc/clients/socketclient.nim @@ -0,0 +1,52 @@ +import ../client, asyncdispatch2, tables, json + +type + RpcSocketClient* = ref object of RpcClient + transport*: StreamTransport + address*: TransportAddress + +const defaultMaxRequestLength* = 1024 * 128 + +proc newRpcSocketClient*: RpcSocketClient = + ## Creates a new client instance. + new result + result.initRpcClient() + +proc call*(self: RpcSocketClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = self.getNextId() + + var + value = + $rpcCallNode(name, params, id) & "\c\l" + if self.transport.isNil: + var connectStr = "" + raise newException(ValueError, "Transport is not initialised (missing a call to connect?)") + let res = await self.transport.write(value) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(value)) + + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut + +proc processData(client: RpcSocketClient) {.async.} = + while true: + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": + # transmission ends + client.transport.close + break + + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await connect(client.address) + +proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = + let addresses = resolveTAddress(address, port) + client.transport = await connect(addresses[0]) + client.address = addresses[0] + asyncCheck processData(client) \ No newline at end of file diff --git a/json_rpc/transports/socket.nim b/json_rpc/servers/socketserver.nim similarity index 100% rename from json_rpc/transports/socket.nim rename to json_rpc/servers/socketserver.nim diff --git a/rpcclient.nim b/rpcclient.nim index ebecba9..3a0a4e9 100644 --- a/rpcclient.nim +++ b/rpcclient.nim @@ -1,3 +1,5 @@ -import json_rpc / client -export client +import + json_rpc / client, + json_rpc / clients / socketclient +export client, socketclient diff --git a/rpcserver.nim b/rpcserver.nim index 7568e4d..5b08ecf 100644 --- a/rpcserver.nim +++ b/rpcserver.nim @@ -1,2 +1,2 @@ -import json_rpc / server, json_rpc / transports / socket -export server, socket +import json_rpc / server, json_rpc / servers / socketserver +export server, socketserver diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 7717783..4b67e8f 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -14,7 +14,7 @@ var server.addEthRpcs() ## Generate client convenience marshalling wrappers from forward declarations -createRpcSigs(sourceDir & DirSep & "ethcallsigs.nim") +createRpcSigs(RpcSocketClient, sourceDir & DirSep & "ethcallsigs.nim") ## Create custom RPC with StUint input parameter server.rpc("rpc.uint256param") do(i: UInt256):