diff --git a/eth-rpc/client.nim b/eth-rpc/client.nim index 5d42642..581cba5 100644 --- a/eth-rpc/client.nim +++ b/eth-rpc/client.nim @@ -1,30 +1,29 @@ -import asyncnet, asyncdispatch, tables, json, macros +import tables, json, macros +import asyncdispatch2 import jsonmarshal type RpcClient* = ref object - socket: AsyncSocket + transp: StreamTransport awaiting: Table[string, Future[Response]] - address: string - port: Port + address: TransportAddress nextId: int64 Response* = tuple[error: bool, result: JsonNode] - proc newRpcClient*(): RpcClient = - ## Creates a new ``RpcClient`` instance. - RpcClient( - socket: newAsyncSocket(), - awaiting: initTable[string, Future[Response]](), - nextId: 1 - ) + ## Creates a new ``RpcClient`` instance. + result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1) -proc call*(self: RpcClient, name: string, params: JsonNode): Future[Response] {.async.} = +proc call*(self: RpcClient, name: string, + params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = $self.nextId self.nextId.inc - let msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, "id": %id} & "\c\l" - await self.socket.send(msg) + let msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, + "id": %id} & "\c\l" + let res = await self.transp.write(msg) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(msg)) # completed by processMessage. var newFut = newFuture[Response]() @@ -32,12 +31,17 @@ proc call*(self: RpcClient, name: string, params: JsonNode): Future[Response] {. self.awaiting[id] = newFut result = await newFut -macro checkGet(node: JsonNode, fieldName: string, jKind: static[JsonNodeKind]): untyped = +macro checkGet(node: JsonNode, fieldName: string, + jKind: static[JsonNodeKind]): untyped = let n = genSym(ident = "n") #`node`{`fieldName`} result = quote: let `n` = `node`{`fieldname`} - if `n`.isNil: raise newException(ValueError, "Message is missing required field \"" & `fieldName` & "\"") - if `n`.kind != `jKind`.JsonNodeKind: raise newException(ValueError, "Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`node`[`fieldName`].kind) + if `n`.isNil: + raise newException(ValueError, + "Message is missing required field \"" & `fieldName` & "\"") + if `n`.kind != `jKind`.JsonNodeKind: + raise newException(ValueError, + "Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`node`[`fieldName`].kind) case jKind of JBool: result.add(quote do: `node`[`fieldName`].getBool) of JInt: result.add(quote do: `node`[`fieldName`].getInt) @@ -48,13 +52,17 @@ macro checkGet(node: JsonNode, fieldName: string, jKind: static[JsonNodeKind]): proc processMessage(self: RpcClient, line: string) = let node = parseJson(line) - + # TODO: Use more appropriate exception objects let version = checkGet(node, "jsonrpc", JString) - if version != "2.0": raise newException(ValueError, "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") + if version != "2.0": + raise newException(ValueError, + "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") let id = checkGet(node, "id", JString) - if not self.awaiting.hasKey(id): raise newException(ValueError, "Cannot find message id \"" & node["id"].str & "\"") + if not self.awaiting.hasKey(id): + raise newException(ValueError, + "Cannot find message id \"" & node["id"].str & "\"") let errorNode = node{"error"} if errorNode.isNil or errorNode.kind == JNull: @@ -72,22 +80,25 @@ proc connect*(self: RpcClient, address: string, port: Port): Future[void] proc processData(self: RpcClient) {.async.} = while true: # read until no data - let line = await self.socket.recvLine() - + # TODO: we need to limit number of bytes we going to read, without any + # limits server can fill all memory it can here. + let line = await self.transp.readLine() if line == "": # transmission ends - self.socket.close() # TODO: Do we need to drop/reacquire sockets? - self.socket = newAsyncSocket() + self.transp.close() break - + processMessage(self, line) # async loop reconnection and waiting - await connect(self, self.address, self.port) + self.transp = await connect(self.address) proc connect*(self: RpcClient, address: string, port: Port) {.async.} = - await self.socket.connect(address, port) - self.address = address - self.port = port + # TODO: `address` hostname can be resolved to many IP addresses, we are using + # first one, but maybe it would be better to iterate over all IP addresses + # and try to establish connection until it will not be established. + let addresses = resolveTAddress(address, port) + self.transp = await connect(addresses[0]) + self.address = addresses[0] asyncCheck processData(self) proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = @@ -95,9 +106,12 @@ proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = var paramList = newSeq[NimNode]() for p in parameters: paramList.add(p) - result = newProc(procName, paramList, callBody) # build proc - result.addPragma ident"async" # make proc async - result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName)) # export this proc + # build proc + result = newProc(procName, paramList, callBody) + # make proc async + result.addPragma ident"async" + # export this proc + result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName)) proc toJsonArray(parameters: NimNode): NimNode = # outputs an array of jsonified parameters @@ -117,7 +131,7 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = let iJsonNode = newIdentNode("JsonNode") var parameters = rpcDecl.findChild(it.kind == nnkFormalParams).copy - # ensure we have at least space for a return parameter + # ensure we have at least space for a return parameter if parameters.isNil or parameters.kind == nnkEmpty or parameters.len == 0: parameters = nnkFormalParams.newTree(iJsonNode) @@ -131,13 +145,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = customReturnType = returnType != iJsonNode # insert rpc client as first parameter - parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", newEmptyNode())) + parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", + newEmptyNode())) let - jsonParamIdent = genSym(nskVar, "jsonParam") # variable used to send json to the server - jsonParamArray = parameters.toJsonArray() # json array of marshalled parameters + # variable used to send json to the server + jsonParamIdent = genSym(nskVar, "jsonParam") + # json array of marshalled parameters + jsonParamArray = parameters.toJsonArray() var - # populate json params - even rpcs with no parameters have an empty json array node sent + # populate json params - even rpcs with no parameters have an empty json + # array node sent callBody = newStmtList().add(quote do: var `jsonParamIdent` = `jsonParamArray` ) @@ -148,15 +166,18 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode = result = createRpcProc(procName, parameters, callBody) let - rpcResult = genSym(nskLet, "res") # temporary variable to hold `Response` from rpc call + # temporary variable to hold `Response` from rpc call + rpcResult = genSym(nskLet, "res") clientIdent = newIdentNode("client") - procRes = ident"result" # proc return variable - jsonRpcResult = # actual return value, `rpcResult`.result - nnkDotExpr.newTree(rpcResult, newIdentNode("result")) - + # proc return variable + procRes = ident"result" + # actual return value, `rpcResult`.result + jsonRpcResult = nnkDotExpr.newTree(rpcResult, newIdentNode("result")) + # perform rpc call callBody.add(quote do: - let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`) # `rpcResult` is of type `Response` + # `rpcResult` is of type `Response` + let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`) if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result) ) diff --git a/eth-rpc/server.nim b/eth-rpc/server.nim index a5079b3..00ef4b8 100644 --- a/eth-rpc/server.nim +++ b/eth-rpc/server.nim @@ -1,5 +1,8 @@ -import asyncdispatch, asyncnet, json, tables, strutils, options, jsonmarshal, macros -export asyncdispatch, asyncnet, json, jsonmarshal +import json, tables, strutils, options, macros +import asyncdispatch2 +import jsonmarshal + +export asyncdispatch2, json, jsonmarshal type RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId @@ -10,9 +13,7 @@ type RpcProc* = proc (params: JsonNode): Future[JsonNode] RpcServer* = ref object - socket*: AsyncSocket - port*: Port - address*: string + servers*: seq[StreamServer] procs*: TableRef[string, RpcProc] RpcProcError* = ref object of Exception @@ -35,19 +36,12 @@ const (INVALID_REQUEST, "No id specified") ] -template ifDebug*(actions: untyped): untyped = - # TODO: Replace with nim-chronicle - when not defined(release): actions else: discard - -proc `$`*(port: Port): string = $int(port) - -proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer = - result = RpcServer( - socket: newAsyncSocket(), - port: port, - address: address, - procs: newTable[string, RpcProc]() - ) +when not defined(release): + template ifDebug*(actions: untyped): untyped = + actions +else: + template ifDebug*(actions: untyped): untyped = + discard # Json state checking @@ -61,7 +55,8 @@ template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = msg = getCurrentExceptionMsg() (valid, msg) -proc checkJsonErrors*(line: string, node: var JsonNode): Option[RpcJsonErrorContainer] = +proc checkJsonErrors*(line: string, + node: var JsonNode): Option[RpcJsonErrorContainer] = ## Tries parsing line into node, if successful checks required fields ## Returns: error state or none let res = jsonValid(line, node) @@ -79,30 +74,31 @@ proc checkJsonErrors*(line: string, node: var JsonNode): Option[RpcJsonErrorCont proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} - return $node & "\c\l" + return $node & "\c\l" -proc sendError*(client: AsyncSocket, code: int, msg: string, id: JsonNode, data: JsonNode = newJNull()) {.async.} = +proc sendError*(client: StreamTransport, code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()) {.async.} = ## Send error message to client let error = %{"code": %(code), "message": %msg, "data": data} ifDebug: echo "Send error json: ", wrapReply(newJNull(), error, id) - result = client.send(wrapReply(id, newJNull(), error)) + result = client.write(wrapReply(id, newJNull(), error)) -proc sendJsonError*(state: RpcJsonError, client: AsyncSocket, id: JsonNode, data = newJNull()) {.async.} = +proc sendJsonError*(state: RpcJsonError, client: StreamTransport, id: JsonNode, + data = newJNull()) {.async.} = ## Send client response for invalid json state let errMsgs = jsonErrorMessages[state] await client.sendError(errMsgs[0], errMsgs[1], id, data) # Server message processing - -proc processMessage(server: RpcServer, client: AsyncSocket, line: string) {.async.} = +proc processMessage(server: RpcServer, client: StreamTransport, + line: string) {.async.} = var node: JsonNode - jsonErrorState = checkJsonErrors(line, node) # set up node and/or flag errors + # set up node and/or flag errors + jsonErrorState = checkJsonErrors(line, node) if jsonErrorState.isSome: let errState = jsonErrorState.get - var id: JsonNode - if errState.err == rjeInvalidJson: id = newJNull() # id cannot be retrieved - else: id = node["id"] + var id = if errState.err == rjeInvalidJson: newJNull() else: node["id"] await errState.err.sendJsonError(client, id, %errState.msg) else: let @@ -110,23 +106,26 @@ proc processMessage(server: RpcServer, client: AsyncSocket, line: string) {.asyn id = node["id"] if not server.procs.hasKey(methodName): - await client.sendError(METHOD_NOT_FOUND, "Method not found", id, %(methodName & " is not a registered method.")) + await client.sendError(METHOD_NOT_FOUND, "Method not found", id, + %(methodName & " is not a registered method.")) else: let callRes = await server.procs[methodName](node["params"]) - await client.send(wrapReply(id, callRes, newJNull())) + discard await client.write(wrapReply(id, callRes, newJNull())) -proc processClient(server: RpcServer, client: AsyncSocket) {.async.} = +proc processClient(server: StreamServer, client: StreamTransport) {.async.} = + var rpc = getUserData[RpcServer](server) while true: - let line = await client.recvLine() + ## TODO: We need to put limit here, or server could be easily put out of + ## service without never-ending line (data without CRLF). + let line = await client.readLine() if line == "": - # Disconnected. client.close() break - ifDebug: echo "Process client: ", server.port, ":" & line + ifDebug: echo "Process client: ", client.remoteAddress() - let future = processMessage(server, client, line) - await future + let future = processMessage(rpc, client, line) + yield future if future.failed: if future.readError of RpcProcError: let err = future.readError.RpcProcError @@ -135,16 +134,124 @@ proc processClient(server: RpcServer, client: AsyncSocket) {.async.} = let err = future.readError[].ValueError await client.sendError(INVALID_PARAMS, err.msg, %"") else: - await client.sendError(SERVER_ERROR, "Error: Unknown error occurred", %"") + await client.sendError(SERVER_ERROR, + "Error: Unknown error occurred", %"") -proc serve*(server: RpcServer) {.async.} = +proc newRpcServer*(addresses: openarray[TransportAddress]): RpcServer = + ## Create new server and assign it to addresses ``addresses``. + result = RpcServer() + result.procs = newTable[string, RpcProc]() + result.servers = newSeq[StreamServer]() + + for item in addresses: + try: + ifDebug: echo "Create server on " & $item + var server = createStreamServer(item, processClient, {ReuseAddr}, + udata = result) + result.servers.add(server) + except: + ifDebug: echo "Failed to create server on " & $item + + if len(result.servers) == 0: + # Server was not bound, critical error. + # TODO: Custom RpcException error + raise newException(ValueError, "Unable to create server!") + +proc newRpcServer*(addresses: openarray[string]): RpcServer = + ## Create new server and assign it to addresses ``addresses``. + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + baddrs: seq[TransportAddress] + + for a in addresses: + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(a, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(a, IpAddressFamily.IPv6) + except: + discard + + for r in tas4: + baddrs.add(r) + for r in tas6: + baddrs.add(r) + + if len(baddrs) == 0: + # Addresses could not be resolved, critical error. + raise newException(ValueError, "Unable to get address!") + + result = newRpcServer(baddrs) + +proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer = + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) + except: + discard + + if len(tas4) == 0 and len(tas6) == 0: + # Address was not resolved, critical error. + # TODO: Custom RpcException error. + raise newException(ValueError, + "Address " & address & " could not be resolved!") + + result = RpcServer() + result.procs = newTable[string, RpcProc]() + result.servers = newSeq[StreamServer]() + for item in tas4: + try: + ifDebug: echo "Create server on " & $item + var server = createStreamServer(item, processClient, {ReuseAddr}, + udata = result) + result.servers.add(server) + except: + ifDebug: echo "Failed to create server on " & $item + + for item in tas6: + try: + ifDebug: echo "Create server on " & $item + var server = createStreamServer(item, processClient, {ReuseAddr}, + udata = result) + result.servers.add(server) + except: + ifDebug: echo "Failed to create server on " & $item + + if len(result.servers) == 0: + # Server was not bound, critical error. + # TODO: Custom RpcException error + raise newException(ValueError, + "Could not setup server on " & address & ":" & $int(port)) + +proc start*(server: RpcServer) = ## Start the RPC server. - server.socket.bindAddr(server.port, server.address) - server.socket.listen() + for item in server.servers: + item.start() - while true: - let client = await server.socket.accept() - asyncCheck server.processClient(client) +proc stop*(server: RpcServer) = + ## Stop the RPC server. + for item in server.servers: + item.stop() + +proc close*(server: RpcServer) = + ## Cleanup resources of RPC server. + for item in server.servers: + item.close() # Server registration and RPC generation @@ -162,7 +269,8 @@ proc makeProcName(s: string): string = if c.isAlphaNumeric: result.add c proc hasReturnType(params: NimNode): bool = - if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty: + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: result = true macro rpc*(server: RpcServer, path: string, body: untyped): untyped = @@ -178,12 +286,18 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = result = newStmtList() let parameters = body.findChild(it.kind == nnkFormalParams) - paramsIdent = newIdentNode"params" # all remote calls have a single parameter: `params: JsonNode` - pathStr = $path # procs are generated from the stripped path - procNameStr = pathStr.makeProcName # strip non alphanumeric - procName = newIdentNode(procNameStr) # public rpc proc - doMain = newIdentNode(procNameStr & "DoMain") # when parameters present: proc that contains our rpc body - res = newIdentNode("result") # async result + # all remote calls have a single parameter: `params: JsonNode` + paramsIdent = newIdentNode"params" + # procs are generated from the stripped path + pathStr = $path + # strip non alphanumeric + procNameStr = pathStr.makeProcName + # public rpc proc + procName = newIdentNode(procNameStr) + # when parameters present: proc that contains our rpc body + doMain = newIdentNode(procNameStr & "DoMain") + # async result + res = newIdentNode("result") var setup = jsonToNim(parameters, paramsIdent) procBody = if body.kind == nnkStmtList: body else: body.body diff --git a/eth_rpc.nimble b/eth_rpc.nimble index 71a35dd..0db493a 100644 --- a/eth_rpc.nimble +++ b/eth_rpc.nimble @@ -8,7 +8,8 @@ skipDirs = @["tests"] ### Dependencies requires "nim >= 0.17.3", "nimcrypto", - "stint" + "stint", + "https://github.com/status-im/nim-asyncdispatch2" proc configForTests() = --hints: off diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 4c76b39..13d54a1 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -1,5 +1,5 @@ -import ../ rpcclient, ../ rpcserver -import unittest, asyncdispatch, json, tables +import unittest, json, tables +import ../rpcclient, ../rpcserver import stint, ethtypes, ethprocs, stintjson from os import getCurrentDir, DirSep @@ -7,7 +7,7 @@ from strutils import rsplit template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] var - server = newRpcServer() + server = newRpcServer("localhost", Port(8546)) client = newRpcClient() ## Generate Ethereum server RPCs @@ -48,9 +48,7 @@ proc testSigCalls: Future[seq[string]] = sha3 = client.web3_sha3("0x68656c6c6f20776f726c64") result = all(version, sha3) -server.address = "localhost" -server.port = Port(8546) -asyncCheck server.serve +server.start() waitFor client.connect("localhost", Port(8546)) @@ -74,3 +72,6 @@ suite "Generated from signatures": check sigResults[0] == "Nimbus-RPC-Test" test "SHA3": check sigResults[1] == "0x47173285A8D7341E5E972FC677286384F802F8EF42A5EC5F03BBFA254CB01FAD" + +server.stop() +server.close() diff --git a/tests/testrpcmacro.nim b/tests/testrpcmacro.nim index 4317fc3..6d39c78 100644 --- a/tests/testrpcmacro.nim +++ b/tests/testrpcmacro.nim @@ -1,4 +1,5 @@ -import unittest, ../ rpcserver, asyncdispatch, json, tables +import unittest, json, tables +import ../rpcserver type # some nested types to check object parsing @@ -26,14 +27,14 @@ let }, "c": %1.23} -var s = newRpcServer("localhost") +var s = newRpcServer(["localhost:8545"]) # RPC definitions -s.rpc("rpc.simplepath"): +s.rpc("rpc.simplepath"): result = %1 -s.rpc("rpc.differentparams") do(a: int, b: string): +s.rpc("rpc.differentparams") do(a: int, b: string): result = %[%a, %b] s.rpc("rpc.arrayparam") do(arr: array[0..5, byte], b: string): @@ -131,3 +132,6 @@ suite "Server types": # wrong param type let res = waitFor rpcDifferentParams(%[%"abc", %1]) # TODO: When errors are proper return values, check error for param name + +s.stop() +s.close() diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index 3dd30be..4706732 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -1,20 +1,21 @@ -import ../ rpcclient, ../ rpcserver import unittest, json +import ../rpcclient, ../rpcserver -var srv = newRpcServer() -srv.address = "localhost" -srv.port = Port(8545) +var srv = newRpcServer(["localhost:8545"]) var client = newRpcClient() # Create RPC on server srv.rpc("myProc") do(input: string, data: array[0..3, int]): result = %("Hello " & input & " data: " & $data) -asyncCheck srv.serve +srv.start() waitFor client.connect("localhost", Port(8545)) # TODO: When an error occurs during a test, stop the server suite "Server/Client RPC": test "Custom RPC": var r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]]) - check r.result.getStr == "Hello abc data: [1, 2, 3, 4]" \ No newline at end of file + check r.result.getStr == "Hello abc data: [1, 2, 3, 4]" + +srv.stop() +srv.close()