Merge pull request #24 from status-im/AgnosticClient

Agnostic client
This commit is contained in:
coffeepots 2018-07-12 19:15:20 +01:00 committed by GitHub
commit e29850f1d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 113 additions and 101 deletions

View File

@ -5,40 +5,26 @@ import jsonmarshal
export asyncdispatch2
type
RpcClient*[T, A] = ref object
awaiting: Table[string, Future[Response]]
transport: T
address: A
nextId: int64
ClientId* = int64
RpcClient* = ref object of RootRef#
awaiting*: Table[ClientId, Future[Response]]
nextId: ClientId
Response* = tuple[error: bool, result: JsonNode]
const defaultMaxRequestLength* = 1024 * 128
proc initRpcClient*[T: RpcClient](client: var T) =
client.awaiting = initTable[ClientId, Future[Response]]()
client.nextId = 1
proc newRpcClient*[T, A]: RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc getNextId*(client: RpcClient): ClientId =
result = client.nextId
client.nextId.inc
proc call*(self: RpcClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var
value =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0",
"method": %path,
"params": params,
"id": %id}
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
@ -62,11 +48,11 @@ macro checkGet(node: JsonNode, fieldName: string,
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage(self: RpcClient, line: string) =
proc processMessage*[T: RpcClient](self: T, line: string) =
# Note: this doesn't use any transport code so doesn't need to be differentiated.
let
node = parseJson(line) # TODO: Check errors
id = checkGet(node, "id", JString)
node = parseJson(line)
id = checkGet(node, "id", JInt)
if not self.awaiting.hasKey(id):
raise newException(ValueError,
@ -88,30 +74,6 @@ proc processMessage(self: RpcClient, line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc processData(client: RpcClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress]
proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)
proc newRpcStreamClient*(): RpcStreamClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()
# Signature processing
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
@ -137,7 +99,7 @@ proc toJsonArray(parameters: NimNode): NimNode =
items.add(nnkPrefix.newTree(ident"%", curParam))
result = nnkPrefix.newTree(bindSym("%", brForceOpen), items)
proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode =
# Each input parameter in the rpc signature is converted
# to json with `%`.
# Return types are then converted back to native Nim types.
@ -158,7 +120,7 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
customReturnType = returnType != iJsonNode
# insert rpc client as first parameter
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient",
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident($clientType),
newEmptyNode()))
let
@ -205,17 +167,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
when defined(nimDumpRpcs):
echo pathStr, ":\n", result.repr
proc processRpcSigs(parsedCode: NimNode): NimNode =
proc processRpcSigs(clientType, parsedCode: NimNode): NimNode =
result = newStmtList()
for line in parsedCode:
if line.kind == nnkProcDef:
var procDef = createRpcFromSig(line)
var procDef = createRpcFromSig(clientType, line)
result.add(procDef)
macro createRpcSigs*(filePath: static[string]): untyped =
macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped =
## Takes a file of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
result = processRpcSigs(staticRead($filePath).parseStmt())
result = processRpcSigs(clientType, staticRead($filePath).parseStmt())

View File

