From 9f05f6305423810feea5ef0e61aef1f736538235 Mon Sep 17 00:00:00 2001 From: coffeepots Date: Thu, 14 Jun 2018 16:52:03 +0100 Subject: [PATCH] Rename `eth-rpc` to `rpc` --- eth-rpc/client.nim | 212 ------------------ eth-rpc/server.nim | 354 ------------------------------- {eth-rpc => rpc}/jsonmarshal.nim | 0 rpcclient.nim | 2 +- rpcserver.nim | 2 +- 5 files changed, 2 insertions(+), 568 deletions(-) delete mode 100644 eth-rpc/client.nim delete mode 100644 eth-rpc/server.nim rename {eth-rpc => rpc}/jsonmarshal.nim (100%) diff --git a/eth-rpc/client.nim b/eth-rpc/client.nim deleted file mode 100644 index e4bb85b..0000000 --- a/eth-rpc/client.nim +++ /dev/null @@ -1,212 +0,0 @@ -import tables, json, macros -import asyncdispatch2 -import jsonmarshal - -type - RpcClient* = ref object - transp: StreamTransport - awaiting: Table[string, Future[Response]] - address: TransportAddress - nextId: int64 - Response* = tuple[error: bool, result: JsonNode] - -const maxRequestLength = 1024 * 128 - -proc newRpcClient*(): RpcClient = - ## Creates a new ``RpcClient`` instance. - result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1) - -proc call*(self: RpcClient, name: string, - params: JsonNode): Future[Response] {.async.} = - ## Remotely calls the specified RPC method. - let id = $self.nextId - self.nextId.inc - var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, - "id": %id} & "\c\l" - let res = await self.transp.write(msg) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(msg)) - - # completed by processMessage. - var newFut = newFuture[Response]() - # add to awaiting responses - self.awaiting[id] = newFut - result = await newFut - -template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) = - # complete future before raising - fut.complete((true, %msg)) - raise newException(errType, msg) - -macro checkGet(node: JsonNode, fieldName: string, - jKind: static[JsonNodeKind]): untyped = - let n = genSym(ident = "n") #`node`{`fieldName`} - result = quote: - let `n` = `node`{`fieldname`} - if `n`.isNil or `n`.kind == JNull: - raise newException(ValueError, - "Message is missing required field \"" & `fieldName` & "\"") - if `n`.kind != `jKind`.JsonNodeKind: - raise newException(ValueError, - "Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`n`.kind) - case jKind - of JBool: result.add(quote do: `n`.getBool) - of JInt: result.add(quote do: `n`.getInt) - of JString: result.add(quote do: `n`.getStr) - of JFloat: result.add(quote do: `n`.getFloat) - of JObject: result.add(quote do: `n`.getObject) - else: discard - -proc processMessage(self: RpcClient, line: string) = - let node = parseJson(line) - - # TODO: Use more appropriate exception objects - let id = checkGet(node, "id", JString) - if not self.awaiting.hasKey(id): - raise newException(ValueError, - "Cannot find message id \"" & node["id"].str & "\"") - - let version = checkGet(node, "jsonrpc", JString) - if version != "2.0": - self.awaiting[id].handleRaise(ValueError, - "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") - - let errorNode = node{"error"} - if errorNode.isNil or errorNode.kind == JNull: - var res = node{"result"} - if not res.isNil: - self.awaiting[id].complete((false, res)) - self.awaiting.del(id) - # TODO: actions on unable find result node - else: - self.awaiting[id].complete((true, errorNode)) - self.awaiting.del(id) - -proc connect*(self: RpcClient, address: string, port: Port): Future[void] - -proc processData(self: RpcClient) {.async.} = - while true: - let line = await self.transp.readLine(maxRequestLength) - if line == "": - # transmission ends - self.transp.close() - break - - processMessage(self, line) - # async loop reconnection and waiting - self.transp = await connect(self.address) - -proc connect*(self: RpcClient, address: string, port: Port) {.async.} = - # TODO: `address` hostname can be resolved to many IP addresses, we are using - # first one, but maybe it would be better to iterate over all IP addresses - # and try to establish connection until it will not be established. - let addresses = resolveTAddress(address, port) - self.transp = await connect(addresses[0]) - self.address = addresses[0] - asyncCheck processData(self) - -proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = - # parameters come as a tree - var paramList = newSeq[NimNode]() - for p in parameters: paramList.add(p) - - # build proc - result = newProc(procName, paramList, callBody) - # make proc async - result.addPragma ident"async" - # export this proc - result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName)) - -proc toJsonArray(parameters: NimNode): NimNode = - # outputs an array of jsonified parameters - # ie; %[%a, %b, %c] - parameters.expectKind nnkFormalParams - var items = newNimNode(nnkBracket) - for i in 2 ..< parameters.len: - let curParam = parameters[i][0] - if curParam.kind != nnkEmpty: - items.add(nnkPrefix.newTree(ident"%", curParam)) - result = nnkPrefix.newTree(bindSym("%", brForceOpen), items) - -proc createRpcFromSig*(rpcDecl: NimNode): NimNode = - # Each input parameter in the rpc signature is converted - # to json with `%`. - # Return types are then converted back to native Nim types. - let iJsonNode = newIdentNode("JsonNode") - - var parameters = rpcDecl.findChild(it.kind == nnkFormalParams).copy - # ensure we have at least space for a return parameter - if parameters.isNil or parameters.kind == nnkEmpty or parameters.len == 0: - parameters = nnkFormalParams.newTree(iJsonNode) - - let - procName = rpcDecl.name - pathStr = $procName - returnType = - # if no return type specified, defaults to JsonNode - if parameters[0].kind == nnkEmpty: iJsonNode - else: parameters[0] - customReturnType = returnType != iJsonNode - - # insert rpc client as first parameter - parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", - newEmptyNode())) - - let - # variable used to send json to the server - jsonParamIdent = genSym(nskVar, "jsonParam") - # json array of marshalled parameters - jsonParamArray = parameters.toJsonArray() - var - # populate json params - even rpcs with no parameters have an empty json - # array node sent - callBody = newStmtList().add(quote do: - var `jsonParamIdent` = `jsonParamArray` - ) - - # convert return type to Future - parameters[0] = nnkBracketExpr.newTree(ident"Future", returnType) - # create rpc proc - result = createRpcProc(procName, parameters, callBody) - - let - # temporary variable to hold `Response` from rpc call - rpcResult = genSym(nskLet, "res") - clientIdent = newIdentNode("client") - # proc return variable - procRes = ident"result" - # actual return value, `rpcResult`.result - jsonRpcResult = nnkDotExpr.newTree(rpcResult, newIdentNode("result")) - - # perform rpc call - callBody.add(quote do: - # `rpcResult` is of type `Response` - let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`) - if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result) - ) - - if customReturnType: - # marshal json to native Nim type - callBody.add(jsonToNim(procRes, returnType, jsonRpcResult, "result")) - else: - # native json expected so no work - callBody.add(quote do: - `procRes` = `rpcResult`.result - ) - when defined(nimDumpRpcs): - echo pathStr, ":\n", result.repr - -proc processRpcSigs(parsedCode: NimNode): NimNode = - result = newStmtList() - - for line in parsedCode: - if line.kind == nnkProcDef: - var procDef = createRpcFromSig(line) - result.add(procDef) - -macro createRpcSigs*(filePath: static[string]): untyped = - ## Takes a file of forward declarations in Nim and builds them into RPC - ## calls, based on their parameters. - ## Inputs are marshalled to json, and results are put into the signature's - ## Nim type. - result = processRpcSigs(staticRead($filePath).parseStmt()) diff --git a/eth-rpc/server.nim b/eth-rpc/server.nim deleted file mode 100644 index 294f317..0000000 --- a/eth-rpc/server.nim +++ /dev/null @@ -1,354 +0,0 @@ -import json, tables, strutils, options, macros #, chronicles -import asyncdispatch2 -import jsonmarshal - -export asyncdispatch2, json, jsonmarshal - -# Temporarily disable logging -macro debug(body: varargs[untyped]): untyped = newStmtList() -macro info(body: varargs[untyped]): untyped = newStmtList() -macro error(body: varargs[untyped]): untyped = newStmtList() - -#logScope: -# topics = "RpcServer" - -type - RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId - - RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string] - - # Procedure signature accepted as an RPC call by server - RpcProc* = proc (params: JsonNode): Future[JsonNode] - - RpcServer* = ref object - servers*: seq[StreamServer] - procs*: TableRef[string, RpcProc] - - RpcProcError* = ref object of Exception - code*: int - data*: JsonNode - - RpcBindError* = object of Exception - RpcAddressUnresolvableError* = object of Exception - -const - JSON_PARSE_ERROR* = -32700 - INVALID_REQUEST* = -32600 - METHOD_NOT_FOUND* = -32601 - INVALID_PARAMS* = -32602 - INTERNAL_ERROR* = -32603 - SERVER_ERROR* = -32000 - - maxRequestLength = 1024 * 128 - - jsonErrorMessages*: array[RpcJsonError, (int, string)] = - [ - (JSON_PARSE_ERROR, "Invalid JSON"), - (INVALID_REQUEST, "JSON 2.0 required"), - (INVALID_REQUEST, "No method requested"), - (INVALID_REQUEST, "No id specified") - ] - -# Utility functions -# TODO: Move outside server -func `%`*(p: Port): JsonNode = %(p.int) - -# Json state checking - -template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = - var - valid = true - msg = "" - try: node = parseJson(line) - except: - valid = false - msg = getCurrentExceptionMsg() - debug "Cannot process json", json = jsonString, msg = msg - (valid, msg) - -proc checkJsonErrors*(line: string, - node: var JsonNode): Option[RpcJsonErrorContainer] = - ## Tries parsing line into node, if successful checks required fields - ## Returns: error state or none - let res = jsonValid(line, node) - if not res[0]: - return some((rjeInvalidJson, res[1])) - if not node.hasKey("id"): - return some((rjeNoId, "")) - if node{"jsonrpc"} != %"2.0": - return some((rjeVersionError, "")) - if not node.hasKey("method"): - return some((rjeNoMethod, "")) - return none(RpcJsonErrorContainer) - -# Json reply wrappers - -proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = - let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} - return $node & "\c\l" - -proc sendError*(client: StreamTransport, code: int, msg: string, id: JsonNode, - data: JsonNode = newJNull()) {.async.} = - ## Send error message to client - let error = %{"code": %(code), "id": id, "message": %msg, "data": data} - debug "Error generated", error = error, id = id - var res = wrapReply(id, newJNull(), error) - result = client.write(res) - -proc sendJsonError*(state: RpcJsonError, client: StreamTransport, id: JsonNode, - data = newJNull()) {.async.} = - ## Send client response for invalid json state - let errMsgs = jsonErrorMessages[state] - await client.sendError(errMsgs[0], errMsgs[1], id, data) - -# Server message processing -proc processMessage(server: RpcServer, client: StreamTransport, - line: string) {.async.} = - var - node: JsonNode - # set up node and/or flag errors - jsonErrorState = checkJsonErrors(line, node) - - if jsonErrorState.isSome: - let errState = jsonErrorState.get - var id = - if errState.err == rjeInvalidJson or errState.err == rjeNoId: - newJNull() - else: - node["id"] - await errState.err.sendJsonError(client, id, %errState.msg) - else: - let - methodName = node["method"].str - id = node["id"] - - if not server.procs.hasKey(methodName): - await client.sendError(METHOD_NOT_FOUND, "Method not found", %id, - %(methodName & " is not a registered method.")) - else: - let callRes = await server.procs[methodName](node["params"]) - var res = wrapReply(id, callRes, newJNull()) - discard await client.write(res) - -proc processClient(server: StreamServer, client: StreamTransport) {.async, gcsafe.} = - var rpc = getUserData[RpcServer](server) - while true: - let line = await client.readLine(maxRequestLength) - if line == "": - client.close() - break - - debug "Processing client", addresss = client.remoteAddress(), line - - let future = processMessage(rpc, client, line) - yield future - if future.failed: - if future.readError of RpcProcError: - let err = future.readError.RpcProcError - await client.sendError(err.code, err.msg, err.data) - elif future.readError of ValueError: - let err = future.readError[].ValueError - await client.sendError(INVALID_PARAMS, err.msg, %"") - else: - await client.sendError(SERVER_ERROR, - "Error: Unknown error occurred", %"") - -proc newRpcServer*(addresses: openarray[TransportAddress]): RpcServer = - ## Create new server and assign it to addresses ``addresses``. - result = RpcServer() - result.procs = newTable[string, RpcProc]() - result.servers = newSeq[StreamServer]() - - for item in addresses: - try: - info "Creating server on ", address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server", address = $item, message = getCurrentExceptionMsg() - - if len(result.servers) == 0: - # Server was not bound, critical error. - # TODO: Custom RpcException error - raise newException(RpcBindError, "Unable to create server!") - -proc newRpcServer*(addresses: openarray[string]): RpcServer = - ## Create new server and assign it to addresses ``addresses``. - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - baddrs: seq[TransportAddress] - - for a in addresses: - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(a, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(a, IpAddressFamily.IPv6) - except: - discard - - for r in tas4: - baddrs.add(r) - for r in tas6: - baddrs.add(r) - - if len(baddrs) == 0: - # Addresses could not be resolved, critical error. - raise newException(RpcAddressUnresolvableError, "Unable to get address!") - - result = newRpcServer(baddrs) - -proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) - except: - discard - - if len(tas4) == 0 and len(tas6) == 0: - # Address was not resolved, critical error. - raise newException(RpcAddressUnresolvableError, - "Address " & address & " could not be resolved!") - - result = RpcServer() - result.procs = newTable[string, RpcProc]() - result.servers = newSeq[StreamServer]() - for item in tas4: - try: - info "Creating server for address", ip4address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server for address", address = $item - - for item in tas6: - try: - info "Server created", ip6address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server", address = $item - - if len(result.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, - "Could not setup server on " & address & ":" & $int(port)) - -proc start*(server: RpcServer) = - ## Start the RPC server. - for item in server.servers: - item.start() - -proc stop*(server: RpcServer) = - ## Stop the RPC server. - for item in server.servers: - item.stop() - -proc close*(server: RpcServer) = - ## Cleanup resources of RPC server. - for item in server.servers: - item.close() - -# Server registration and RPC generation - -proc register*(server: RpcServer, name: string, rpc: RpcProc) = - ## Add a name/code pair to the RPC server. - server.procs[name] = rpc - -proc unRegisterAll*(server: RpcServer) = - # Remove all remote procedure calls from this server. - server.procs.clear - -proc makeProcName(s: string): string = - result = "" - for c in s: - if c.isAlphaNumeric: result.add c - -proc hasReturnType(params: NimNode): bool = - if params != nil and params.len > 0 and params[0] != nil and - params[0].kind != nnkEmpty: - result = true - -macro rpc*(server: RpcServer, path: string, body: untyped): untyped = - ## Define a remote procedure call. - ## Input and return parameters are defined using the ``do`` notation. - ## For example: - ## .. code-block:: nim - ## myServer.rpc("path") do(param1: int, param2: float) -> string: - ## result = $param1 & " " & $param2 - ## ``` - ## Input parameters are automatically marshalled from json to Nim types, - ## and output parameters are automatically marshalled to json for transport. - result = newStmtList() - let - parameters = body.findChild(it.kind == nnkFormalParams) - # all remote calls have a single parameter: `params: JsonNode` - paramsIdent = newIdentNode"params" - # procs are generated from the stripped path - pathStr = $path - # strip non alphanumeric - procNameStr = pathStr.makeProcName - # public rpc proc - procName = newIdentNode(procNameStr) - # when parameters present: proc that contains our rpc body - doMain = newIdentNode(procNameStr & "DoMain") - # async result - res = newIdentNode("result") - var - setup = jsonToNim(parameters, paramsIdent) - procBody = if body.kind == nnkStmtList: body else: body.body - - if parameters.hasReturnType: - let returnType = parameters[0] - - # delegate async proc allows return and setting of result as native type - result.add(quote do: - proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = - `setup` - `procBody` - ) - - if returnType == ident"JsonNode": - # `JsonNode` results don't need conversion - result.add( quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = await `doMain`(`paramsIdent`) - ) - else: - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = %await `doMain`(`paramsIdent`) - ) - else: - # no return types, inline contents - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `setup` - `procBody` - ) - result.add( quote do: - `server`.register(`path`, `procName`) - ) - - when defined(nimDumpRpcs): - echo "\n", pathStr, ": ", result.repr - -# TODO: Allow cross checking between client signatures and server calls diff --git a/eth-rpc/jsonmarshal.nim b/rpc/jsonmarshal.nim similarity index 100% rename from eth-rpc/jsonmarshal.nim rename to rpc/jsonmarshal.nim diff --git a/rpcclient.nim b/rpcclient.nim index c1a3431..14712ef 100644 --- a/rpcclient.nim +++ b/rpcclient.nim @@ -1,3 +1,3 @@ -import eth-rpc / client +import rpc / client export client diff --git a/rpcserver.nim b/rpcserver.nim index 905d53e..af3bbfb 100644 --- a/rpcserver.nim +++ b/rpcserver.nim @@ -1,2 +1,2 @@ -import eth-rpc / server +import rpc / server export server