diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 70842f8..524ca30 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -6,8 +6,8 @@ export asyncdispatch2 type RpcClient*[T, A] = ref object - transp*: T awaiting: Table[string, Future[Response]] + transport: T address: A nextId: int64 @@ -15,34 +15,30 @@ type const defaultMaxRequestLength* = 1024 * 128 -proc newRpcClient*[T, A](): RpcClient[T, A] = +proc newRpcClient*[T, A]: RpcClient[T, A] = ## Creates a new ``RpcClient`` instance. result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) -proc genCall(rpcType, callName, writeCode: NimNode): NimNode = - let res = newIdentNode("result") - result = quote do: - proc `callName`*(self: `rpcType`, name: string, - params: JsonNode): Future[Response] {.async.} = - ## Remotely calls the specified RPC method. - let id = $self.nextId - self.nextId.inc - var - value {.inject.} = - $ %{"jsonrpc": %"2.0", - "method": %name, - "params": params, - "id": %id} & "\c\l" - template client: untyped = self - let res = await `writeCode` - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(value)) +proc call*(self: RpcClient, name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = $self.nextId + self.nextId.inc + var + value = + $ %{"jsonrpc": %"2.0", + "method": %name, + "params": params, + "id": %id} & "\c\l" + let res = await self.transport.write(value) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(value)) - # completed by processMessage. - var newFut = newFuture[Response]() - # add to awaiting responses - self.awaiting[id] = newFut - `res` = await newFut + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = fut.fail(newException(errType, msg)) @@ -66,12 +62,12 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage[T, A](self: RpcClient[T, A], line: string) = +proc processMessage(self: RpcClient, line: string) = # Note: this doesn't use any transport code so doesn't need to be differentiated. - let node = parseJson(line) # TODO: Check errors + let + node = parseJson(line) # TODO: Check errors + id = checkGet(node, "id", JString) - # TODO: Use more appropriate exception objects - let id = checkGet(node, "id", JString) if not self.awaiting.hasKey(id): raise newException(ValueError, "Cannot find message id \"" & node["id"].str & "\"") @@ -92,108 +88,26 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode: NimNode): NimNode = - result = quote do: - proc `processDataName`(clientTransport: `rpcType`) {.async.} = - while true: - var maxRequestLength {.inject.} = defaultMaxRequestLength - template client: untyped = clientTransport - var value {.inject.} = await `readCode` - `afterReadCode` - if value == "": - # transmission ends - `closeCode` - break +proc processData(client: RpcClient) {.async.} = + while true: + var value = await client.transport.readLine(defaultMaxRequestLength) + if value == "": + # transmission ends + client.transport.close + break - processMessage(clientTransport, value) - # async loop reconnection and waiting - clientTransport.transp = await connect(clientTransport.address) - -proc genConnect(rpcType, connectName, processDataName, connectCode: NimNode): NimNode = - result = quote do: - proc `connectName`*(clientTransport: `rpcType`, address: string, port: Port) {.async.} = - var - address {.inject.} = address - port {.inject.} = port - template client: untyped = clientTransport - `connectCode` - asyncCheck `processDataName`(clientTransport) - -macro defineRpcClientTransport*(transType, addrType: untyped, prefix: string = "", body: untyped = nil): untyped = - var - writeCode = quote do: - client.transp.write(value) - readCode = quote do: - client.transp.readLine(defaultMaxRequestLength) - closeCode = quote do: - client.transp.close - connectCode = quote do: - # TODO: `address` hostname can be resolved to many IP addresses, we are using - # first one, but maybe it would be better to iterate over all IP addresses - # and try to establish connection until it will not be established. - let addresses = resolveTAddress(address, port) - client.transp = await connect(addresses[0]) - client.address = addresses[0] - afterReadCode = newStmtList() - - if body != nil: - body.expectKind nnkStmtList - for item in body: - item.expectKind nnkCall - item[0].expectKind nnkIdent - item[1].expectKind nnkStmtList - let - verb = $item[0] - code = item[1] - - case verb.toLowerAscii - of "write": - # `client`, the RpcClient - # `value`, the data to be sent to the server - # Note: Update `value` so it's length can be sent afterwards - writeCode = code - of "read": - # `client`, the RpcClient - # `maxRequestLength`, initially set to defaultMaxRequestLength - readCode = code - of "close": - # `client`, the RpcClient - # `value`, the data returned from the server - # `maxRequestLength`, initially set to defaultMaxRequestLength - closeCode = code - of "connect": - # `client`, the RpcClient - # `address`, server destination address string - # `port`, server destination port - connectCode = code - of "afterread": - # `client`, the RpcClient - # `value`, the data returned from the server - # `maxRequestLength`, initially set to defaultMaxRequestLength - afterReadCode = code - else: error("Unknown RPC verb \"" & verb & "\"") - - result = newStmtList() - let - rpcType = quote: RpcClient[`transType`, `addrType`] - processDataName = newIdentNode(prefix.strVal & "processData") - connectName = newIdentNode(prefix.strVal & "connect") - callName = newIdentNode(prefix.strVal & "call") - - result.add(genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode)) - result.add(genConnect(rpcType, connectName, processDataName, connectCode)) - result.add(genCall(rpcType, callName, writeCode)) - - when defined(nimDumpRpcs): - echo "defineClient:\n", result.repr - -# Define default stream server -# TODO: Move this into a separate unit so users can define 'connect', 'call' etc without requiring a prefix? - -defineRpcClientTransport(StreamTransport, TransportAddress) + client.processMessage(value) + # async loop reconnection and waiting + client.transport = await connect(client.address) type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress] +proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} = + let addresses = resolveTAddress(address, port) + client.transport = await connect(addresses[0]) + client.address = addresses[0] + asyncCheck processData(client) + proc newRpcStreamClient*(): RpcStreamClient = ## Create new server and assign it to addresses ``addresses``. result = newRpcClient[StreamTransport, TransportAddress]() diff --git a/json_rpc/router.nim b/json_rpc/router.nim new file mode 100644 index 0000000..7ac0614 --- /dev/null +++ b/json_rpc/router.nim @@ -0,0 +1,138 @@ +import json, tables, asyncdispatch2, jsonmarshal, strutils, macros +export asyncdispatch2, json, jsonmarshal + +type + # Procedure signature accepted as an RPC call by server + RpcProc* = proc(input: JsonNode): Future[JsonNode] + + RpcRouter* = object + procs*: TableRef[string, RpcProc] + +const + methodField = "method" + paramsField = "params" + +proc newRpcRouter*: RpcRouter = + result.procs = newTable[string, RpcProc]() + +proc register*(router: var RpcRouter, path: string, call: RpcProc) = + router.procs.add(path, call) + +proc clear*(router: var RpcRouter) = router.procs.clear + +proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasKey(methodName) + +template isEmpty(node: JsonNode): bool = node.isNil or node.kind == JNull + +proc route*(router: RpcRouter, data: JsonNode): Future[JsonNode] {.async, gcsafe.} = + ## Route to RPC, raises exceptions on missing data + let jPath = data{methodField} + if jPath.isEmpty: + raise newException(ValueError, "No " & methodField & " field found") + + let jParams = data{paramsField} + if jParams.isEmpty: + raise newException(ValueError, "No " & paramsField & " field found") + + let + path = jPath.getStr + rpc = router.procs.getOrDefault(path) + # TODO: not GC-safe as it accesses 'rpc' which is a global using GC'ed memory! + if rpc != nil: + result = await rpc(jParams) + else: + raise newException(ValueError, "Method \"" & path & "\" not found") + +proc ifRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bool = + ## Route to RPC, returns false if the method or params cannot be found + # TODO: This is already checked in processMessages, but allows safer use externally + let + jPath = data{methodField} + jParams = data{paramsField} + if jPath.isEmpty or jParams.isEmpty: + return false + + let + path = jPath.getStr + rpc = router.procs.getOrDefault(path) + if rpc != nil: + fut = rpc(jParams) + return true + +proc makeProcName(s: string): string = + result = "" + for c in s: + if c.isAlphaNumeric: result.add c + +proc hasReturnType(params: NimNode): bool = + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: + result = true + +macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = + ## Define a remote procedure call. + ## Input and return parameters are defined using the ``do`` notation. + ## For example: + ## .. code-block:: nim + ## myServer.rpc("path") do(param1: int, param2: float) -> string: + ## result = $param1 & " " & $param2 + ## ``` + ## Input parameters are automatically marshalled from json to Nim types, + ## and output parameters are automatically marshalled to json for transport. + result = newStmtList() + let + parameters = body.findChild(it.kind == nnkFormalParams) + # all remote calls have a single parameter: `params: JsonNode` + paramsIdent = newIdentNode"params" + # procs are generated from the stripped path + pathStr = $path + # strip non alphanumeric + procNameStr = pathStr.makeProcName + # public rpc proc + procName = newIdentNode(procNameStr) + # when parameters present: proc that contains our rpc body + doMain = newIdentNode(procNameStr & "DoMain") + # async result + res = newIdentNode("result") + var + setup = jsonToNim(parameters, paramsIdent) + procBody = if body.kind == nnkStmtList: body else: body.body + errTrappedBody = quote do: + try: + `procBody` + except: + debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg() + if parameters.hasReturnType: + let returnType = parameters[0] + + # delegate async proc allows return and setting of result as native type + result.add(quote do: + proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = + `setup` + `errTrappedBody` + ) + + if returnType == ident"JsonNode": + # `JsonNode` results don't need conversion + result.add( quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = await `doMain`(`paramsIdent`) + ) + else: + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `res` = %await `doMain`(`paramsIdent`) + ) + else: + # no return types, inline contents + result.add(quote do: + proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = + `setup` + `errTrappedBody` + ) + result.add( quote do: + `server`.register(`path`, `procName`) + ) + + when defined(nimDumpRpcs): + echo "\n", pathStr, ": ", result.repr diff --git a/json_rpc/server.nim b/json_rpc/server.nim index d675903..a564957 100644 --- a/json_rpc/server.nim +++ b/json_rpc/server.nim @@ -1,37 +1,20 @@ -import json, tables, strutils, options, macros, chronicles -import asyncdispatch2 +import json, tables, options, macros, chronicles +import asyncdispatch2, router import jsonmarshal -export asyncdispatch2, json, jsonmarshal, options +export asyncdispatch2, json, jsonmarshal, router logScope: topics = "RpcServer" type - RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId + RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId, rjeNoParams RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string] - # Procedure signature accepted as an RPC call by server - RpcProc* = proc (params: JsonNode): Future[JsonNode] - - RpcClientTransport* = concept t - t.write(var string) is Future[int] - t.readLine(int) is Future[string] - t.close - t.remoteAddress() # Required for logging - t.localAddress() - - RpcServerTransport* = concept t - t.start - t.stop - t.close - - RpcProcessClient* = proc (server: RpcServerTransport, client: RpcClientTransport): Future[void] {.gcsafe.} - - RpcServer*[S: RpcServerTransport] = ref object + RpcServer*[S] = ref object servers*: seq[S] - procs*: TableRef[string, RpcProc] + router*: RpcRouter RpcProcError* = ref object of Exception code*: int @@ -49,23 +32,28 @@ const SERVER_ERROR* = -32000 defaultMaxRequestLength* = 1024 * 128 - jsonErrorMessages*: array[RpcJsonError, (int, string)] = [ (JSON_PARSE_ERROR, "Invalid JSON"), (INVALID_REQUEST, "JSON 2.0 required"), (INVALID_REQUEST, "No method requested"), - (INVALID_REQUEST, "No id specified") + (INVALID_REQUEST, "No id specified"), + (INVALID_PARAMS, "No parameters specified") ] proc newRpcServer*[S](): RpcServer[S] = new result - result.procs = newTable[string, RpcProc]() + result.router = newRpcRouter() result.servers = @[] # Utility functions -# TODO: Move outside server -func `%`*(p: Port): JsonNode = %(p.int) +# TODO: Move outside server? +#func `%`*(p: Port): JsonNode = %(p.int) + +template rpc*(server: RpcServer, path: string, body: untyped): untyped = + server.router.rpc(path, body) + +template hasMethod*(server: RpcServer, methodName: string): bool = server.router.hasMethod(methodName) # Json state checking @@ -80,7 +68,7 @@ template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = debug "Cannot process json", json = jsonString, msg = msg (valid, msg) -proc checkJsonErrors*(line: string, +proc checkJsonState*(line: string, node: var JsonNode): Option[RpcJsonErrorContainer] = ## Tries parsing line into node, if successful checks required fields ## Returns: error state or none @@ -89,10 +77,13 @@ proc checkJsonErrors*(line: string, return some((rjeInvalidJson, res[1])) if not node.hasKey("id"): return some((rjeNoId, "")) - if node{"jsonrpc"} != %"2.0": + let jVer = node{"jsonrpc"} + if jVer != nil and jVer.kind != JNull and jVer != %"2.0": return some((rjeVersionError, "")) if not node.hasKey("method"): return some((rjeNoMethod, "")) + if not node.hasKey("params"): + return some((rjeNoParams, "")) return none(RpcJsonErrorContainer) # Json reply wrappers @@ -101,170 +92,47 @@ proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} return $node & "\c\l" -proc genErrorSending(name, writeCode, errorCode: NimNode): NimNode = - let - res = newIdentNode("result") - sendJsonErr = newIdentNode($name & "Json") - result = quote do: - proc `name`*[T: RpcClientTransport](clientTrans: T, code: int, msg: string, id: JsonNode, - data: JsonNode = newJNull()) {.async.} = - ## Send error message to client - let error = %{"code": %(code), "id": id, "message": %msg, "data": data} - debug "Error generated", error = error, id = id - - template transport: untyped = clientTrans - var value {.inject.} = wrapReply(id, newJNull(), error) - `errorCode` - `res` = `writeCode` - - proc `sendJsonErr`*(state: RpcJsonError, clientTrans: RpcClientTransport, id: JsonNode, - data = newJNull()) {.async.} = - ## Send client response for invalid json state - let errMsgs = jsonErrorMessages[state] - await clientTrans.`name`(errMsgs[0], errMsgs[1], id, data) +proc wrapError*(code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()): JsonNode = + # Create standardised error json + result = %{"code": %(code), "id": id, "message": %msg, "data": data} + debug "Error generated", error = result, id = id # Server message processing -proc genProcessMessages(name, sendErrorName, writeCode: NimNode): NimNode = - let idSendErrJson = newIdentNode($sendErrorName & "Json") - result = quote do: - proc `name`[T: RpcClientTransport](server: RpcServer, clientTrans: T, - line: string) {.async.} = - var - node: JsonNode - # set up node and/or flag errors - jsonErrorState = checkJsonErrors(line, node) - - if jsonErrorState.isSome: - let errState = jsonErrorState.get - var id = - if errState.err == rjeInvalidJson or errState.err == rjeNoId: - newJNull() - else: - node["id"] - await errState.err.`idSendErrJson`(clientTrans, id, %errState.msg) - else: - let - methodName = node["method"].str - id = node["id"] - - if not server.procs.hasKey(methodName): - await clientTrans.`sendErrorName`(METHOD_NOT_FOUND, "Method not found", %id, - %(methodName & " is not a registered method.")) - else: - let callRes = await server.procs[methodName](node["params"]) - template transport: untyped = clientTrans - var value {.inject.} = wrapReply(id, callRes, newJNull()) - asyncCheck `writeCode` - -proc genProcessClient(nameIdent, procMessagesIdent, sendErrIdent, readCode, afterReadCode, closeCode: NimNode): NimNode = - # This generates the processClient proc to match transport. - # processClient is compatible with createStreamServer and thus StreamCallback. - # However the constraints are conceptualised so you only need to match it's interface - # Note: https://github.com/nim-lang/Nim/issues/644 - result = quote do: - proc `nameIdent`[S: RpcServerTransport, C: RpcClientTransport](server: S, clientTrans: C) {.async, gcsafe.} = - var rpc = getUserData[RpcServer[S]](server) - while true: - var maxRequestLength {.inject.} = defaultMaxRequestLength - template transport: untyped = clientTrans - - var value {.inject.} = await `readCode` - `afterReadCode` - if value == "": - `closeCode` - break - - debug "Processing message", address = clientTrans.remoteAddress(), line = value - - let future = `procMessagesIdent`(rpc, clientTrans, value) - yield future - if future.failed: - if future.readError of RpcProcError: - let err = future.readError.RpcProcError - await clientTrans.`sendErrIdent`(err.code, err.msg, err.data) - elif future.readError of ValueError: - let err = future.readError[].ValueError - await clientTrans.`sendErrIdent`(INVALID_PARAMS, err.msg, %"") - else: - await clientTrans.`sendErrIdent`(SERVER_ERROR, - "Error: Unknown error occurred", %"") - -macro defineRpcServerTransport*(procClientName: untyped, body: untyped = nil): untyped = - ## Build an rpcServer type that inlines data access operations - #[ - Injects: - client: RpcClientTransport type - maxRequestLength: optional bytes to read - value: Json string to be written to transport - - Example: - defineRpcTransport(myServer): - write: - client.write(value) - read: - client.readLine(maxRequestLength) - close: - client.close - ]# - procClientName.expectKind nnkIdent +proc processMessages*[T](server: RpcServer[T], line: string): Future[string] {.async, gcsafe.} = var - writeCode = quote do: - transport.write(value) - readCode = quote do: - transport.readLine(defaultMaxRequestLength) - closeCode = quote do: - transport.close - afterReadCode = newStmtList() - errorCode = newStmtList() + node: JsonNode + # parse json node and/or flag missing fields and errors + jsonErrorState = checkJsonState(line, node) - if body != nil: - body.expectKind nnkStmtList - for item in body: - item.expectKind nnkCall - item[0].expectKind nnkIdent - item[1].expectKind nnkStmtList + if jsonErrorState.isSome: + let errState = jsonErrorState.get + var id = + if errState.err == rjeInvalidJson or errState.err == rjeNoId: + newJNull() + else: + node["id"] + let errMsg = jsonErrorMessages[errState.err] + # return error state as json + result = $wrapError( + code = errMsg[0], + msg = errMsg[1], + id = id) + else: + let + methodName = node["method"].str + id = node["id"] + var callRes: Future[JsonNode] + + if server.router.ifRoute(node, callRes): + let res = await callRes + result = $wrapReply(id, res, newJNull()) + else: let - verb = $item[0] - code = item[1] - - case verb.toLowerAscii - of "write": - # `transport`, the client transport - # `value`, the data returned from the invoked RPC - # Note: Update `value` so it's length can be sent afterwards - writeCode = code - of "read": - # `transport`, the client transport - # `maxRequestLength`, set to defaultMaxRequestLength - # Note: Result of expression is awaited - readCode = code - of "close": - # `transport`, the client transport - # `value`, which contains the data read by `readCode` - closeCode = code - of "afterread": - # `transport`, the client transport - # `value`, which contains the data read by `readCode` - afterReadCode = code - of "error": - # `transport`, the client transport - # `value`, the data returned from the invoked RPC - # Note: Update `value` so it's length can be sent afterwards - errorCode = code - else: error("Unknown RPC verb \"" & verb & "\"") - - result = newStmtList() - - let - sendErr = newIdentNode($procClientName & "SendError") - procMsgs = newIdentNode($procClientName & "ProcessMessages") - result.add(genErrorSending(sendErr, writeCode, errorCode)) - result.add(genProcessMessages(procMsgs, sendErr, writeCode)) - result.add(genProcessClient(procClientName, procMsgs, sendErr, readCode, afterReadCode, closeCode)) - - when defined(nimDumpRpcs): - echo "defineServer:\n", result.repr + methodNotFound = %(methodName & " is not a registered method.") + error = wrapError(METHOD_NOT_FOUND, "Method not found", id, methodNotFound) + result = $wrapReply(id, newJNull(), error) proc start*(server: RpcServer) = ## Start the RPC server. @@ -281,201 +149,14 @@ proc close*(server: RpcServer) = for item in server.servers: item.close() -# Server registration and RPC generation +# Server registration proc register*(server: RpcServer, name: string, rpc: RpcProc) = ## Add a name/code pair to the RPC server. - server.procs[name] = rpc + server.router.addRoute(name, rpc) proc unRegisterAll*(server: RpcServer) = # Remove all remote procedure calls from this server. - server.procs.clear - -proc makeProcName(s: string): string = - result = "" - for c in s: - if c.isAlphaNumeric: result.add c - -proc hasReturnType(params: NimNode): bool = - if params != nil and params.len > 0 and params[0] != nil and - params[0].kind != nnkEmpty: - result = true - -macro rpc*(server: RpcServer, path: string, body: untyped): untyped = - ## Define a remote procedure call. - ## Input and return parameters are defined using the ``do`` notation. - ## For example: - ## .. code-block:: nim - ## myServer.rpc("path") do(param1: int, param2: float) -> string: - ## result = $param1 & " " & $param2 - ## ``` - ## Input parameters are automatically marshalled from json to Nim types, - ## and output parameters are automatically marshalled to json for transport. - result = newStmtList() - let - parameters = body.findChild(it.kind == nnkFormalParams) - # all remote calls have a single parameter: `params: JsonNode` - paramsIdent = newIdentNode"params" - # procs are generated from the stripped path - pathStr = $path - # strip non alphanumeric - procNameStr = pathStr.makeProcName - # public rpc proc - procName = newIdentNode(procNameStr) - # when parameters present: proc that contains our rpc body - doMain = newIdentNode(procNameStr & "DoMain") - # async result - res = newIdentNode("result") - var - setup = jsonToNim(parameters, paramsIdent) - procBody = if body.kind == nnkStmtList: body else: body.body - errTrappedBody = quote do: - try: - `procBody` - except: - debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg() - if parameters.hasReturnType: - let returnType = parameters[0] - - # delegate async proc allows return and setting of result as native type - result.add(quote do: - proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = - `setup` - `errTrappedBody` - ) - - if returnType == ident"JsonNode": - # `JsonNode` results don't need conversion - result.add( quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = await `doMain`(`paramsIdent`) - ) - else: - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `res` = %await `doMain`(`paramsIdent`) - ) - else: - # no return types, inline contents - result.add(quote do: - proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = - `setup` - `errTrappedBody` - ) - result.add( quote do: - `server`.register(`path`, `procName`) - ) - - when defined(nimDumpRpcs): - echo "\n", pathStr, ": ", result.repr - -# Utility functions for setting up servers using stream transport addresses - -# Create a default transport that's suitable for createStreamServer -defineRpcServerTransport(processStreamClient) - -proc addStreamServer*[S](server: RpcServer[S], address: TransportAddress, callBack: StreamCallback = processStreamClient) = - #makeProcessClient(processClient, StreamTransport) - try: - info "Creating server on ", address = $address - var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server) - server.servers.add(transportServer) - except: - error "Failed to create server", address = $address, message = getCurrentExceptionMsg() - - if len(server.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, "Unable to create server!") - -proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], callBack: StreamCallback = processStreamClient) = - for item in addresses: - server.addStreamServer(item, callBack) - -proc addStreamServer*[T: RpcServer](server: T, address: string, callBack: StreamCallback = processStreamClient) = - ## Create new server and assign it to addresses ``addresses``. - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - added = 0 - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, IpAddressFamily.IPv6) - except: - discard - - for r in tas4: - server.addStreamServer(r, callBack) - added.inc - for r in tas6: - server.addStreamServer(r, callBack) - added.inc - - if added == 0: - # Addresses could not be resolved, critical error. - raise newException(RpcAddressUnresolvableError, "Unable to get address!") - -proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], callBack: StreamCallback = processStreamClient) = - for address in addresses: - server.addStreamServer(address, callBack) - -proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, callBack: StreamCallback = processStreamClient) = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - added = 0 - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) - except: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) - except: - discard - - if len(tas4) == 0 and len(tas6) == 0: - # Address was not resolved, critical error. - raise newException(RpcAddressUnresolvableError, - "Address " & address & " could not be resolved!") - - for r in tas4: - server.addStreamServer(r, callBack) - added.inc - for r in tas6: - server.addStreamServer(r, callBack) - added.inc - - if len(server.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, - "Could not setup server on " & address & ":" & $int(port)) - -type RpcStreamServer* = RpcServer[StreamServer] - -proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcServer[StreamServer]() - result.addStreamServers(addresses) - -proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcServer[StreamServer]() - result.addStreamServers(addresses) - -proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer = - # Create server on specified port - result = newRpcServer[StreamServer]() - result.addStreamServer(address, port) + server.router.clear -# TODO: Allow cross checking between client signatures and server calls diff --git a/json_rpc/sockettransport.nim b/json_rpc/sockettransport.nim new file mode 100644 index 0000000..948e554 --- /dev/null +++ b/json_rpc/sockettransport.nim @@ -0,0 +1,142 @@ +import server, json, chronicles + +proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()) {.async.} = + ## Send error message to client + let error = wrapError(code, msg, id, data) + var value = wrapReply(id, newJNull(), error) + result = transport.write(value) + +proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} = + ## Process transport data to the RPC server + var rpc = getUserData[RpcServer[StreamTransport]](server) + while true: + var + maxRequestLength = defaultMaxRequestLength + value = await transport.readLine(defaultMaxRequestLength) + if value == "": + transport.close + break + + debug "Processing message", address = transport.remoteAddress(), line = value + + let future = processMessages(rpc, value) + yield future + if future.failed: + if future.readError of RpcProcError: + let err = future.readError.RpcProcError + await transport.sendError(err.code, err.msg, err.data) + elif future.readError of ValueError: + let err = future.readError[].ValueError + await transport.sendError(INVALID_PARAMS, err.msg, %"") + else: + await transport.sendError(SERVER_ERROR, + "Error: Unknown error occurred", %"") + else: + let res = await future + result = transport.write(res) + +# Utility functions for setting up servers using stream transport addresses + +proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress, callBack: StreamCallback) = + try: + info "Creating server on ", address = $address + var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server) + server.servers.add(transportServer) + except: + error "Failed to create server", address = $address, message = getCurrentExceptionMsg() + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, "Unable to create server!") + +proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[TransportAddress], callBack: StreamCallback) = + for item in addresses: + server.addStreamServer(item, callBack) + +proc addStreamServer*(server: RpcServer[StreamServer], address: string, callBack: StreamCallback) = + ## Create new server and assign it to addresses ``addresses``. + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, IpAddressFamily.IPv6) + except: + discard + + for r in tas4: + server.addStreamServer(r, callBack) + added.inc + for r in tas6: + server.addStreamServer(r, callBack) + added.inc + + if added == 0: + # Addresses could not be resolved, critical error. + raise newException(RpcAddressUnresolvableError, "Unable to get address!") + +proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[string], callBack: StreamCallback) = + for address in addresses: + server.addStreamServer(address, callBack) + +proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Port, callBack: StreamCallback) = + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) + except: + discard + + if len(tas4) == 0 and len(tas6) == 0: + # Address was not resolved, critical error. + raise newException(RpcAddressUnresolvableError, + "Address " & address & " could not be resolved!") + + for r in tas4: + server.addStreamServer(r, callBack) + added.inc + for r in tas6: + server.addStreamServer(r, callBack) + added.inc + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, + "Could not setup server on " & address & ":" & $int(port)) + +type RpcStreamServer* = RpcServer[StreamServer] + +proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses, processClient) + +proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses, processClient) + +proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer = + # Create server on specified port + result = newRpcServer[StreamServer]() + result.addStreamServer(address, port, processClient) + diff --git a/rpchttpservers.nim b/rpchttpservers.nim deleted file mode 100644 index 4b308a1..0000000 --- a/rpchttpservers.nim +++ /dev/null @@ -1,60 +0,0 @@ -import rpcserver, rpcclient, tables, chronicles, strformat, strutils -export rpcserver, rpcclient - -proc extractJsonStr(msgSource: string, value: string): string = - result = "" - let p1 = find(value, '{') - if p1 > -1: - let p2 = rFind(value, '}') - if p2 == -1: - info "Cannot find json end brace", source = msgSource, msg = value - else: - result = value[p1 .. p2] - debug "Extracted json", source = msgSource, json = result - else: - info "Cannot find json start brace", source = msgSource, msg = value - -type - RpcHttpServer* = RpcServer[StreamServer] - -defineRpcServerTransport(httpProcessClient): - write: - const contentType = "Content-Type: application/json-rpc" - value = &"Host: {$transport.localAddress} {contentType} Content-Length: {$value.len} {value}" - debug "HTTP server: write", msg = value - transport.write(value) - afterRead: - debug "HTTP server: read", msg = value - value = "HTTP Server".extractJsonStr(value) - -proc newRpcHttpServer*(addresses: openarray[TransportAddress]): RpcHttpServer = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcServer[StreamServer]() - result.addStreamServers(addresses, httpProcessClient) - -proc newRpcHttpServer*(addresses: openarray[string]): RpcHttpServer = - ## Create new server and assign it to addresses ``addresses``. - result = newRpcServer[StreamServer]() - result.addStreamServers(addresses, httpProcessClient) - -proc newRpcHttpServer*(address = "localhost", port: Port = Port(8545)): RpcHttpServer = - result = newRpcServer[StreamServer]() - result.addStreamServer(address, port, httpProcessClient) - -type RpcHttpClient* = RpcClient[StreamTransport, TransportAddress] - -defineRpcClientTransport(StreamTransport, TransportAddress, "http"): - write: - const contentType = "Content-Type: application/json-rpc" - value = &"Host: {$client.transp.localAddress} {contentType} Content-Length: {$value.len} {value}" - debug "HTTP client: write", msg = value - client.transp.write(value) - afterRead: - # Strip out http header - # TODO: Performance - debug "HTTP client: read", msg = value - value = "HTTP Client".extractJsonStr(value) - -proc newRpcHttpClient*(): RpcHttpClient = - result = newRpcClient[StreamTransport, TransportAddress]() - diff --git a/rpcsockets.nim b/rpcsockets.nim new file mode 100644 index 0000000..06ee707 --- /dev/null +++ b/rpcsockets.nim @@ -0,0 +1,2 @@ +import json_rpc / [server, sockettransport] +export server, sockettransport diff --git a/tests/all.nim b/tests/all.nim index 69a109c..ae9eb62 100644 --- a/tests/all.nim +++ b/tests/all.nim @@ -1,3 +1,3 @@ import - testrpcmacro, testserverclient, testethcalls, testhttp #, testerrors + testrpcmacro, testserverclient, testethcalls #, testerrors diff --git a/tests/debugclient.nim b/tests/debugclient.nim index 36b2b21..15c5f90 100644 --- a/tests/debugclient.nim +++ b/tests/debugclient.nim @@ -9,7 +9,7 @@ proc rawCall*(self: RpcClient, name: string, self.nextId.inc var s = msg & "\c\l" - let res = await self.transp.write(s) + let res = await self.transport.write(s) assert res == len(s) # completed by processMessage. diff --git a/tests/testerrors.nim b/tests/testerrors.nim index 9f44a80..1304cf6 100644 --- a/tests/testerrors.nim +++ b/tests/testerrors.nim @@ -3,7 +3,7 @@ allow unchecked and unformatted calls. ]# -import unittest, debugclient, ../rpcserver +import unittest, debugclient, ../rpcsockets import strformat, chronicles var server = newRpcStreamServer("localhost", 8547.Port) diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 538a32f..ec23e46 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -1,5 +1,5 @@ import unittest, json, tables -import ../rpcclient, ../rpcserver +import ../rpcclient, ../rpcsockets import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles from os import getCurrentDir, DirSep diff --git a/tests/testhttp.nim b/tests/testhttp.nim deleted file mode 100644 index 0f59837..0000000 --- a/tests/testhttp.nim +++ /dev/null @@ -1,20 +0,0 @@ -import unittest, json, chronicles, unittest -import ../rpchttpservers - -var srv = newRpcHttpServer(["localhost:8545"]) -var client = newRpcHttpClient() - -# Create RPC on server -srv.rpc("myProc") do(input: string, data: array[0..3, int]): - result = %("Hello " & input & " data: " & $data) - -srv.start() -waitFor client.httpConnect("localhost", Port(8545)) - -suite "HTTP RPC transport": - test "Call": - var r = waitFor client.httpcall("myProc", %[%"abc", %[1, 2, 3, 4]]) - check r.error == false and r.result == %"Hello abc data: [1, 2, 3, 4]" - -srv.stop() -srv.close() diff --git a/tests/testrpcmacro.nim b/tests/testrpcmacro.nim index 89573e9..d3cb07c 100644 --- a/tests/testrpcmacro.nim +++ b/tests/testrpcmacro.nim @@ -1,5 +1,5 @@ import unittest, json, tables, chronicles -import ../rpcserver +import ../rpcsockets type # some nested types to check object parsing @@ -67,14 +67,14 @@ s.rpc("rpc.testreturns") do() -> int: suite "Server types": test "On macro registration": - check s.procs.hasKey("rpc.simplepath") - check s.procs.hasKey("rpc.differentparams") - check s.procs.hasKey("rpc.arrayparam") - check s.procs.hasKey("rpc.seqparam") - check s.procs.hasKey("rpc.objparam") - check s.procs.hasKey("rpc.returntypesimple") - check s.procs.hasKey("rpc.returntypecomplex") - check s.procs.hasKey("rpc.testreturns") + check s.hasMethod("rpc.simplepath") + check s.hasMethod("rpc.differentparams") + check s.hasMethod("rpc.arrayparam") + check s.hasMethod("rpc.seqparam") + check s.hasMethod("rpc.objparam") + check s.hasMethod("rpc.returntypesimple") + check s.hasMethod("rpc.returntypecomplex") + check s.hasMethod("rpc.testreturns") test "Simple paths": let r = waitFor rpcSimplePath(%[]) diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index d48f9cb..bb2a0f7 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -1,5 +1,5 @@ import unittest, json, chronicles -import ../rpcclient, ../rpcserver +import ../rpcclient, ../rpcsockets var srv = newRpcStreamServer(["localhost:8545"]) var client = newRpcStreamClient()