Use asyncdispatch2 instead of asyncdispatch.

This commit is contained in:
cheatfate 2018-06-07 10:02:14 +03:00
parent cc44cd257b
commit cbfe945fff
6 changed files with 150 additions and 106 deletions

View File

@ -1,30 +1,28 @@
import asyncnet, asyncdispatch, tables, json, macros import tables, json, macros
import asyncdispatch2
import jsonmarshal import jsonmarshal
type type
RpcClient* = ref object RpcClient* = ref object
socket: AsyncSocket transp: StreamTransport
awaiting: Table[string, Future[Response]] awaiting: Table[string, Future[Response]]
address: string address: TransportAddress
port: Port
nextId: int64 nextId: int64
Response* = tuple[error: bool, result: JsonNode] Response* = tuple[error: bool, result: JsonNode]
proc newRpcClient*(): RpcClient = proc newRpcClient*(): RpcClient =
## Creates a new ``RpcClient`` instance. ## Creates a new ``RpcClient`` instance.
RpcClient( result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1)
socket: newAsyncSocket(),
awaiting: initTable[string, Future[Response]](),
nextId: 1
)
proc call*(self: RpcClient, name: string, params: JsonNode): Future[Response] {.async.} = proc call*(self: RpcClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method. ## Remotely calls the specified RPC method.
let id = $self.nextId let id = $self.nextId
self.nextId.inc self.nextId.inc
let msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, "id": %id} & "\c\l" let msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params,
await self.socket.send(msg) "id": %id} & "\c\l"
let res = await self.transp.write(msg)
assert(res == len(msg))
# completed by processMessage. # completed by processMessage.
var newFut = newFuture[Response]() var newFut = newFuture[Response]()
@ -32,12 +30,17 @@ proc call*(self: RpcClient, name: string, params: JsonNode): Future[Response] {.
self.awaiting[id] = newFut self.awaiting[id] = newFut
result = await newFut result = await newFut
macro checkGet(node: JsonNode, fieldName: string, jKind: static[JsonNodeKind]): untyped = macro checkGet(node: JsonNode, fieldName: string,
jKind: static[JsonNodeKind]): untyped =
let n = genSym(ident = "n") #`node`{`fieldName`} let n = genSym(ident = "n") #`node`{`fieldName`}
result = quote: result = quote:
let `n` = `node`{`fieldname`} let `n` = `node`{`fieldname`}
if `n`.isNil: raise newException(ValueError, "Message is missing required field \"" & `fieldName` & "\"") if `n`.isNil:
if `n`.kind != `jKind`.JsonNodeKind: raise newException(ValueError, "Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`node`[`fieldName`].kind) raise newException(ValueError,
"Message is missing required field \"" & `fieldName` & "\"")
if `n`.kind != `jKind`.JsonNodeKind:
raise newException(ValueError,
"Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`node`[`fieldName`].kind)
case jKind case jKind
of JBool: result.add(quote do: `node`[`fieldName`].getBool) of JBool: result.add(quote do: `node`[`fieldName`].getBool)
of JInt: result.add(quote do: `node`[`fieldName`].getInt) of JInt: result.add(quote do: `node`[`fieldName`].getInt)
@ -51,10 +54,14 @@ proc processMessage(self: RpcClient, line: string) =
# TODO: Use more appropriate exception objects # TODO: Use more appropriate exception objects
let version = checkGet(node, "jsonrpc", JString) let version = checkGet(node, "jsonrpc", JString)
if version != "2.0": raise newException(ValueError, "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") if version != "2.0":
raise newException(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
let id = checkGet(node, "id", JString) let id = checkGet(node, "id", JString)
if not self.awaiting.hasKey(id): raise newException(ValueError, "Cannot find message id \"" & node["id"].str & "\"") if not self.awaiting.hasKey(id):
raise newException(ValueError,
"Cannot find message id \"" & node["id"].str & "\"")
let errorNode = node{"error"} let errorNode = node{"error"}
if errorNode.isNil or errorNode.kind == JNull: if errorNode.isNil or errorNode.kind == JNull:
@ -72,22 +79,25 @@ proc connect*(self: RpcClient, address: string, port: Port): Future[void]
proc processData(self: RpcClient) {.async.} = proc processData(self: RpcClient) {.async.} =
while true: while true:
# read until no data # read until no data
let line = await self.socket.recvLine() # TODO: we need to limit number of bytes we going to read, without any
# limits server can fill all memory it can here.
let line = await self.transp.readLine()
if line == "": if line == "":
# transmission ends # transmission ends
self.socket.close() # TODO: Do we need to drop/reacquire sockets? self.transp.close()
self.socket = newAsyncSocket()
break break
processMessage(self, line) processMessage(self, line)
# async loop reconnection and waiting # async loop reconnection and waiting
await connect(self, self.address, self.port) self.transp = await connect(self.address)
proc connect*(self: RpcClient, address: string, port: Port) {.async.} = proc connect*(self: RpcClient, address: string, port: Port) {.async.} =
await self.socket.connect(address, port) # TODO: `address` hostname can be resolved to many IP addresses, we are using
self.address = address # first one, but maybe it would be better to iterate over all IP addresses
self.port = port # and try to establish connection until it will not be established.
let addresses = resolveTAddress(address & ":" & $int(port))
self.transp = await connect(addresses[0])
self.address = addresses[0]
asyncCheck processData(self) asyncCheck processData(self)
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
@ -95,9 +105,12 @@ proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
var paramList = newSeq[NimNode]() var paramList = newSeq[NimNode]()
for p in parameters: paramList.add(p) for p in parameters: paramList.add(p)
result = newProc(procName, paramList, callBody) # build proc # build proc
result.addPragma ident"async" # make proc async result = newProc(procName, paramList, callBody)
result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName)) # export this proc # make proc async
result.addPragma ident"async"
# export this proc
result[0] = nnkPostFix.newTree(ident"*", newIdentNode($procName))
proc toJsonArray(parameters: NimNode): NimNode = proc toJsonArray(parameters: NimNode): NimNode =
# outputs an array of jsonified parameters # outputs an array of jsonified parameters
@ -131,13 +144,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
customReturnType = returnType != iJsonNode customReturnType = returnType != iJsonNode
# insert rpc client as first parameter # insert rpc client as first parameter
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient", newEmptyNode())) parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient",
newEmptyNode()))
let let
jsonParamIdent = genSym(nskVar, "jsonParam") # variable used to send json to the server # variable used to send json to the server
jsonParamArray = parameters.toJsonArray() # json array of marshalled parameters jsonParamIdent = genSym(nskVar, "jsonParam")
# json array of marshalled parameters
jsonParamArray = parameters.toJsonArray()
var var
# populate json params - even rpcs with no parameters have an empty json array node sent # populate json params - even rpcs with no parameters have an empty json
# array node sent
callBody = newStmtList().add(quote do: callBody = newStmtList().add(quote do:
var `jsonParamIdent` = `jsonParamArray` var `jsonParamIdent` = `jsonParamArray`
) )
@ -148,15 +165,18 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
result = createRpcProc(procName, parameters, callBody) result = createRpcProc(procName, parameters, callBody)
let let
rpcResult = genSym(nskLet, "res") # temporary variable to hold `Response` from rpc call # temporary variable to hold `Response` from rpc call
rpcResult = genSym(nskLet, "res")
clientIdent = newIdentNode("client") clientIdent = newIdentNode("client")
procRes = ident"result" # proc return variable # proc return variable
jsonRpcResult = # actual return value, `rpcResult`.result procRes = ident"result"
nnkDotExpr.newTree(rpcResult, newIdentNode("result")) # actual return value, `rpcResult`.result
jsonRpcResult = nnkDotExpr.newTree(rpcResult, newIdentNode("result"))
# perform rpc call # perform rpc call
callBody.add(quote do: callBody.add(quote do:
let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`) # `rpcResult` is of type `Response` # `rpcResult` is of type `Response`
let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`)
if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result) if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result)
) )