@ -0,0 +1,52 @@
import ../client, asyncdispatch2, tables, json
type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
address*: TransportAddress
const defaultMaxRequestLength* = 1024 * 128
proc newRpcSocketClient*: RpcSocketClient =
## Creates a new client instance.
new result
result.initRpcClient()
proc call*(self: RpcSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var
value =
$rpcCallNode(name, params, id) & "\c\l"
if self.transport.isNil:
var connectStr = ""
raise newException(ValueError, "Transport is not initialised (missing a call to connect?)")
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
proc processData(client: RpcSocketClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)

View File

@ -144,7 +144,7 @@ proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} =
else:
node["id"]
let
# fixed error code and message
# const error code and message
errKind = jsonErrorMessages[errState.err]
# pass on the actual error message
fullMsg = errKind[1] & " " & errState[1]
@ -159,8 +159,8 @@ proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bo
## Route to RPC, returns false if the method or params cannot be found.
## Expects json input and returns json output.
let
jPath = data{methodField}
jParams = data{paramsField}
jPath = data.getOrDefault(methodField)
jParams = data.getOrDefault(paramsField)
if jPath.isEmpty or jParams.isEmpty:
return false
@ -181,6 +181,14 @@ proc hasReturnType(params: NimNode): bool =
params[0].kind != nnkEmpty:
result = true
template trap(path: string, body: untyped): untyped =
try:
body
except:
let msg = getCurrentExceptionMsg()
debug "Error occurred within RPC ", path = path, errorMessage = msg
result = %*{codeField: %SERVER_ERROR, messageField: %msg}
macro rpc*(server: RpcRouter, path: string, body: untyped): untyped =
## Define a remote procedure call.
## Input and return parameters are defined using the ``do`` notation.
@ -210,47 +218,37 @@ macro rpc*(server: RpcRouter, path: string, body: untyped): untyped =
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
errTrappedBody = quote do:
try:
`procBody`
except:
let msg = getCurrentExceptionMsg()
debug "Error occurred within RPC ", path = `path`, errorMessage = msg
`errJson` = %*{codeField: %SERVER_ERROR, messageField: %msg}
if parameters.hasReturnType:
let returnType = parameters[0]
# delegate async proc allows return and setting of result as native type
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode, `errJson`: var JsonNode): `returnType` =
proc `doMain`(`paramsIdent`: JsonNode): `returnType` =
`setup`
`errTrappedBody`
`procBody`
)
if returnType == ident"JsonNode":
# `JsonNode` results don't need conversion
result.add( quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
var `errJson`: JsonNode
`res` = `doMain`(`paramsIdent`, `errJson`)
if `errJson` != nil: `res` = `errJson`
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} =
trap(`pathStr`):
`res` = `doMain`(`paramsIdent`)
)
else:
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
var `errJson`: JsonNode
`res` = %`doMain`(`paramsIdent`, `errJson`)
if `errJson` != nil: `res` = `errJson`
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} =
trap(`pathStr`):
`res` = %`doMain`(`paramsIdent`)
)
else:
# no return types, inline contents
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async, gcsafe.} =
`setup`
var `errJson`: JsonNode
`errTrappedBody`
if `errJson` != nil: `res` = `errJson`
trap(`pathStr`):
`procBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)

View File

@ -1,3 +1,5 @@
import json_rpc / client
export client
import
json_rpc / client,
json_rpc / clients / socketclient
export client, socketclient

View File

@ -1,2 +1,2 @@
import json_rpc / server
export server
import json_rpc / server, json_rpc / servers / socketserver
export server, socketserver

View File

@ -1,2 +0,0 @@
import json_rpc / server, json_rpc / transports / socket
export server, socket

View File

@ -3,11 +3,11 @@
allow unchecked and unformatted calls.
]#
import unittest, debugclient, ../rpcsocket
import unittest, debugclient, ../rpcserver
import strformat, chronicles
var server = newRpcSocketServer("localhost", 8547.Port)
var client = newRpcStreamClient()
var client = newRpcSocketClient()
server.start()
waitFor client.connect("localhost", Port(8547))

View File

@ -1,5 +1,5 @@
import unittest, json, tables
import ../rpcclient, ../rpcsocket
import ../rpcclient, ../rpcserver
import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles
from os import getCurrentDir, DirSep
@ -8,13 +8,13 @@ template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
var
server = newRpcSocketServer("localhost", Port(8546))
client = newRpcStreamClient()
client = newRpcSocketClient()
## Generate Ethereum server RPCs
server.addEthRpcs()
## Generate client convenience marshalling wrappers from forward declarations
createRpcSigs(sourceDir & DirSep & "ethcallsigs.nim")
createRpcSigs(RpcSocketClient, sourceDir & DirSep & "ethcallsigs.nim")
## Create custom RPC with StUint input parameter
server.rpc("rpc.uint256param") do(i: UInt256):

View File

@ -1,5 +1,5 @@
import unittest, json, tables, chronicles
import ../rpcsocket
import ../rpcserver
type
# some nested types to check object parsing

View File

@ -1,8 +1,8 @@
import unittest, json, chronicles
import ../rpcclient, ../rpcsocket
import ../rpcclient, ../rpcserver
var srv = newRpcSocketServer(["localhost:8545"])
var client = newRpcStreamClient()
var client = newRpcSocketClient()
# Create RPC on server
srv.rpc("myProc") do(input: string, data: array[0..3, int]):