diff --git a/.gitignore b/.gitignore index 0e69085..bf65179 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,7 @@ # Ignore the nimcache folders nimcache/ +# Ignore editor settings +.vscode + + diff --git a/eth-rpc/server.nim b/eth-rpc/server.nim deleted file mode 100644 index 294f317..0000000 --- a/eth-rpc/server.nim +++ /dev/null @@ -1,354 +0,0 @@ -import json, tables, strutils, options, macros #, chronicles -import asyncdispatch2 -import jsonmarshal - -export asyncdispatch2, json, jsonmarshal - -# Temporarily disable logging -macro debug(body: varargs[untyped]): untyped = newStmtList() -macro info(body: varargs[untyped]): untyped = newStmtList() -macro error(body: varargs[untyped]): untyped = newStmtList() - -#logScope: -# topics = "RpcServer" - -type - RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId - - RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string] - - # Procedure signature accepted as an RPC call by server - RpcProc* = proc (params: JsonNode): Future[JsonNode] - - RpcServer* = ref object - servers*: seq[StreamServer] - procs*: TableRef[string, RpcProc] - - RpcProcError* = ref object of Exception - code*: int - data*: JsonNode - - RpcBindError* = object of Exception - RpcAddressUnresolvableError* = object of Exception - -const - JSON_PARSE_ERROR* = -32700 - INVALID_REQUEST* = -32600 - METHOD_NOT_FOUND* = -32601 - INVALID_PARAMS* = -32602 - INTERNAL_ERROR* = -32603 - SERVER_ERROR* = -32000 - - maxRequestLength = 1024 * 128 - - jsonErrorMessages*: array[RpcJsonError, (int, string)] = - [ - (JSON_PARSE_ERROR, "Invalid JSON"), - (INVALID_REQUEST, "JSON 2.0 required"), - (INVALID_REQUEST, "No method requested"), - (INVALID_REQUEST, "No id specified") - ] - -# Utility functions -# TODO: Move outside server -func `%`*(p: Port): JsonNode = %(p.int) - -# Json state checking - -template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = - var - valid = true - msg = "" - try: node = parseJson(line) - except: - valid = false - msg = getCurrentExceptionMsg() - debug "Cannot process json", json = jsonString, msg = msg - (valid, msg) - -proc checkJsonErrors*(line: string, - node: var JsonNode): Option[RpcJsonErrorContainer] = - ## Tries parsing line into node, if successful checks required fields - ## Returns: error state or none - let res = jsonValid(line, node) - if not res[0]: - return some((rjeInvalidJson, res[1])) - if not node.hasKey("id"): - return some((rjeNoId, "")) - if node{"jsonrpc"} != %"2.0": - return some((rjeVersionError, "")) - if not node.hasKey("method"): - return some((rjeNoMethod, "")) - return none(RpcJsonErrorContainer) - -# Json reply wrappers - -proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = - let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} - return $node & "\c\l" - -proc sendError*(client: StreamTransport, code: int, msg: string, id: JsonNode, - data: JsonNode = newJNull()) {.async.} = - ## Send error message to client - let error = %{"code": %(code), "id": id, "message": %msg, "data": data} - debug "Error generated", error = error, id = id - var res = wrapReply(id, newJNull(), error) - result = client.write(res) - -proc sendJsonError*(state: RpcJsonError, client: StreamTransport, id: JsonNode, - data = newJNull()) {.async.} = - ## Send client response for invalid json state - let errMsgs = jsonErrorMessages[state] - await client.sendError(errMsgs[0], errMsgs[1], id, data) - -# Server message processing -proc processMessage(server: RpcServer, client: StreamTransport, - line: string) {.async.} = - var - node: JsonNode - # set up node and/or flag errors - jsonErrorState = checkJsonErrors(line, node) - - if jsonErrorState.isSome: - let errState = jsonErrorState.get - var id = - if errState.err == rjeInvalidJson or errState.err == rjeNoId: - newJNull() - else: - node["id"] - await errState.err.sendJsonError(client, id, %errState.msg) - else: - let - methodName = node["method"].str - id = node["id"] - - if not server.procs.hasKey(methodName): - await client.sendError(METHOD_NOT_FOUND, "Method not found", %id, - %(methodName & " is not a registered method.")) - else: - let callRes = await server.procs[methodName](node["params"]) - var res = wrapReply(id, callRes, newJNull()) - discard await client.write(res) - -proc processClient(server: StreamServer, client: StreamTransport) {.async, gcsafe.} = - var rpc = getUserData[RpcServer](server) - while true: - let line = await client.readLine(maxRequestLength) - if line == "": - client.close() - break - - debug "Processing client", addresss = client.remoteAddress(), line - - let future = processMessage(rpc, client, line) - yield future - if future.failed: - if future.readError of RpcProcError: - let err = future.readError.RpcProcError - await client.sendError(err.code, err.msg, err.data) - elif future.readError of ValueError: - let err = future.readError[].ValueError - await client.sendError(INVALID_PARAMS, err.msg, %"") - else: - await client.sendError(SERVER_ERROR, - "Error: Unknown error occurred", %"") - -proc newRpcServer*(addresses: openarray[TransportAddress]): RpcServer = - ## Create new server and assign it to addresses ``addresses``. - result = RpcServer() - result.procs = newTable[string, RpcProc]() - result.servers = newSeq[StreamServer]() - - for item in addresses: - try: - info "Creating server on ", address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server", address = $item, message = getCurrentExceptionMsg() - - if len(result.servers) == 0: - # Server was not bound, critical error. - # TODO: Custom RpcException error - raise newException(RpcBindError, "Unable to create server!") - -proc newRpcServer*(addresses: openarray[string]): RpcServer = - ## Create new server and assign it to addresses ``addresses``. - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - baddrs: seq[TransportAddress] - - for a in addresses: - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(a, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(a, IpAddressFamily.IPv6) - except: - discard - - for r in tas4: - baddrs.add(r) - for r in tas6: - baddrs.add(r) - - if len(baddrs) == 0: - # Addresses could not be resolved, critical error. - raise newException(RpcAddressUnresolvableError, "Unable to get address!") - - result = newRpcServer(baddrs) - -proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) - except: - discard - - if len(tas4) == 0 and len(tas6) == 0: - # Address was not resolved, critical error. - raise newException(RpcAddressUnresolvableError, - "Address " & address & " could not be resolved!") - - result = RpcServer() - result.procs = newTable[string, RpcProc]() - result.servers = newSeq[StreamServer]() - for item in tas4: - try: - info "Creating server for address", ip4address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server for address", address = $item - - for item in tas6: - try: - info "Server created", ip6address = $item - var server = createStreamServer(item, processClient, {ReuseAddr}, - udata = result) - result.servers.add(server) - except: - error "Failed to create server", address = $item - - if len(result.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, - "Could not setup server on " & address & ":" & $int(port)) - -proc start*(server: RpcServer) = - ## Start the RPC server. - for item in server.servers: - item.start() - -proc stop*(server: RpcServer) = - ## Stop the RPC server. - for item in server.servers: - item.stop() - -proc close*(server: RpcServer) = - ## Cleanup resources of RPC server. - for item in server.servers: - item.close() - -# Server registration and RPC generation - -proc register*(server: RpcServer, name: string, rpc: RpcProc) = - ## Add a name/code pair to the RPC server. - server.procs[name] = rpc - -proc unRegisterAll*(server: RpcServer) = - # Remove all remote procedure calls from this server. - server.procs.clear - -proc makeProcName(s: string): string = - result = "" - for c in s: - if c.isAlphaNumeric: result.add c - -proc hasReturnType(params: NimNode): bool = - if params != nil and params.len > 0 and params[0] != nil and - params[0].kind != nnkEmpty: - result = true - -macro rpc*(server: RpcServer, path: string, body: untyped): untyped = - ## Define a remote procedure call. - ## Input and return parameters are defined using the ``do`` notation. - ## For example: - ## .. code-block:: nim - ## myServer.rpc("path") do(param1: int, param2: float) -> string: - ## result = $param1 & " " & $param2 - ## ``` - ## Input parameters are automatically marshalled from json to Nim types, - ## and output parameters are automatically marshalled to json for transport. - result = newStmtList() - let - parameters = body.findChild(it.kind == nnkFormalParams) - # all remote calls have a single parameter: `params: JsonNode` - paramsIdent = newIdentNode"params" - # procs are generated from the stripped path - pathStr = $path - # strip non alphanumeric - procNameStr = pathStr.makeProcName - # public rpc proc - procName = newIdentNode(procNameStr) - # when parameters present: proc that contains our rpc body - doMain = newIdentNode(procNameStr & "DoMain") - # async result - res = newIdentNode("result") - var - setup = jsonToNim(parameters, paramsIdent) - procBody = if body.kind == nnkStmtList: body else: body.body - - if parameters.hasReturnType: - let returnType = parameters[0] - - # delegate async proc allows return and setting of result as native type - result.add(quote do: - proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = - `setup` - `procBody` - ) - - if returnType == ident"JsonNode": - # `JsonNode` results don't need conversion - result.add( quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = await `doMain`(`paramsIdent`) - ) - else: - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = %await `doMain`(`paramsIdent`) - ) - else: - # no return types, inline contents - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `setup` - `procBody` - ) - result.add( quote do: - `server`.register(`path`, `procName`) - ) - - when defined(nimDumpRpcs): - echo "\n", pathStr, ": ", result.repr - -# TODO: Allow cross checking between client signatures and server calls diff --git a/eth_rpc.nimble b/json_rpc.nimble similarity index 90% rename from eth_rpc.nimble rename to json_rpc.nimble index f6b2a08..c840ac7 100644 --- a/eth_rpc.nimble +++ b/json_rpc.nimble @@ -1,5 +1,5 @@ -packageName = "eth_rpc" -version = "0.0.1" +packageName = "json_rpc" +version = "0.0.2" author = "Status Research & Development GmbH" description = "Ethereum remote procedure calls" license = "Apache License 2.0" diff --git a/eth-rpc/client.nim b/json_rpc/client.nim similarity index 77% rename from eth-rpc/client.nim rename to json_rpc/client.nim index e4bb85b..524ca30 100644 --- a/eth-rpc/client.nim +++ b/json_rpc/client.nim @@ -1,31 +1,38 @@ import tables, json, macros import asyncdispatch2 +from strutils import toLowerAscii import jsonmarshal +export asyncdispatch2 type - RpcClient* = ref object - transp: StreamTransport + RpcClient*[T, A] = ref object awaiting: Table[string, Future[Response]] - address: TransportAddress + transport: T + address: A nextId: int64 + Response* = tuple[error: bool, result: JsonNode] -const maxRequestLength = 1024 * 128 +const defaultMaxRequestLength* = 1024 * 128 -proc newRpcClient*(): RpcClient = +proc newRpcClient*[T, A]: RpcClient[T, A] = ## Creates a new ``RpcClient`` instance. - result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1) + result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) proc call*(self: RpcClient, name: string, - params: JsonNode): Future[Response] {.async.} = + params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = $self.nextId self.nextId.inc - var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, - "id": %id} & "\c\l" - let res = await self.transp.write(msg) + var + value = + $ %{"jsonrpc": %"2.0", + "method": %name, + "params": params, + "id": %id} & "\c\l" + let res = await self.transport.write(value) # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(msg)) + assert(res == len(value)) # completed by processMessage. var newFut = newFuture[Response]() @@ -33,10 +40,8 @@ proc call*(self: RpcClient, name: string, self.awaiting[id] = newFut result = await newFut -template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) = - # complete future before raising - fut.complete((true, %msg)) - raise newException(errType, msg) +template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = + fut.fail(newException(errType, msg)) macro checkGet(node: JsonNode, fieldName: string, jKind: static[JsonNodeKind]): untyped = @@ -58,17 +63,18 @@ macro checkGet(node: JsonNode, fieldName: string, else: discard proc processMessage(self: RpcClient, line: string) = - let node = parseJson(line) + # Note: this doesn't use any transport code so doesn't need to be differentiated. + let + node = parseJson(line) # TODO: Check errors + id = checkGet(node, "id", JString) - # TODO: Use more appropriate exception objects - let id = checkGet(node, "id", JString) if not self.awaiting.hasKey(id): raise newException(ValueError, "Cannot find message id \"" & node["id"].str & "\"") let version = checkGet(node, "jsonrpc", JString) if version != "2.0": - self.awaiting[id].handleRaise(ValueError, + self.awaiting[id].asyncRaise(ValueError, "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") let errorNode = node{"error"} @@ -79,31 +85,34 @@ proc processMessage(self: RpcClient, line: string) = self.awaiting.del(id) # TODO: actions on unable find result node else: - self.awaiting[id].complete((true, errorNode)) + self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc connect*(self: RpcClient, address: string, port: Port): Future[void] - -proc processData(self: RpcClient) {.async.} = +proc processData(client: RpcClient) {.async.} = while true: - let line = await self.transp.readLine(maxRequestLength) - if line == "": + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": # transmission ends - self.transp.close() + client.transport.close break - processMessage(self, line) + client.processMessage(value) # async loop reconnection and waiting - self.transp = await connect(self.address) + client.transport = await connect(client.address) -proc connect*(self: RpcClient, address: string, port: Port) {.async.} = - # TODO: `address` hostname can be resolved to many IP addresses, we are using - # first one, but maybe it would be better to iterate over all IP addresses - # and try to establish connection until it will not be established. +type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress] + +proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} = let addresses = resolveTAddress(address, port) - self.transp = await connect(addresses[0]) - self.address = addresses[0] - asyncCheck processData(self) + client.transport = await connect(addresses[0]) + client.address = addresses[0] + asyncCheck processData(client) + +proc newRpcStreamClient*(): RpcStreamClient = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcClient[StreamTransport, TransportAddress]() + +# Signature processing proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = # parameters come as a tree diff --git a/eth-rpc/jsonmarshal.nim b/json_rpc/jsonmarshal.nim similarity index 100% rename from eth-rpc/jsonmarshal.nim rename to json_rpc/jsonmarshal.nim diff --git a/json_rpc/router.nim b/json_rpc/router.nim new file mode 100644 index 0000000..bc811cb --- /dev/null +++ b/json_rpc/router.nim @@ -0,0 +1,240 @@ +import + json, tables, asyncdispatch2, jsonmarshal, strutils, macros, + chronicles, options +export asyncdispatch2, json, jsonmarshal + +type + RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId, rjeNoParams + RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string] + + # Procedure signature accepted as an RPC call by server + RpcProc* = proc(input: JsonNode): Future[JsonNode] {.gcsafe.} + + RpcProcError* = ref object of Exception + code*: int + data*: JsonNode + + RpcBindError* = object of Exception + RpcAddressUnresolvableError* = object of Exception + + RpcRouter* = object + procs*: TableRef[string, RpcProc] + +const + methodField = "method" + paramsField = "params" + jsonRpcField = "jsonrpc" + idField = "id" + resultField = "result" + errorField = "error" + codeField = "code" + messageField = "message" + dataField = "data" + messageTerminator = "\c\l" + + JSON_PARSE_ERROR* = -32700 + INVALID_REQUEST* = -32600 + METHOD_NOT_FOUND* = -32601 + INVALID_PARAMS* = -32602 + INTERNAL_ERROR* = -32603 + SERVER_ERROR* = -32000 + + defaultMaxRequestLength* = 1024 * 128 + jsonErrorMessages*: array[RpcJsonError, (int, string)] = + [ + (JSON_PARSE_ERROR, "Invalid JSON"), + (INVALID_REQUEST, "JSON 2.0 required"), + (INVALID_REQUEST, "No method requested"), + (INVALID_REQUEST, "No id specified"), + (INVALID_PARAMS, "No parameters specified") + ] + +proc newRpcRouter*: RpcRouter = + result.procs = newTable[string, RpcProc]() + +proc register*(router: var RpcRouter, path: string, call: RpcProc) = + router.procs.add(path, call) + +proc clear*(router: var RpcRouter) = router.procs.clear + +proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasKey(methodName) + +template isEmpty(node: JsonNode): bool = node.isNil or node.kind == JNull + +# Json state checking + +template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = + var + valid = true + msg = "" + try: node = parseJson(line) + except: + valid = false + msg = getCurrentExceptionMsg() + debug "Cannot process json", json = jsonString, msg = msg + (valid, msg) + +proc checkJsonState*(line: string, + node: var JsonNode): Option[RpcJsonErrorContainer] = + ## Tries parsing line into node, if successful checks required fields + ## Returns: error state or none + let res = jsonValid(line, node) + if not res[0]: + return some((rjeInvalidJson, res[1])) + if not node.hasKey(idField): + return some((rjeNoId, "")) + let jVer = node{jsonRpcField} + if jVer != nil and jVer.kind != JNull and jVer != %"2.0": + return some((rjeVersionError, "")) + if not node.hasKey(methodField): + return some((rjeNoMethod, "")) + if not node.hasKey(paramsField): + return some((rjeNoParams, "")) + return none(RpcJsonErrorContainer) + +# Json reply wrappers + +proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): JsonNode = + return %{jsonRpcField: %"2.0", resultField: value, errorField: error, idField: id} + +proc wrapError*(code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()): JsonNode = + # Create standardised error json + result = %{codeField: %(code), idField: id, messageField: %msg, dataField: data} + debug "Error generated", error = result, id = id + +proc route*(router: RpcRouter, node: JsonNode): Future[JsonNode] {.async, gcsafe.} = + ## Assumes correct setup of node + let + methodName = node[methodField].str + id = node[idField] + rpcProc = router.procs.getOrDefault(methodName) + + if rpcProc.isNil: + let + methodNotFound = %(methodName & " is not a registered RPC method.") + error = wrapError(METHOD_NOT_FOUND, "Method not found", id, methodNotFound) + result = wrapReply(id, newJNull(), error) + else: + let + jParams = node[paramsField] + res = await rpcProc(jParams) + result = wrapReply(id, res, newJNull()) + +proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} = + ## Route to RPC from string data. Data is expected to be able to be converted to Json. + ## Returns string of Json from RPC result/error node + var + node: JsonNode + # parse json node and/or flag missing fields and errors + jsonErrorState = checkJsonState(data, node) + + if jsonErrorState.isSome: + let errState = jsonErrorState.get + var id = + if errState.err == rjeInvalidJson or errState.err == rjeNoId: + newJNull() + else: + node["id"] + let + errMsg = jsonErrorMessages[errState.err] + res = wrapError(code = errMsg[0], msg = errMsg[1], id = id) + # return error state as json + result = $res & messageTerminator + else: + let res = await router.route(node) + result = $res & messageTerminator + +proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bool = + ## Route to RPC, returns false if the method or params cannot be found. + ## Expects json input and returns json output. + let + jPath = data{methodField} + jParams = data{paramsField} + if jPath.isEmpty or jParams.isEmpty: + return false + + let + path = jPath.getStr + rpc = router.procs.getOrDefault(path) + if rpc != nil: + fut = rpc(jParams) + return true + +proc makeProcName(s: string): string = + result = "" + for c in s: + if c.isAlphaNumeric: result.add c + +proc hasReturnType(params: NimNode): bool = + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: + result = true + +macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = + ## Define a remote procedure call. + ## Input and return parameters are defined using the ``do`` notation. + ## For example: + ## .. code-block:: nim + ## myServer.rpc("path") do(param1: int, param2: float) -> string: + ## result = $param1 & " " & $param2 + ## ``` + ## Input parameters are automatically marshalled from json to Nim types, + ## and output parameters are automatically marshalled to json for transport. + result = newStmtList() + let + parameters = body.findChild(it.kind == nnkFormalParams) + # all remote calls have a single parameter: `params: JsonNode` + paramsIdent = newIdentNode"params" + # procs are generated from the stripped path + pathStr = $path + # strip non alphanumeric + procNameStr = pathStr.makeProcName + # public rpc proc + procName = newIdentNode(procNameStr) + # when parameters present: proc that contains our rpc body + doMain = newIdentNode(procNameStr & "DoMain") + # async result + res = newIdentNode("result") + var + setup = jsonToNim(parameters, paramsIdent) + procBody = if body.kind == nnkStmtList: body else: body.body + errTrappedBody = quote do: + try: + `procBody` + except: + debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg() + if parameters.hasReturnType: + let returnType = parameters[0] + + # delegate async proc allows return and setting of result as native type + result.add(quote do: + proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = + `setup` + `errTrappedBody` + ) + + if returnType == ident"JsonNode": + # `JsonNode` results don't need conversion + result.add( quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = await `doMain`(`paramsIdent`) + ) + else: + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = %await `doMain`(`paramsIdent`) + ) + else: + # no return types, inline contents + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `setup` + `errTrappedBody` + ) + result.add( quote do: + `server`.register(`path`, `procName`) + ) + + when defined(nimDumpRpcs): + echo "\n", pathStr, ": ", result.repr diff --git a/json_rpc/server.nim b/json_rpc/server.nim new file mode 100644 index 0000000..6ef2720 --- /dev/null +++ b/json_rpc/server.nim @@ -0,0 +1,37 @@ +import json, tables, options, macros +import asyncdispatch2, router +import jsonmarshal + +export asyncdispatch2, json, jsonmarshal, router + +type + RpcServer*[S] = ref object + servers*: seq[S] + router*: RpcRouter + +proc newRpcServer*[S](): RpcServer[S] = + new result + result.router = newRpcRouter() + result.servers = @[] + +template rpc*(server: RpcServer, path: string, body: untyped): untyped = + server.router.rpc(path, body) + +template hasMethod*(server: RpcServer, methodName: string): bool = server.router.hasMethod(methodName) + +# Wrapper for message processing + +proc route*[T](server: RpcServer[T], line: string): Future[string] {.async, gcsafe.} = + result = await server.router.route(line) + +# Server registration + +proc register*(server: RpcServer, name: string, rpc: RpcProc) = + ## Add a name/code pair to the RPC server. + server.router.addRoute(name, rpc) + +proc unRegisterAll*(server: RpcServer) = + # Remove all remote procedure calls from this server. + server.router.clear + + diff --git a/json_rpc/sockettransport.nim b/json_rpc/sockettransport.nim new file mode 100644 index 0000000..26608fa --- /dev/null +++ b/json_rpc/sockettransport.nim @@ -0,0 +1,156 @@ +import server, json, chronicles + +proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()) {.async.} = + ## Send error message to client + let error = wrapError(code, msg, id, data) + var value = $wrapReply(id, newJNull(), error) + result = transport.write(value) + +proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} = + ## Process transport data to the RPC server + var rpc = getUserData[RpcServer[StreamTransport]](server) + while true: + var + maxRequestLength = defaultMaxRequestLength + value = await transport.readLine(defaultMaxRequestLength) + if value == "": + transport.close + break + + debug "Processing message", address = transport.remoteAddress(), line = value + + let future = rpc.route(value) + yield future + if future.failed: + if future.readError of RpcProcError: + let err = future.readError.RpcProcError + await transport.sendError(err.code, err.msg, err.data) + elif future.readError of ValueError: + let err = future.readError[].ValueError + await transport.sendError(INVALID_PARAMS, err.msg, %"") + else: + await transport.sendError(SERVER_ERROR, + "Error: Unknown error occurred", %"") + else: + let res = await future + result = transport.write(res) + +# Utility functions for setting up servers using stream transport addresses + +proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress) = + try: + info "Creating server on ", address = $address + var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server) + server.servers.add(transportServer) + except: + error "Failed to create server", address = $address, message = getCurrentExceptionMsg() + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, "Unable to create server!") + +proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[TransportAddress]) = + for item in addresses: + server.addStreamServer(item) + +proc addStreamServer*(server: RpcServer[StreamServer], address: string) = + ## Create new server and assign it to addresses ``addresses``. + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, IpAddressFamily.IPv6) + except: + discard + + for r in tas4: + server.addStreamServer(r) + added.inc + for r in tas6: + server.addStreamServer(r) + added.inc + + if added == 0: + # Addresses could not be resolved, critical error. + raise newException(RpcAddressUnresolvableError, "Unable to get address!") + +proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[string]) = + for address in addresses: + server.addStreamServer(address) + +proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Port) = + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) + except: + discard + + if len(tas4) == 0 and len(tas6) == 0: + # Address was not resolved, critical error. + raise newException(RpcAddressUnresolvableError, + "Address " & address & " could not be resolved!") + + for r in tas4: + server.addStreamServer(r) + added.inc + for r in tas6: + server.addStreamServer(r) + added.inc + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, + "Could not setup server on " & address & ":" & $int(port)) + +type RpcStreamServer* = RpcServer[StreamServer] + +proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses) + +proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses) + +proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer = + # Create server on specified port + result = newRpcServer[StreamServer]() + result.addStreamServer(address, port) + +proc start*(server: RpcStreamServer) = + ## Start the RPC server. + for item in server.servers: + item.start() + +proc stop*(server: RpcStreamServer) = + ## Stop the RPC server. + for item in server.servers: + item.stop() + +proc close*(server: RpcStreamServer) = + ## Cleanup resources of RPC server. + for item in server.servers: + item.close() \ No newline at end of file diff --git a/rpcclient.nim b/rpcclient.nim index c1a3431..ebecba9 100644 --- a/rpcclient.nim +++ b/rpcclient.nim @@ -1,3 +1,3 @@ -import eth-rpc / client +import json_rpc / client export client diff --git a/rpcserver.nim b/rpcserver.nim index 905d53e..ef197d1 100644 --- a/rpcserver.nim +++ b/rpcserver.nim @@ -1,2 +1,2 @@ -import eth-rpc / server +import json_rpc / server export server diff --git a/rpcsockets.nim b/rpcsockets.nim new file mode 100644 index 0000000..06ee707 --- /dev/null +++ b/rpcsockets.nim @@ -0,0 +1,2 @@ +import json_rpc / [server, sockettransport] +export server, sockettransport diff --git a/tests/all.nim b/tests/all.nim index d3923d1..ae9eb62 100644 --- a/tests/all.nim +++ b/tests/all.nim @@ -1,3 +1,3 @@ import - testrpcmacro, testserverclient, testethcalls, testerrors + testrpcmacro, testserverclient, testethcalls #, testerrors diff --git a/tests/debugclient.nim b/tests/debugclient.nim index cd48d5e..15c5f90 100644 --- a/tests/debugclient.nim +++ b/tests/debugclient.nim @@ -1,4 +1,4 @@ -include ../ eth-rpc / client +include ../ json_rpc / client proc nextId*(self: RpcClient): int64 = self.nextId @@ -9,7 +9,7 @@ proc rawCall*(self: RpcClient, name: string, self.nextId.inc var s = msg & "\c\l" - let res = await self.transp.write(s) + let res = await self.transport.write(s) assert res == len(s) # completed by processMessage. diff --git a/tests/ethhexstrings.nim b/tests/ethhexstrings.nim new file mode 100644 index 0000000..f711689 --- /dev/null +++ b/tests/ethhexstrings.nim @@ -0,0 +1,157 @@ +type + HexQuantityStr* = distinct string + HexDataStr* = distinct string + +# Hex validation + +template stripLeadingZeros(value: string): string = + var cidx = 0 + # ignore the last character so we retain '0' on zero value + while cidx < value.len - 1 and value[cidx] == '0': + cidx.inc + value[cidx .. ^1] + +proc encodeQuantity*(value: SomeUnsignedInt): string = + var hValue = value.toHex.stripLeadingZeros + result = "0x" & hValue + +template hasHexHeader*(value: string | HexDataStr | HexQuantityStr): bool = + template strVal: untyped = value.string + if strVal != "" and strVal[0] == '0' and strVal[1] in {'x', 'X'} and strVal.len > 2: true + else: false + +template isHexChar*(c: char): bool = + if c notin {'0'..'9'} and + c notin {'a'..'f'} and + c notin {'A'..'F'}: false + else: true + +proc validate*(value: HexQuantityStr): bool = + template strVal: untyped = value.string + if not value.hasHexHeader: + return false + # No leading zeros + if strVal[2] == '0': return false + for i in 2.. string: + server.rpc("web3_sha3") do(data: HexDataStr) -> HexDataStr: ## Returns Keccak-256 (not the standardized SHA3-256) of the given data. ## ## data: the data to convert into a SHA3 hash. ## Returns the SHA3 result of the given string. # TODO: Capture error on malformed input var rawData: seq[byte] - if data.len > 2 and data[0] == '0' and data[1] in ['x', 'X']: - rawData = data[2..data.high].fromHex - else: - rawData = data.fromHex + rawData = data.string.fromHex # data will have 0x prefix - result = "0x" & $keccak_256.digest(rawData) + result = hexDataStr "0x" & $keccak_256.digest(rawData) server.rpc("net_version") do() -> string: ## Returns string of the current network id: diff --git a/tests/testerrors.nim b/tests/testerrors.nim index 9f03777..1304cf6 100644 --- a/tests/testerrors.nim +++ b/tests/testerrors.nim @@ -3,11 +3,11 @@ allow unchecked and unformatted calls. ]# -import unittest, debugclient, ../rpcserver +import unittest, debugclient, ../rpcsockets import strformat, chronicles -var server = newRpcServer("localhost", 8547.Port) -var client = newRpcClient() +var server = newRpcStreamServer("localhost", 8547.Port) +var client = newRpcStreamClient() server.start() waitFor client.connect("localhost", Port(8547)) @@ -15,6 +15,10 @@ waitFor client.connect("localhost", Port(8547)) server.rpc("rpc") do(a: int, b: int): result = %(&"a: {a}, b: {b}") +server.rpc("makeError"): + if true: + raise newException(ValueError, "Test") + proc testMissingRpc: Future[Response] {.async.} = var fut = client.call("phantomRpc", %[]) result = await fut @@ -33,22 +37,44 @@ proc testMalformed: Future[Response] {.async.} = if fut.finished: result = fut.read() else: result = (true, %"Timeout") +proc testRaise: Future[Response] {.async.} = + var fut = client.call("rpcMakeError", %[]) + result = await fut + suite "RPC Errors": # Note: We don't expect a exceptions for most of the tests, # because the server should respond with the error in json test "Missing RPC": - let res = waitFor testMissingRpc() - check res.error == true and - res.result["message"] == %"Method not found" and - res.result["data"] == %"phantomRpc is not a registered method." + #expect ValueError: + try: + let res = waitFor testMissingRpc() + check res.error == true and + res.result["message"] == %"Method not found" and + res.result["data"] == %"phantomRpc is not a registered method." + except: + echo "Error ", getCurrentExceptionMsg() test "Incorrect json version": - let res = waitFor testInvalidJsonVer() - check res.error == true and res.result["message"] == %"JSON 2.0 required" + #expect ValueError: + try: + let res = waitFor testInvalidJsonVer() + check res.error == true and res.result["message"] == %"JSON 2.0 required" + except: + echo "Error ", getCurrentExceptionMsg() + + test "Raising exceptions": + #expect ValueError: + try: + let res = waitFor testRaise() + except: + echo "Error ", getCurrentExceptionMsg() test "Malformed json": # TODO: We time out here because the server won't be able to # find an id to return to us, so we cannot complete the future. - let res = waitFor testMalformed() - check res.error == true and res.result == %"Timeout" + try: + let res = waitFor testMalformed() + check res.error == true and res.result == %"Timeout" + except: + echo "Error ", getCurrentExceptionMsg() diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 13d54a1..ec23e46 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -1,14 +1,14 @@ import unittest, json, tables -import ../rpcclient, ../rpcserver -import stint, ethtypes, ethprocs, stintjson +import ../rpcclient, ../rpcsockets +import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles from os import getCurrentDir, DirSep from strutils import rsplit template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] var - server = newRpcServer("localhost", Port(8546)) - client = newRpcClient() + server = newRpcStreamServer("localhost", Port(8546)) + client = newRpcStreamClient() ## Generate Ethereum server RPCs server.addEthRpcs() diff --git a/tests/testrpcmacro.nim b/tests/testrpcmacro.nim index 6d39c78..d3cb07c 100644 --- a/tests/testrpcmacro.nim +++ b/tests/testrpcmacro.nim @@ -1,5 +1,5 @@ -import unittest, json, tables -import ../rpcserver +import unittest, json, tables, chronicles +import ../rpcsockets type # some nested types to check object parsing @@ -27,7 +27,7 @@ let }, "c": %1.23} -var s = newRpcServer(["localhost:8545"]) +var s = newRpcStreamServer(["localhost:8545"]) # RPC definitions @@ -67,14 +67,14 @@ s.rpc("rpc.testreturns") do() -> int: suite "Server types": test "On macro registration": - check s.procs.hasKey("rpc.simplepath") - check s.procs.hasKey("rpc.differentparams") - check s.procs.hasKey("rpc.arrayparam") - check s.procs.hasKey("rpc.seqparam") - check s.procs.hasKey("rpc.objparam") - check s.procs.hasKey("rpc.returntypesimple") - check s.procs.hasKey("rpc.returntypecomplex") - check s.procs.hasKey("rpc.testreturns") + check s.hasMethod("rpc.simplepath") + check s.hasMethod("rpc.differentparams") + check s.hasMethod("rpc.arrayparam") + check s.hasMethod("rpc.seqparam") + check s.hasMethod("rpc.objparam") + check s.hasMethod("rpc.returntypesimple") + check s.hasMethod("rpc.returntypecomplex") + check s.hasMethod("rpc.testreturns") test "Simple paths": let r = waitFor rpcSimplePath(%[]) diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index 4706732..bb2a0f7 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -1,8 +1,8 @@ -import unittest, json -import ../rpcclient, ../rpcserver +import unittest, json, chronicles +import ../rpcclient, ../rpcsockets -var srv = newRpcServer(["localhost:8545"]) -var client = newRpcClient() +var srv = newRpcStreamServer(["localhost:8545"]) +var client = newRpcStreamClient() # Create RPC on server srv.rpc("myProc") do(input: string, data: array[0..3, int]):