View File

@ -1,5 +1,8 @@
import asyncdispatch, asyncnet, json, tables, strutils, options, jsonmarshal, macros import json, tables, strutils, options, macros
export asyncdispatch, asyncnet, json, jsonmarshal import asyncdispatch2
import jsonmarshal
export asyncdispatch2, json, jsonmarshal
type type
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId
@ -9,10 +12,8 @@ type
# Procedure signature accepted as an RPC call by server # Procedure signature accepted as an RPC call by server
RpcProc* = proc (params: JsonNode): Future[JsonNode] RpcProc* = proc (params: JsonNode): Future[JsonNode]
RpcServer* = ref object RpcServer* = ref object of RootRef
socket*: AsyncSocket servers*: seq[StreamServer]
port*: Port
address*: string
procs*: TableRef[string, RpcProc] procs*: TableRef[string, RpcProc]
RpcProcError* = ref object of Exception RpcProcError* = ref object of Exception
@ -35,20 +36,15 @@ const
(INVALID_REQUEST, "No id specified") (INVALID_REQUEST, "No id specified")
] ]
template ifDebug*(actions: untyped): untyped = when not defined(release):
# TODO: Replace with nim-chronicle template ifDebug*(actions: untyped): untyped =
when not defined(release): actions else: discard actions
else:
template ifDebug*(actions: untyped): untyped =
discard
proc `$`*(port: Port): string = $int(port) proc `$`*(port: Port): string = $int(port)
proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer =
result = RpcServer(
socket: newAsyncSocket(),
port: port,
address: address,
procs: newTable[string, RpcProc]()
)
# Json state checking # Json state checking
template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) = template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
@ -61,7 +57,8 @@ template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
msg = getCurrentExceptionMsg() msg = getCurrentExceptionMsg()
(valid, msg) (valid, msg)
proc checkJsonErrors*(line: string, node: var JsonNode): Option[RpcJsonErrorContainer] = proc checkJsonErrors*(line: string,
node: var JsonNode): Option[RpcJsonErrorContainer] =
## Tries parsing line into node, if successful checks required fields ## Tries parsing line into node, if successful checks required fields
## Returns: error state or none ## Returns: error state or none
let res = jsonValid(line, node) let res = jsonValid(line, node)
@ -81,28 +78,30 @@ proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string =
let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id}
return $node & "\c\l" return $node & "\c\l"
proc sendError*(client: AsyncSocket, code: int, msg: string, id: JsonNode, data: JsonNode = newJNull()) {.async.} = proc sendError*(client: StreamTransport, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client ## Send error message to client
let error = %{"code": %(code), "message": %msg, "data": data} let error = %{"code": %(code), "message": %msg, "data": data}
ifDebug: echo "Send error json: ", wrapReply(newJNull(), error, id) ifDebug: echo "Send error json: ", wrapReply(newJNull(), error, id)
result = client.send(wrapReply(id, newJNull(), error)) result = client.write(wrapReply(id, newJNull(), error))
proc sendJsonError*(state: RpcJsonError, client: AsyncSocket, id: JsonNode, data = newJNull()) {.async.} = proc sendJsonError*(state: RpcJsonError, client: StreamTransport, id: JsonNode,
data = newJNull()) {.async.} =
## Send client response for invalid json state ## Send client response for invalid json state
let errMsgs = jsonErrorMessages[state] let errMsgs = jsonErrorMessages[state]
await client.sendError(errMsgs[0], errMsgs[1], id, data) await client.sendError(errMsgs[0], errMsgs[1], id, data)
# Server message processing # Server message processing
proc processMessage(server: RpcServer, client: AsyncSocket, line: string) {.async.} = proc processMessage(server: RpcServer, client: StreamTransport,
line: string) {.async.} =
var var
node: JsonNode node: JsonNode
jsonErrorState = checkJsonErrors(line, node) # set up node and/or flag errors # set up node and/or flag errors
jsonErrorState = checkJsonErrors(line, node)
if jsonErrorState.isSome: if jsonErrorState.isSome:
let errState = jsonErrorState.get let errState = jsonErrorState.get
var id: JsonNode var id = if errState.err == rjeInvalidJson: newJNull() else: node["id"]
if errState.err == rjeInvalidJson: id = newJNull() # id cannot be retrieved
else: id = node["id"]
await errState.err.sendJsonError(client, id, %errState.msg) await errState.err.sendJsonError(client, id, %errState.msg)
else: else:
let let
@ -110,23 +109,26 @@ proc processMessage(server: RpcServer, client: AsyncSocket, line: string) {.asyn
id = node["id"] id = node["id"]
if not server.procs.hasKey(methodName): if not server.procs.hasKey(methodName):
await client.sendError(METHOD_NOT_FOUND, "Method not found", id, %(methodName & " is not a registered method.")) await client.sendError(METHOD_NOT_FOUND, "Method not found", id,
%(methodName & " is not a registered method."))
else: else:
let callRes = await server.procs[methodName](node["params"]) let callRes = await server.procs[methodName](node["params"])
await client.send(wrapReply(id, callRes, newJNull())) discard await client.write(wrapReply(id, callRes, newJNull()))
proc processClient(server: RpcServer, client: AsyncSocket) {.async.} = proc processClient(server: StreamServer, client: StreamTransport) {.async.} =
var rpc = getUserData[RpcServer](server)
while true: while true:
let line = await client.recvLine() ## TODO: We need to put limit here, or server could be easily put out of
## service without never-ending line (data without CRLF).
let line = await client.readLine()
if line == "": if line == "":
# Disconnected.
client.close() client.close()
break break
ifDebug: echo "Process client: ", server.port, ":" & line ifDebug: echo "Process client: ", client.remoteAddress()
let future = processMessage(server, client, line) let future = processMessage(rpc, client, line)
await future yield future
if future.failed: if future.failed:
if future.readError of RpcProcError: if future.readError of RpcProcError:
let err = future.readError.RpcProcError let err = future.readError.RpcProcError
@ -135,16 +137,33 @@ proc processClient(server: RpcServer, client: AsyncSocket) {.async.} =
let err = future.readError[].ValueError let err = future.readError[].ValueError
await client.sendError(INVALID_PARAMS, err.msg, %"") await client.sendError(INVALID_PARAMS, err.msg, %"")
else: else:
await client.sendError(SERVER_ERROR, "Error: Unknown error occurred", %"") await client.sendError(SERVER_ERROR,
"Error: Unknown error occurred", %"")
proc serve*(server: RpcServer) {.async.} = proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer =
let tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
let tas6 = resolveTAddress(address, port, IpAddressFamily.IPv4)
result = RpcServer()
result.procs = newTable[string, RpcProc]()
result.servers = newSeq[StreamServer]()
for item in tas4:
var server = createStreamServer(item, processClient, {ReuseAddr},
udata = result)
result.servers.add(server)
for item in tas6:
var server = createStreamServer(item, processClient, {ReuseAddr},
udata = result)
result.servers.add(server)
proc start*(server: RpcServer) =
## Start the RPC server. ## Start the RPC server.
server.socket.bindAddr(server.port, server.address) for item in server.servers:
server.socket.listen() item.start()
while true: proc stop*(server: RpcServer) =
let client = await server.socket.accept() ## Stop the RPC server.
asyncCheck server.processClient(client) for item in server.servers:
item.stop()
# Server registration and RPC generation # Server registration and RPC generation
@ -162,7 +181,8 @@ proc makeProcName(s: string): string =
if c.isAlphaNumeric: result.add c if c.isAlphaNumeric: result.add c
proc hasReturnType(params: NimNode): bool = proc hasReturnType(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty: if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true result = true
macro rpc*(server: RpcServer, path: string, body: untyped): untyped = macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
@ -178,12 +198,18 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
result = newStmtList() result = newStmtList()
let let
parameters = body.findChild(it.kind == nnkFormalParams) parameters = body.findChild(it.kind == nnkFormalParams)
paramsIdent = newIdentNode"params" # all remote calls have a single parameter: `params: JsonNode` # all remote calls have a single parameter: `params: JsonNode`
pathStr = $path # procs are generated from the stripped path paramsIdent = newIdentNode"params"
procNameStr = pathStr.makeProcName # strip non alphanumeric # procs are generated from the stripped path
procName = newIdentNode(procNameStr) # public rpc proc pathStr = $path
doMain = newIdentNode(procNameStr & "DoMain") # when parameters present: proc that contains our rpc body # strip non alphanumeric
res = newIdentNode("result") # async result procNameStr = pathStr.makeProcName
# public rpc proc
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
var var
setup = jsonToNim(parameters, paramsIdent) setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body procBody = if body.kind == nnkStmtList: body else: body.body

View File

@ -8,7 +8,8 @@ skipDirs = @["tests"]
### Dependencies ### Dependencies
requires "nim >= 0.17.3", requires "nim >= 0.17.3",
"nimcrypto", "nimcrypto",
"stint" "stint",
"https://github.com/status-im/nim-asyncdispatch2"
proc configForTests() = proc configForTests() =
--hints: off --hints: off

View File

@ -1,5 +1,5 @@
import ../ rpcclient, ../ rpcserver import unittest, json, tables
import unittest, asyncdispatch, json, tables import ../rpcclient, ../rpcserver
import stint, ethtypes, ethprocs, stintjson import stint, ethtypes, ethprocs, stintjson
from os import getCurrentDir, DirSep from os import getCurrentDir, DirSep
@ -7,7 +7,7 @@ from strutils import rsplit
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
var var
server = newRpcServer() server = newRpcServer("localhost", Port(8546))
client = newRpcClient() client = newRpcClient()
## Generate Ethereum server RPCs ## Generate Ethereum server RPCs
@ -48,9 +48,7 @@ proc testSigCalls: Future[seq[string]] =
sha3 = client.web3_sha3("0x68656c6c6f20776f726c64") sha3 = client.web3_sha3("0x68656c6c6f20776f726c64")
result = all(version, sha3) result = all(version, sha3)
server.address = "localhost" server.start()
server.port = Port(8546)
asyncCheck server.serve
waitFor client.connect("localhost", Port(8546)) waitFor client.connect("localhost", Port(8546))

View File

@ -1,4 +1,5 @@
import unittest, ../ rpcserver, asyncdispatch, json, tables import unittest, json, tables
import ../rpcserver
type type
# some nested types to check object parsing # some nested types to check object parsing

View File

@ -1,16 +1,14 @@
import ../ rpcclient, ../ rpcserver
import unittest, json import unittest, json
import ../rpcclient, ../rpcserver
var srv = newRpcServer() var srv = newRpcServer()
srv.address = "localhost"
srv.port = Port(8545)
var client = newRpcClient() var client = newRpcClient()
# Create RPC on server # Create RPC on server
srv.rpc("myProc") do(input: string, data: array[0..3, int]): srv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data) result = %("Hello " & input & " data: " & $data)
asyncCheck srv.serve srv.start()
waitFor client.connect("localhost", Port(8545)) waitFor client.connect("localhost", Port(8545))
# TODO: When an error occurs during a test, stop the server # TODO: When an error occurs during a test, stop the server