diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 524ca30..dcfb11a 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -5,40 +5,26 @@ import jsonmarshal export asyncdispatch2 type - RpcClient*[T, A] = ref object - awaiting: Table[string, Future[Response]] - transport: T - address: A - nextId: int64 + ClientId* = int64 + RpcClient* = ref object of RootRef# + awaiting*: Table[ClientId, Future[Response]] + nextId: ClientId Response* = tuple[error: bool, result: JsonNode] -const defaultMaxRequestLength* = 1024 * 128 +proc initRpcClient*[T: RpcClient](client: var T) = + client.awaiting = initTable[ClientId, Future[Response]]() + client.nextId = 1 -proc newRpcClient*[T, A]: RpcClient[T, A] = - ## Creates a new ``RpcClient`` instance. - result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) +proc getNextId*(client: RpcClient): ClientId = + result = client.nextId + client.nextId.inc -proc call*(self: RpcClient, name: string, - params: JsonNode): Future[Response] {.async.} = - ## Remotely calls the specified RPC method. - let id = $self.nextId - self.nextId.inc - var - value = - $ %{"jsonrpc": %"2.0", - "method": %name, - "params": params, - "id": %id} & "\c\l" - let res = await self.transport.write(value) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(value)) - - # completed by processMessage. - var newFut = newFuture[Response]() - # add to awaiting responses - self.awaiting[id] = newFut - result = await newFut +proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = + %{"jsonrpc": %"2.0", + "method": %path, + "params": params, + "id": %id} template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) @@ -62,11 +48,11 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage(self: RpcClient, line: string) = +proc processMessage*[T: RpcClient](self: T, line: string) = # Note: this doesn't use any transport code so doesn't need to be differentiated. let - node = parseJson(line) # TODO: Check errors - id = checkGet(node, "id", JString) + node = parseJson(line) + id = checkGet(node, "id", JInt) if not self.awaiting.hasKey(id): raise newException(ValueError, @@ -88,30 +74,6 @@ proc processMessage(self: RpcClient, line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc processData(client: RpcClient) {.async.} = - while true: - var value = await client.transport.readLine(defaultMaxRequestLength) - if value == "": - # transmission ends - client.transport.close - break - - client.processMessage(value) - # async loop reconnection and waiting - client.transport = await connect(client.address) - -type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress] - -proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} = - let addresses = resolveTAddress(address, port) - client.transport = await connect(addresses[0]) - client.address = addresses[0] - asyncCheck processData(client) - -proc newRpcStreamClient*(): RpcStreamClient = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcClient[StreamTransport, TransportAddress]() - # Signature processing proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = @@ -137,7 +99,7 @@ proc toJsonArray(parameters: NimNode): NimNode = items.add(nnkPrefix.newTree(ident"%", curParam)) result = nnkPrefix.newTree(bindSym("%", brForceOpen), items) -proc createRpcFromSig*(rpcDecl: NimNode): NimNode = +proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode = # Each input parameter in the rpc signature is converted # to json with `%`. # Return types are then converted back to native Nim types. @@ -158,7 +120,7 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = customReturnType = returnType != iJsonNode # insert rpc client as first parameter - parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", + parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident($clientType), newEmptyNode())) let @@ -205,17 +167,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = when defined(nimDumpRpcs): echo pathStr, ":\n", result.repr -proc processRpcSigs(parsedCode: NimNode): NimNode = +proc processRpcSigs(clientType, parsedCode: NimNode): NimNode = result = newStmtList() for line in parsedCode: if line.kind == nnkProcDef: - var procDef = createRpcFromSig(line) + var procDef = createRpcFromSig(clientType, line) result.add(procDef) -macro createRpcSigs*(filePath: static[string]): untyped = +macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped = ## Takes a file of forward declarations in Nim and builds them into RPC ## calls, based on their parameters. ## Inputs are marshalled to json, and results are put into the signature's ## Nim type. - result = processRpcSigs(staticRead($filePath).parseStmt()) + result = processRpcSigs(clientType, staticRead($filePath).parseStmt()) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim new file mode 100644 index 0000000..236dd95 --- /dev/null +++ b/json_rpc/clients/socketclient.nim @@ -0,0 +1,52 @@ +import ../client, asyncdispatch2, tables, json + +type + RpcSocketClient* = ref object of RpcClient + transport*: StreamTransport + address*: TransportAddress + +const defaultMaxRequestLength* = 1024 * 128 + +proc newRpcSocketClient*: RpcSocketClient = + ## Creates a new client instance. + new result + result.initRpcClient() + +proc call*(self: RpcSocketClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = self.getNextId() + + var + value = + $rpcCallNode(name, params, id) & "\c\l" + if self.transport.isNil: + var connectStr = "" + raise newException(ValueError, "Transport is not initialised (missing a call to connect?)") + let res = await self.transport.write(value) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(value)) + + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut + +proc processData(client: RpcSocketClient) {.async.} = + while true: + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": + # transmission ends + client.transport.close + break + + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await connect(client.address) + +proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = + let addresses = resolveTAddress(address, port) + client.transport = await connect(addresses[0]) + client.address = addresses[0] + asyncCheck processData(client) \ No newline at end of file diff --git a/json_rpc/router.nim b/json_rpc/router.nim index 1ee9294..e5a0b6a 100644 --- a/json_rpc/router.nim +++ b/json_rpc/router.nim @@ -144,7 +144,7 @@ proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} = else: node["id"] let - # fixed error code and message + # const error code and message errKind = jsonErrorMessages[errState.err] # pass on the actual error message fullMsg = errKind[1] & " " & errState[1] @@ -159,8 +159,8 @@ proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bo ## Route to RPC, returns false if the method or params cannot be found. ## Expects json input and returns json output. let - jPath = data{methodField} - jParams = data{paramsField} + jPath = data.getOrDefault(methodField) + jParams = data.getOrDefault(paramsField) if jPath.isEmpty or jParams.isEmpty: return false @@ -181,6 +181,14 @@ proc hasReturnType(params: NimNode): bool = params[0].kind != nnkEmpty: result = true +template trap(path: string, body: untyped): untyped = + try: + body + except: + let msg = getCurrentExceptionMsg() + debug "Error occurred within RPC ", path = path, errorMessage = msg + result = %*{codeField: %SERVER_ERROR, messageField: %msg} + macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = ## Define a remote procedure call. ## Input and return parameters are defined using the ``do`` notation. @@ -210,47 +218,37 @@ macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = var setup = jsonToNim(parameters, paramsIdent) procBody = if body.kind == nnkStmtList: body else: body.body - errTrappedBody = quote do: - try: - `procBody` - except: - let msg = getCurrentExceptionMsg() - debug "Error occurred within RPC ", path = `path`, errorMessage = msg - `errJson` = %*{codeField: %SERVER_ERROR, messageField: %msg} - + if parameters.hasReturnType: let returnType = parameters[0] # delegate async proc allows return and setting of result as native type result.add(quote do: - proc `doMain`(`paramsIdent`: JsonNode, `errJson`: var JsonNode): `returnType` = + proc `doMain`(`paramsIdent`: JsonNode): `returnType` = `setup` - `errTrappedBody` + `procBody` ) if returnType == ident"JsonNode": # `JsonNode` results don't need conversion result.add( quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - var `errJson`: JsonNode - `res` = `doMain`(`paramsIdent`, `errJson`) - if `errJson` != nil: `res` = `errJson` + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} = + trap(`pathStr`): + `res` = `doMain`(`paramsIdent`) ) else: result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - var `errJson`: JsonNode - `res` = %`doMain`(`paramsIdent`, `errJson`) - if `errJson` != nil: `res` = `errJson` + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} = + trap(`pathStr`): + `res` = %`doMain`(`paramsIdent`) ) else: # no return types, inline contents result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} = `setup` - var `errJson`: JsonNode - `errTrappedBody` - if `errJson` != nil: `res` = `errJson` + trap(`pathStr`): + `procBody` ) result.add( quote do: `server`.register(`path`, `procName`) diff --git a/json_rpc/transports/socket.nim b/json_rpc/servers/socketserver.nim similarity index 100% rename from json_rpc/transports/socket.nim rename to json_rpc/servers/socketserver.nim diff --git a/rpcclient.nim b/rpcclient.nim index ebecba9..3a0a4e9 100644 --- a/rpcclient.nim +++ b/rpcclient.nim @@ -1,3 +1,5 @@ -import json_rpc / client -export client +import + json_rpc / client, + json_rpc / clients / socketclient +export client, socketclient diff --git a/rpcserver.nim b/rpcserver.nim index ef197d1..5b08ecf 100644 --- a/rpcserver.nim +++ b/rpcserver.nim @@ -1,2 +1,2 @@ -import json_rpc / server -export server +import json_rpc / server, json_rpc / servers / socketserver +export server, socketserver diff --git a/rpcsocket.nim b/rpcsocket.nim deleted file mode 100644 index 7568e4d..0000000 --- a/rpcsocket.nim +++ /dev/null @@ -1,2 +0,0 @@ -import json_rpc / server, json_rpc / transports / socket -export server, socket diff --git a/tests/testerrors.nim b/tests/testerrors.nim index 5e856f1..2ab2b9e 100644 --- a/tests/testerrors.nim +++ b/tests/testerrors.nim @@ -3,11 +3,11 @@ allow unchecked and unformatted calls. ]# -import unittest, debugclient, ../rpcsocket +import unittest, debugclient, ../rpcserver import strformat, chronicles var server = newRpcSocketServer("localhost", 8547.Port) -var client = newRpcStreamClient() +var client = newRpcSocketClient() server.start() waitFor client.connect("localhost", Port(8547)) diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 572c808..4b67e8f 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -1,5 +1,5 @@ import unittest, json, tables -import ../rpcclient, ../rpcsocket +import ../rpcclient, ../rpcserver import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles from os import getCurrentDir, DirSep @@ -8,13 +8,13 @@ template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] var server = newRpcSocketServer("localhost", Port(8546)) - client = newRpcStreamClient() + client = newRpcSocketClient() ## Generate Ethereum server RPCs server.addEthRpcs() ## Generate client convenience marshalling wrappers from forward declarations -createRpcSigs(sourceDir & DirSep & "ethcallsigs.nim") +createRpcSigs(RpcSocketClient, sourceDir & DirSep & "ethcallsigs.nim") ## Create custom RPC with StUint input parameter server.rpc("rpc.uint256param") do(i: UInt256): diff --git a/tests/testrpcmacro.nim b/tests/testrpcmacro.nim index 43cab64..8d10b8c 100644 --- a/tests/testrpcmacro.nim +++ b/tests/testrpcmacro.nim @@ -1,5 +1,5 @@ import unittest, json, tables, chronicles -import ../rpcsocket +import ../rpcserver type # some nested types to check object parsing diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index a50f03f..6e4b008 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -1,8 +1,8 @@ import unittest, json, chronicles -import ../rpcclient, ../rpcsocket +import ../rpcclient, ../rpcserver var srv = newRpcSocketServer(["localhost:8545"]) -var client = newRpcStreamClient() +var client = newRpcSocketClient() # Create RPC on server srv.rpc("myProc") do(input: string, data: array[0..3, int]):