From 33b1c61952c8996c2a7bbe37ec9c379d7b5881d6 Mon Sep 17 00:00:00 2001 From: coffeepots Date: Thu, 14 Jun 2018 16:52:41 +0100 Subject: [PATCH] Move client and server to `rpc` folder --- rpc/client.nim | 212 +++++++++++++++++++++++++++++++++++++++ rpc/server.nim | 265 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 477 insertions(+) create mode 100644 rpc/client.nim create mode 100644 rpc/server.nim diff --git a/rpc/client.nim b/rpc/client.nim new file mode 100644 index 0000000..e4bb85b --- /dev/null +++ b/rpc/client.nim @@ -0,0 +1,212 @@ +import tables, json, macros +import asyncdispatch2 +import jsonmarshal + +type + RpcClient* = ref object + transp: StreamTransport + awaiting: Table[string, Future[Response]] + address: TransportAddress + nextId: int64 + Response* = tuple[error: bool, result: JsonNode] + +const maxRequestLength = 1024 * 128 + +proc newRpcClient*(): RpcClient = + ## Creates a new ``RpcClient`` instance. + result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1) + +proc call*(self: RpcClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = $self.nextId + self.nextId.inc + var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, + "id": %id} & "\c\l" + let res = await self.transp.write(msg) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(msg)) + + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut + +template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) = + # complete future before raising + fut.complete((true, %msg)) + raise newException(errType, msg) + +macro checkGet(node: JsonNode, fieldName: string, + jKind: static[JsonNodeKind]): untyped = + let n = genSym(ident = "n") #`node`{`fieldName`} + result = quote: + let `n` = `node`{`fieldname`} + if `n`.isNil or `n`.kind == JNull: + raise newException(ValueError, + "Message is missing required field \"" & `fieldName` & "\"") + if `n`.kind != `jKind`.JsonNodeKind: + raise newException(ValueError, + "Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`n`.kind) + case jKind + of JBool: result.add(quote do: `n`.getBool) + of JInt: result.add(quote do: `n`.getInt) + of JString: result.add(quote do: `n`.getStr) + of JFloat: result.add(quote do: `n`.getFloat) + of JObject: result.add(quote do: `n`.getObject) + else: discard + +proc processMessage(self: RpcClient, line: string) = + let node = parseJson(line) + + # TODO: Use more appropriate exception objects + let id = checkGet(node, "id", JString) + if not self.awaiting.hasKey(id): + raise newException(ValueError, + "Cannot find message id \"" & node["id"].str & "\"") + + let version = checkGet(node, "jsonrpc", JString) + if version != "2.0": + self.awaiting[id].handleRaise(ValueError, + "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + + let errorNode = node{"error"} + if errorNode.isNil or errorNode.kind == JNull: + var res = node{"result"} + if not res.isNil: + self.awaiting[id].complete((false, res)) + self.awaiting.del(id) + # TODO: actions on unable find result node + else: + self.awaiting[id].complete((true, errorNode)) + self.awaiting.del(id) + +proc connect*(self: RpcClient, address: string, port: Port): Future[void] + +proc processData(self: RpcClient) {.async.} = + while true: + let line = await self.transp.readLine(maxRequestLength) + if line == "": + # transmission ends + self.transp.close() + break + + processMessage(self, line) + # async loop reconnection and waiting + self.transp = await connect(self.address) + +proc connect*(self: RpcClient, address: string, port: Port) {.async.} = + # TODO: `address` hostname can be resolved to many IP addresses, we are using + # first one, but maybe it would be better to iterate over all IP addresses + # and try to establish connection until it will not be established. + let addresses = resolveTAddress(address, port) + self.transp = await connect(addresses[0]) + self.address = addresses[0] + asyncCheck processData(self) + +proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = + # parameters come as a tree + var paramList = newSeq[NimNode]() + for p in parameters: paramList.add(p) + + # build proc + result = newProc(procName, paramList, callBody) + # make proc async + result.addPragma ident"async" + # export this proc + result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName)) + +proc toJsonArray(parameters: NimNode): NimNode = + # outputs an array of jsonified parameters + # ie; %[%a, %b, %c] + parameters.expectKind nnkFormalParams + var items = newNimNode(nnkBracket) + for i in 2 ..< parameters.len: + let curParam = parameters[i][0] + if curParam.kind != nnkEmpty: + items.add(nnkPrefix.newTree(ident"%", curParam)) + result = nnkPrefix.newTree(bindSym("%", brForceOpen), items) + +proc createRpcFromSig*(rpcDecl: NimNode): NimNode = + # Each input parameter in the rpc signature is converted + # to json with `%`. + # Return types are then converted back to native Nim types. + let iJsonNode = newIdentNode("JsonNode") + + var parameters = rpcDecl.findChild(it.kind == nnkFormalParams).copy + # ensure we have at least space for a return parameter + if parameters.isNil or parameters.kind == nnkEmpty or parameters.len == 0: + parameters = nnkFormalParams.newTree(iJsonNode) + + let + procName = rpcDecl.name + pathStr = $procName + returnType = + # if no return type specified, defaults to JsonNode + if parameters[0].kind == nnkEmpty: iJsonNode + else: parameters[0] + customReturnType = returnType != iJsonNode + + # insert rpc client as first parameter + parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", + newEmptyNode())) + + let + # variable used to send json to the server + jsonParamIdent = genSym(nskVar, "jsonParam") + # json array of marshalled parameters + jsonParamArray = parameters.toJsonArray() + var + # populate json params - even rpcs with no parameters have an empty json + # array node sent + callBody = newStmtList().add(quote do: + var `jsonParamIdent` = `jsonParamArray` + ) + + # convert return type to Future + parameters[0] = nnkBracketExpr.newTree(ident"Future", returnType) + # create rpc proc + result = createRpcProc(procName, parameters, callBody) + + let + # temporary variable to hold `Response` from rpc call + rpcResult = genSym(nskLet, "res") + clientIdent = newIdentNode("client") + # proc return variable + procRes = ident"result" + # actual return value, `rpcResult`.result + jsonRpcResult = nnkDotExpr.newTree(rpcResult, newIdentNode("result")) + + # perform rpc call + callBody.add(quote do: + # `rpcResult` is of type `Response` + let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`) + if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result) + ) + + if customReturnType: + # marshal json to native Nim type + callBody.add(jsonToNim(procRes, returnType, jsonRpcResult, "result")) + else: + # native json expected so no work + callBody.add(quote do: + `procRes` = `rpcResult`.result + ) + when defined(nimDumpRpcs): + echo pathStr, ":\n", result.repr + +proc processRpcSigs(parsedCode: NimNode): NimNode = + result = newStmtList() + + for line in parsedCode: + if line.kind == nnkProcDef: + var procDef = createRpcFromSig(line) + result.add(procDef) + +macro createRpcSigs*(filePath: static[string]): untyped = + ## Takes a file of forward declarations in Nim and builds them into RPC + ## calls, based on their parameters. + ## Inputs are marshalled to json, and results are put into the signature's + ## Nim type. + result = processRpcSigs(staticRead($filePath).parseStmt()) diff --git a/rpc/server.nim b/rpc/server.nim new file mode 100644 index 0000000..541d713 --- /dev/null +++ b/rpc/server.nim @@ -0,0 +1,265 @@ +import json, tables, strutils, options, macros #, chronicles +import asyncdispatch2 +import jsonmarshal + +export asyncdispatch2, json, jsonmarshal + +# Temporarily disable logging +macro debug(body: varargs[untyped]): untyped = newStmtList() +macro info(body: varargs[untyped]): untyped = newStmtList() +macro error(body: varargs[untyped]): untyped = newStmtList() + +#logScope: +# topics = "RpcServer" + +type + RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId + + RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string] + + # Procedure signature accepted as an RPC call by server + RpcProc* = proc (params: JsonNode): Future[JsonNode] + + RpcClientTransport* = concept trans, type t + trans.write(var string) is Future[int] + trans.readLine(int) is Future[string] + #trans.getUserData[t] is t + + RpcServerTransport* = concept t + t.start + t.stop + t.close + + RpcServer*[S: RpcServerTransport] = ref object + servers*: seq[S] + procs*: TableRef[string, RpcProc] + + RpcProcError* = ref object of Exception + code*: int + data*: JsonNode + + RpcBindError* = object of Exception + RpcAddressUnresolvableError* = object of Exception + +const + JSON_PARSE_ERROR* = -32700 + INVALID_REQUEST* = -32600 + METHOD_NOT_FOUND* = -32601 + INVALID_PARAMS* = -32602 + INTERNAL_ERROR* = -32603 + SERVER_ERROR* = -32000 + + maxRequestLength = 1024 * 128 + + jsonErrorMessages*: array[RpcJsonError, (int, string)] = + [ + (JSON_PARSE_ERROR, "Invalid JSON"), + (INVALID_REQUEST, "JSON 2.0 required"), + (INVALID_REQUEST, "No method requested"), + (INVALID_REQUEST, "No id specified") + ] + +# Utility functions +# TODO: Move outside server +func `%`*(p: Port): JsonNode = %(p.int) + +# Json state checking + +template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = + var + valid = true + msg = "" + try: node = parseJson(line) + except: + valid = false + msg = getCurrentExceptionMsg() + debug "Cannot process json", json = jsonString, msg = msg + (valid, msg) + +proc checkJsonErrors*(line: string, + node: var JsonNode): Option[RpcJsonErrorContainer] = + ## Tries parsing line into node, if successful checks required fields + ## Returns: error state or none + let res = jsonValid(line, node) + if not res[0]: + return some((rjeInvalidJson, res[1])) + if not node.hasKey("id"): + return some((rjeNoId, "")) + if node{"jsonrpc"} != %"2.0": + return some((rjeVersionError, "")) + if not node.hasKey("method"): + return some((rjeNoMethod, "")) + return none(RpcJsonErrorContainer) + +# Json reply wrappers + +proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = + let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} + return $node & "\c\l" + +proc sendError*(client: RpcClientTransport, code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()) {.async.} = + ## Send error message to client + let error = %{"code": %(code), "id": id, "message": %msg, "data": data} + debug "Error generated", error = error, id = id + var res = wrapReply(id, newJNull(), error) + result = client.write(res) + +proc sendJsonError*(state: RpcJsonError, client: RpcClientTransport, id: JsonNode, + data = newJNull()) {.async.} = + ## Send client response for invalid json state + let errMsgs = jsonErrorMessages[state] + await client.sendError(errMsgs[0], errMsgs[1], id, data) + +# Server message processing +proc processMessage[T](server: RpcServer[T], client: RpcClientTransport, + line: string) {.async.} = + var + node: JsonNode + # set up node and/or flag errors + jsonErrorState = checkJsonErrors(line, node) + + if jsonErrorState.isSome: + let errState = jsonErrorState.get + var id = + if errState.err == rjeInvalidJson or errState.err == rjeNoId: + newJNull() + else: + node["id"] + await errState.err.sendJsonError(client, id, %errState.msg) + else: + let + methodName = node["method"].str + id = node["id"] + + if not server.procs.hasKey(methodName): + await client.sendError(METHOD_NOT_FOUND, "Method not found", %id, + %(methodName & " is not a registered method.")) + else: + let callRes = await server.procs[methodName](node["params"]) + var res = wrapReply(id, callRes, newJNull()) + discard await client.write(res) + +proc processClient*[S: RpcServerTransport, C: RpcClientTransport](server: S, client: C) {.async, gcsafe.} = + var rpc = getUserData[RpcServer[S]](server) + while true: + let line = await client.readLine(maxRequestLength) + if line == "": + client.close() + break + + debug "Processing client", addresss = client.remoteAddress(), line + + let future = processMessage(rpc, client, line) + yield future + if future.failed: + if future.readError of RpcProcError: + let err = future.readError.RpcProcError + await client.sendError(err.code, err.msg, err.data) + elif future.readError of ValueError: + let err = future.readError[].ValueError + await client.sendError(INVALID_PARAMS, err.msg, %"") + else: + await client.sendError(SERVER_ERROR, + "Error: Unknown error occurred", %"") + +proc start*(server: RpcServer) = + ## Start the RPC server. + for item in server.servers: + item.start() + +proc stop*(server: RpcServer) = + ## Stop the RPC server. + for item in server.servers: + item.stop() + +proc close*(server: RpcServer) = + ## Cleanup resources of RPC server. + for item in server.servers: + item.close() + +# Server registration and RPC generation + +proc register*(server: RpcServer, name: string, rpc: RpcProc) = + ## Add a name/code pair to the RPC server. + server.procs[name] = rpc + +proc unRegisterAll*(server: RpcServer) = + # Remove all remote procedure calls from this server. + server.procs.clear + +proc makeProcName(s: string): string = + result = "" + for c in s: + if c.isAlphaNumeric: result.add c + +proc hasReturnType(params: NimNode): bool = + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: + result = true + +macro rpc*(server: RpcServer, path: string, body: untyped): untyped = + ## Define a remote procedure call. + ## Input and return parameters are defined using the ``do`` notation. + ## For example: + ## .. code-block:: nim + ## myServer.rpc("path") do(param1: int, param2: float) -> string: + ## result = $param1 & " " & $param2 + ## ``` + ## Input parameters are automatically marshalled from json to Nim types, + ## and output parameters are automatically marshalled to json for transport. + result = newStmtList() + let + parameters = body.findChild(it.kind == nnkFormalParams) + # all remote calls have a single parameter: `params: JsonNode` + paramsIdent = newIdentNode"params" + # procs are generated from the stripped path + pathStr = $path + # strip non alphanumeric + procNameStr = pathStr.makeProcName + # public rpc proc + procName = newIdentNode(procNameStr) + # when parameters present: proc that contains our rpc body + doMain = newIdentNode(procNameStr & "DoMain") + # async result + res = newIdentNode("result") + var + setup = jsonToNim(parameters, paramsIdent) + procBody = if body.kind == nnkStmtList: body else: body.body + + if parameters.hasReturnType: + let returnType = parameters[0] + + # delegate async proc allows return and setting of result as native type + result.add(quote do: + proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = + `setup` + `procBody` + ) + + if returnType == ident"JsonNode": + # `JsonNode` results don't need conversion + result.add( quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = await `doMain`(`paramsIdent`) + ) + else: + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = %await `doMain`(`paramsIdent`) + ) + else: + # no return types, inline contents + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `setup` + `procBody` + ) + result.add( quote do: + `server`.register(`path`, `procName`) + ) + + when defined(nimDumpRpcs): + echo "\n", pathStr, ": ", result.repr + +# TODO: Allow cross checking between client signatures and server calls