Merge pull request #21 from status-im/TransportAgnosticRpcNoDSL

Transport agnostic RPC
This commit is contained in:
coffeepots 2018-07-10 17:27:14 +01:00 committed by GitHub
commit 35faf61d40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 707 additions and 433 deletions

4
.gitignore vendored
View File

@ -14,3 +14,7 @@
# Ignore the nimcache folders
nimcache/
# Ignore editor settings
.vscode

View File

@ -1,354 +0,0 @@
import json, tables, strutils, options, macros #, chronicles
import asyncdispatch2
import jsonmarshal
export asyncdispatch2, json, jsonmarshal
# Temporarily disable logging
macro debug(body: varargs[untyped]): untyped = newStmtList()
macro info(body: varargs[untyped]): untyped = newStmtList()
macro error(body: varargs[untyped]): untyped = newStmtList()
#logScope:
# topics = "RpcServer"
type
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId
RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string]
# Procedure signature accepted as an RPC call by server
RpcProc* = proc (params: JsonNode): Future[JsonNode]
RpcServer* = ref object
servers*: seq[StreamServer]
procs*: TableRef[string, RpcProc]
RpcProcError* = ref object of Exception
code*: int
data*: JsonNode
RpcBindError* = object of Exception
RpcAddressUnresolvableError* = object of Exception
const
JSON_PARSE_ERROR* = -32700
INVALID_REQUEST* = -32600
METHOD_NOT_FOUND* = -32601
INVALID_PARAMS* = -32602
INTERNAL_ERROR* = -32603
SERVER_ERROR* = -32000
maxRequestLength = 1024 * 128
jsonErrorMessages*: array[RpcJsonError, (int, string)] =
[
(JSON_PARSE_ERROR, "Invalid JSON"),
(INVALID_REQUEST, "JSON 2.0 required"),
(INVALID_REQUEST, "No method requested"),
(INVALID_REQUEST, "No id specified")
]
# Utility functions
# TODO: Move outside server
func `%`*(p: Port): JsonNode = %(p.int)
# Json state checking
template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
var
valid = true
msg = ""
try: node = parseJson(line)
except:
valid = false
msg = getCurrentExceptionMsg()
debug "Cannot process json", json = jsonString, msg = msg
(valid, msg)
proc checkJsonErrors*(line: string,
node: var JsonNode): Option[RpcJsonErrorContainer] =
## Tries parsing line into node, if successful checks required fields
## Returns: error state or none
let res = jsonValid(line, node)
if not res[0]:
return some((rjeInvalidJson, res[1]))
if not node.hasKey("id"):
return some((rjeNoId, ""))
if node{"jsonrpc"} != %"2.0":
return some((rjeVersionError, ""))
if not node.hasKey("method"):
return some((rjeNoMethod, ""))
return none(RpcJsonErrorContainer)
# Json reply wrappers
proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string =
let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id}
return $node & "\c\l"
proc sendError*(client: StreamTransport, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client
let error = %{"code": %(code), "id": id, "message": %msg, "data": data}
debug "Error generated", error = error, id = id
var res = wrapReply(id, newJNull(), error)
result = client.write(res)
proc sendJsonError*(state: RpcJsonError, client: StreamTransport, id: JsonNode,
data = newJNull()) {.async.} =
## Send client response for invalid json state
let errMsgs = jsonErrorMessages[state]
await client.sendError(errMsgs[0], errMsgs[1], id, data)
# Server message processing
proc processMessage(server: RpcServer, client: StreamTransport,
line: string) {.async.} =
var
node: JsonNode
# set up node and/or flag errors
jsonErrorState = checkJsonErrors(line, node)
if jsonErrorState.isSome:
let errState = jsonErrorState.get
var id =
if errState.err == rjeInvalidJson or errState.err == rjeNoId:
newJNull()
else:
node["id"]
await errState.err.sendJsonError(client, id, %errState.msg)
else:
let
methodName = node["method"].str
id = node["id"]
if not server.procs.hasKey(methodName):
await client.sendError(METHOD_NOT_FOUND, "Method not found", %id,
%(methodName & " is not a registered method."))
else:
let callRes = await server.procs[methodName](node["params"])
var res = wrapReply(id, callRes, newJNull())
discard await client.write(res)
proc processClient(server: StreamServer, client: StreamTransport) {.async, gcsafe.} =
var rpc = getUserData[RpcServer](server)
while true:
let line = await client.readLine(maxRequestLength)
if line == "":
client.close()
break
debug "Processing client", addresss = client.remoteAddress(), line
let future = processMessage(rpc, client, line)
yield future
if future.failed:
if future.readError of RpcProcError:
let err = future.readError.RpcProcError
await client.sendError(err.code, err.msg, err.data)
elif future.readError of ValueError:
let err = future.readError[].ValueError
await client.sendError(INVALID_PARAMS, err.msg, %"")
else:
await client.sendError(SERVER_ERROR,
"Error: Unknown error occurred", %"")
proc newRpcServer*(addresses: openarray[TransportAddress]): RpcServer =
## Create new server and assign it to addresses ``addresses``.
result = RpcServer()
result.procs = newTable[string, RpcProc]()
result.servers = newSeq[StreamServer]()
for item in addresses:
try:
info "Creating server on ", address = $item
var server = createStreamServer(item, processClient, {ReuseAddr},
udata = result)
result.servers.add(server)
except:
error "Failed to create server", address = $item, message = getCurrentExceptionMsg()
if len(result.servers) == 0:
# Server was not bound, critical error.
# TODO: Custom RpcException error
raise newException(RpcBindError, "Unable to create server!")
proc newRpcServer*(addresses: openarray[string]): RpcServer =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
baddrs: seq[TransportAddress]
for a in addresses:
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(a, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(a, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
baddrs.add(r)
for r in tas6:
baddrs.add(r)
if len(baddrs) == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
result = newRpcServer(baddrs)
proc newRpcServer*(address = "localhost", port: Port = Port(8545)): RpcServer =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
result = RpcServer()
result.procs = newTable[string, RpcProc]()
result.servers = newSeq[StreamServer]()
for item in tas4:
try:
info "Creating server for address", ip4address = $item
var server = createStreamServer(item, processClient, {ReuseAddr},
udata = result)
result.servers.add(server)
except:
error "Failed to create server for address", address = $item
for item in tas6:
try:
info "Server created", ip6address = $item
var server = createStreamServer(item, processClient, {ReuseAddr},
udata = result)
result.servers.add(server)
except:
error "Failed to create server", address = $item
if len(result.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
proc start*(server: RpcServer) =
## Start the RPC server.
for item in server.servers:
item.start()
proc stop*(server: RpcServer) =
## Stop the RPC server.
for item in server.servers:
item.stop()
proc close*(server: RpcServer) =
## Cleanup resources of RPC server.
for item in server.servers:
item.close()
# Server registration and RPC generation
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
## Add a name/code pair to the RPC server.
server.procs[name] = rpc
proc unRegisterAll*(server: RpcServer) =
# Remove all remote procedure calls from this server.
server.procs.clear
proc makeProcName(s: string): string =
result = ""
for c in s:
if c.isAlphaNumeric: result.add c
proc hasReturnType(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true
macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
## Define a remote procedure call.
## Input and return parameters are defined using the ``do`` notation.
## For example:
## .. code-block:: nim
## myServer.rpc("path") do(param1: int, param2: float) -> string:
## result = $param1 & " " & $param2
## ```
## Input parameters are automatically marshalled from json to Nim types,
## and output parameters are automatically marshalled to json for transport.
result = newStmtList()
let
parameters = body.findChild(it.kind == nnkFormalParams)
# all remote calls have a single parameter: `params: JsonNode`
paramsIdent = newIdentNode"params"
# procs are generated from the stripped path
pathStr = $path
# strip non alphanumeric
procNameStr = pathStr.makeProcName
# public rpc proc
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
if parameters.hasReturnType:
let returnType = parameters[0]
# delegate async proc allows return and setting of result as native type
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} =
`setup`
`procBody`
)
if returnType == ident"JsonNode":
# `JsonNode` results don't need conversion
result.add( quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = await `doMain`(`paramsIdent`)
)
else:
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = %await `doMain`(`paramsIdent`)
)
else:
# no return types, inline contents
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`setup`
`procBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)
)
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr
# TODO: Allow cross checking between client signatures and server calls

View File

@ -1,5 +1,5 @@
packageName = "eth_rpc"
version = "0.0.1"
packageName = "json_rpc"
version = "0.0.2"
author = "Status Research & Development GmbH"
description = "Ethereum remote procedure calls"
license = "Apache License 2.0"

View File

@ -1,31 +1,38 @@
import tables, json, macros
import asyncdispatch2
from strutils import toLowerAscii
import jsonmarshal
export asyncdispatch2
type
RpcClient* = ref object
transp: StreamTransport
RpcClient*[T, A] = ref object
awaiting: Table[string, Future[Response]]
address: TransportAddress
transport: T
address: A
nextId: int64
Response* = tuple[error: bool, result: JsonNode]
const maxRequestLength = 1024 * 128
const defaultMaxRequestLength* = 1024 * 128
proc newRpcClient*(): RpcClient =
proc newRpcClient*[T, A]: RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1)
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc call*(self: RpcClient, name: string,
params: JsonNode): Future[Response] {.async.} =
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params,
"id": %id} & "\c\l"
let res = await self.transp.write(msg)
var
value =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(msg))
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
@ -33,10 +40,8 @@ proc call*(self: RpcClient, name: string,
self.awaiting[id] = newFut
result = await newFut
template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) =
# complete future before raising
fut.complete((true, %msg))
raise newException(errType, msg)
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
macro checkGet(node: JsonNode, fieldName: string,
jKind: static[JsonNodeKind]): untyped =
@ -58,17 +63,18 @@ macro checkGet(node: JsonNode, fieldName: string,
else: discard
proc processMessage(self: RpcClient, line: string) =
let node = parseJson(line)
# Note: this doesn't use any transport code so doesn't need to be differentiated.
let
node = parseJson(line) # TODO: Check errors
id = checkGet(node, "id", JString)
# TODO: Use more appropriate exception objects
let id = checkGet(node, "id", JString)
if not self.awaiting.hasKey(id):
raise newException(ValueError,
"Cannot find message id \"" & node["id"].str & "\"")
let version = checkGet(node, "jsonrpc", JString)
if version != "2.0":
self.awaiting[id].handleRaise(ValueError,
self.awaiting[id].asyncRaise(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
let errorNode = node{"error"}
@ -79,31 +85,34 @@ proc processMessage(self: RpcClient, line: string) =
self.awaiting.del(id)
# TODO: actions on unable find result node
else:
self.awaiting[id].complete((true, errorNode))
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc connect*(self: RpcClient, address: string, port: Port): Future[void]
proc processData(self: RpcClient) {.async.} =
proc processData(client: RpcClient) {.async.} =
while true:
let line = await self.transp.readLine(maxRequestLength)
if line == "":
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
self.transp.close()
client.transport.close
break
processMessage(self, line)
client.processMessage(value)
# async loop reconnection and waiting
self.transp = await connect(self.address)
client.transport = await connect(client.address)
proc connect*(self: RpcClient, address: string, port: Port) {.async.} =
# TODO: `address` hostname can be resolved to many IP addresses, we are using
# first one, but maybe it would be better to iterate over all IP addresses
# and try to establish connection until it will not be established.
type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress]
proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
self.transp = await connect(addresses[0])
self.address = addresses[0]
asyncCheck processData(self)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)
proc newRpcStreamClient*(): RpcStreamClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()
# Signature processing
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# parameters come as a tree

240
json_rpc/router.nim Normal file
View File

@ -0,0 +1,240 @@
import
json, tables, asyncdispatch2, jsonmarshal, strutils, macros,
chronicles, options
export asyncdispatch2, json, jsonmarshal
type
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId, rjeNoParams
RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string]
# Procedure signature accepted as an RPC call by server
RpcProc* = proc(input: JsonNode): Future[JsonNode] {.gcsafe.}
RpcProcError* = ref object of Exception
code*: int
data*: JsonNode
RpcBindError* = object of Exception
RpcAddressUnresolvableError* = object of Exception
RpcRouter* = object
procs*: TableRef[string, RpcProc]
const
methodField = "method"
paramsField = "params"
jsonRpcField = "jsonrpc"
idField = "id"
resultField = "result"
errorField = "error"
codeField = "code"
messageField = "message"
dataField = "data"
messageTerminator = "\c\l"
JSON_PARSE_ERROR* = -32700
INVALID_REQUEST* = -32600
METHOD_NOT_FOUND* = -32601
INVALID_PARAMS* = -32602
INTERNAL_ERROR* = -32603
SERVER_ERROR* = -32000
defaultMaxRequestLength* = 1024 * 128
jsonErrorMessages*: array[RpcJsonError, (int, string)] =
[
(JSON_PARSE_ERROR, "Invalid JSON"),
(INVALID_REQUEST, "JSON 2.0 required"),
(INVALID_REQUEST, "No method requested"),
(INVALID_REQUEST, "No id specified"),
(INVALID_PARAMS, "No parameters specified")
]
proc newRpcRouter*: RpcRouter =
result.procs = newTable[string, RpcProc]()
proc register*(router: var RpcRouter, path: string, call: RpcProc) =
router.procs.add(path, call)
proc clear*(router: var RpcRouter) = router.procs.clear
proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasKey(methodName)
template isEmpty(node: JsonNode): bool = node.isNil or node.kind == JNull
# Json state checking
template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
var
valid = true
msg = ""
try: node = parseJson(line)
except:
valid = false
msg = getCurrentExceptionMsg()
debug "Cannot process json", json = jsonString, msg = msg
(valid, msg)
proc checkJsonState*(line: string,
node: var JsonNode): Option[RpcJsonErrorContainer] =
## Tries parsing line into node, if successful checks required fields
## Returns: error state or none
let res = jsonValid(line, node)
if not res[0]:
return some((rjeInvalidJson, res[1]))
if not node.hasKey(idField):
return some((rjeNoId, ""))
let jVer = node{jsonRpcField}
if jVer != nil and jVer.kind != JNull and jVer != %"2.0":
return some((rjeVersionError, ""))
if not node.hasKey(methodField):
return some((rjeNoMethod, ""))
if not node.hasKey(paramsField):
return some((rjeNoParams, ""))
return none(RpcJsonErrorContainer)
# Json reply wrappers
proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): JsonNode =
return %{jsonRpcField: %"2.0", resultField: value, errorField: error, idField: id}
proc wrapError*(code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()): JsonNode =
# Create standardised error json
result = %{codeField: %(code), idField: id, messageField: %msg, dataField: data}
debug "Error generated", error = result, id = id
proc route*(router: RpcRouter, node: JsonNode): Future[JsonNode] {.async, gcsafe.} =
## Assumes correct setup of node
let
methodName = node[methodField].str
id = node[idField]
rpcProc = router.procs.getOrDefault(methodName)
if rpcProc.isNil:
let
methodNotFound = %(methodName & " is not a registered RPC method.")
error = wrapError(METHOD_NOT_FOUND, "Method not found", id, methodNotFound)
result = wrapReply(id, newJNull(), error)
else:
let
jParams = node[paramsField]
res = await rpcProc(jParams)
result = wrapReply(id, res, newJNull())
proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} =
## Route to RPC from string data. Data is expected to be able to be converted to Json.
## Returns string of Json from RPC result/error node
var
node: JsonNode
# parse json node and/or flag missing fields and errors
jsonErrorState = checkJsonState(data, node)
if jsonErrorState.isSome:
let errState = jsonErrorState.get
var id =
if errState.err == rjeInvalidJson or errState.err == rjeNoId:
newJNull()
else:
node["id"]
let
errMsg = jsonErrorMessages[errState.err]
res = wrapError(code = errMsg[0], msg = errMsg[1], id = id)
# return error state as json
result = $res & messageTerminator
else:
let res = await router.route(node)
result = $res & messageTerminator
proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bool =
## Route to RPC, returns false if the method or params cannot be found.
## Expects json input and returns json output.
let
jPath = data{methodField}
jParams = data{paramsField}
if jPath.isEmpty or jParams.isEmpty:
return false
let
path = jPath.getStr
rpc = router.procs.getOrDefault(path)
if rpc != nil:
fut = rpc(jParams)
return true
proc makeProcName(s: string): string =
result = ""
for c in s:
if c.isAlphaNumeric: result.add c
proc hasReturnType(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true
macro rpc*(server: RpcRouter, path: string, body: untyped): untyped =
## Define a remote procedure call.
## Input and return parameters are defined using the ``do`` notation.
## For example:
## .. code-block:: nim
## myServer.rpc("path") do(param1: int, param2: float) -> string:
## result = $param1 & " " & $param2
## ```
## Input parameters are automatically marshalled from json to Nim types,
## and output parameters are automatically marshalled to json for transport.
result = newStmtList()
let
parameters = body.findChild(it.kind == nnkFormalParams)
# all remote calls have a single parameter: `params: JsonNode`
paramsIdent = newIdentNode"params"
# procs are generated from the stripped path
pathStr = $path
# strip non alphanumeric
procNameStr = pathStr.makeProcName
# public rpc proc
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
errTrappedBody = quote do:
try:
`procBody`
except:
debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg()
if parameters.hasReturnType:
let returnType = parameters[0]
# delegate async proc allows return and setting of result as native type
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} =
`setup`
`errTrappedBody`
)
if returnType == ident"JsonNode":
# `JsonNode` results don't need conversion
result.add( quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = await `doMain`(`paramsIdent`)
)
else:
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = %await `doMain`(`paramsIdent`)
)
else:
# no return types, inline contents
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`setup`
`errTrappedBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)
)
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr

37
json_rpc/server.nim Normal file
View File

@ -0,0 +1,37 @@
import json, tables, options, macros
import asyncdispatch2, router
import jsonmarshal
export asyncdispatch2, json, jsonmarshal, router
type
RpcServer*[S] = ref object
servers*: seq[S]
router*: RpcRouter
proc newRpcServer*[S](): RpcServer[S] =
new result
result.router = newRpcRouter()
result.servers = @[]
template rpc*(server: RpcServer, path: string, body: untyped): untyped =
server.router.rpc(path, body)
template hasMethod*(server: RpcServer, methodName: string): bool = server.router.hasMethod(methodName)
# Wrapper for message processing
proc route*[T](server: RpcServer[T], line: string): Future[string] {.async, gcsafe.} =
result = await server.router.route(line)
# Server registration
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
## Add a name/code pair to the RPC server.
server.router.addRoute(name, rpc)
proc unRegisterAll*(server: RpcServer) =
# Remove all remote procedure calls from this server.
server.router.clear

View File

@ -0,0 +1,156 @@
import server, json, chronicles
proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client
let error = wrapError(code, msg, id, data)
var value = $wrapReply(id, newJNull(), error)
result = transport.write(value)
proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} =
## Process transport data to the RPC server
var rpc = getUserData[RpcServer[StreamTransport]](server)
while true:
var
maxRequestLength = defaultMaxRequestLength
value = await transport.readLine(defaultMaxRequestLength)
if value == "":
transport.close
break
debug "Processing message", address = transport.remoteAddress(), line = value
let future = rpc.route(value)
yield future
if future.failed:
if future.readError of RpcProcError:
let err = future.readError.RpcProcError
await transport.sendError(err.code, err.msg, err.data)
elif future.readError of ValueError:
let err = future.readError[].ValueError
await transport.sendError(INVALID_PARAMS, err.msg, %"")
else:
await transport.sendError(SERVER_ERROR,
"Error: Unknown error occurred", %"")
else:
let res = await future
result = transport.write(res)
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress) =
try:
info "Creating server on ", address = $address
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except:
error "Failed to create server", address = $address, message = getCurrentExceptionMsg()
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[TransportAddress]) =
for item in addresses:
server.addStreamServer(item)
proc addStreamServer*(server: RpcServer[StreamServer], address: string) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[string]) =
for address in addresses:
server.addStreamServer(address)
proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Port) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
type RpcStreamServer* = RpcServer[StreamServer]
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses)
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses)
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
# Create server on specified port
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port)
proc start*(server: RpcStreamServer) =
## Start the RPC server.
for item in server.servers:
item.start()
proc stop*(server: RpcStreamServer) =
## Stop the RPC server.
for item in server.servers:
item.stop()
proc close*(server: RpcStreamServer) =
## Cleanup resources of RPC server.
for item in server.servers:
item.close()

View File

@ -1,3 +1,3 @@
import eth-rpc / client
import json_rpc / client
export client

View File

@ -1,2 +1,2 @@
import eth-rpc / server
import json_rpc / server
export server

2
rpcsockets.nim Normal file
View File

@ -0,0 +1,2 @@
import json_rpc / [server, sockettransport]
export server, sockettransport

View File

@ -1,3 +1,3 @@
import
testrpcmacro, testserverclient, testethcalls, testerrors
testrpcmacro, testserverclient, testethcalls #, testerrors

View File

@ -1,4 +1,4 @@
include ../ eth-rpc / client
include ../ json_rpc / client
proc nextId*(self: RpcClient): int64 = self.nextId
@ -9,7 +9,7 @@ proc rawCall*(self: RpcClient, name: string,
self.nextId.inc
var s = msg & "\c\l"
let res = await self.transp.write(s)
let res = await self.transport.write(s)
assert res == len(s)
# completed by processMessage.

157
tests/ethhexstrings.nim Normal file
View File

@ -0,0 +1,157 @@
type
HexQuantityStr* = distinct string
HexDataStr* = distinct string
# Hex validation
template stripLeadingZeros(value: string): string =
var cidx = 0
# ignore the last character so we retain '0' on zero value
while cidx < value.len - 1 and value[cidx] == '0':
cidx.inc
value[cidx .. ^1]
proc encodeQuantity*(value: SomeUnsignedInt): string =
var hValue = value.toHex.stripLeadingZeros
result = "0x" & hValue
template hasHexHeader*(value: string | HexDataStr | HexQuantityStr): bool =
template strVal: untyped = value.string
if strVal != "" and strVal[0] == '0' and strVal[1] in {'x', 'X'} and strVal.len > 2: true
else: false
template isHexChar*(c: char): bool =
if c notin {'0'..'9'} and
c notin {'a'..'f'} and
c notin {'A'..'F'}: false
else: true
proc validate*(value: HexQuantityStr): bool =
template strVal: untyped = value.string
if not value.hasHexHeader:
return false
# No leading zeros
if strVal[2] == '0': return false
for i in 2..<strVal.len:
let c = strVal[i]
if not c.isHexChar:
return false
return true
proc validate*(value: HexDataStr): bool =
template strVal: untyped = value.string
if not value.hasHexHeader:
return false
# Leading zeros are allowed
for i in 2..<strVal.len:
let c = strVal[i]
if not c.isHexChar:
return false
# Must be even number of digits
if strVal.len mod 2 != 0: return false
return true
# Initialisation
template hexDataStr*(value: string): HexDataStr = value.HexDataStr
template hexQuantityStr*(value: string): HexQuantityStr = value.HexQuantityStr
# Converters
import json
from ../rpcserver import expect
proc `%`*(value: HexDataStr): JsonNode =
if not value.validate:
raise newException(ValueError, "HexDataStr: Invalid hex for Ethereum: " & value.string)
else:
result = %(value.string)
proc `%`*(value: HexQuantityStr): JsonNode =
if not value.validate:
raise newException(ValueError, "HexQuantityStr: Invalid hex for Ethereum: " & value.string)
else:
result = %(value.string)
proc fromJson*(n: JsonNode, argName: string, result: var HexDataStr) =
# Note that '0x' is stripped after validation
n.kind.expect(JString, argName)
let hexStr = n.getStr()
if not hexStr.hexDataStr.validate:
raise newException(ValueError, "Parameter \"" & argName & "\" value is not valid as a Ethereum data \"" & hexStr & "\"")
result = hexStr[2..hexStr.high].hexDataStr
proc fromJson*(n: JsonNode, argName: string, result: var HexQuantityStr) =
# Note that '0x' is stripped after validation
n.kind.expect(JString, argName)
let hexStr = n.getStr()
if not hexStr.hexQuantityStr.validate:
raise newException(ValueError, "Parameter \"" & argName & "\" value is not valid as an Ethereum hex quantity \"" & hexStr & "\"")
result = hexStr[2..hexStr.high].hexQuantityStr
# testing
when isMainModule:
import unittest
suite "Hex quantity":
test "Empty string":
expect ValueError:
let
source = ""
x = hexQuantityStr source
check %x == %source
test "Even length":
let
source = "0x123"
x = hexQuantityStr source
check %x == %source
test "Odd length":
let
source = "0x123"
x = hexQuantityStr"0x123"
check %x == %source
test "Missing header":
expect ValueError:
let
source = "1234"
x = hexQuantityStr source
check %x != %source
expect ValueError:
let
source = "01234"
x = hexQuantityStr source
check %x != %source
expect ValueError:
let
source = "x1234"
x = hexQuantityStr source
check %x != %source
suite "Hex data":
test "Even length":
let
source = "0x1234"
x = hexDataStr source
check %x == %source
test "Odd length":
expect ValueError:
let
source = "0x123"
x = hexDataStr source
check %x != %source
test "Missing header":
expect ValueError:
let
source = "1234"
x = hexDataStr source
check %x != %source
expect ValueError:
let
source = "01234"
x = hexDataStr source
check %x != %source
expect ValueError:
let
source = "x1234"
x = hexDataStr source
check %x != %source

View File

@ -1,4 +1,4 @@
import ../rpcserver, nimcrypto, json, stint, strutils, ethtypes, stintjson
import ../rpcserver, nimcrypto, json, stint, strutils, ethtypes, stintjson, ethhexstrings
#[
For details on available RPC calls, see: https://github.com/ethereum/wiki/wiki/JSON-RPC
@ -31,19 +31,16 @@ proc addEthRpcs*(server: RpcServer) =
## Returns the current client version.
result = "Nimbus-RPC-Test"
server.rpc("web3_sha3") do(data: string) -> string:
server.rpc("web3_sha3") do(data: HexDataStr) -> HexDataStr:
## Returns Keccak-256 (not the standardized SHA3-256) of the given data.
##
## data: the data to convert into a SHA3 hash.
## Returns the SHA3 result of the given string.
# TODO: Capture error on malformed input
var rawData: seq[byte]
if data.len > 2 and data[0] == '0' and data[1] in ['x', 'X']:
rawData = data[2..data.high].fromHex
else:
rawData = data.fromHex
rawData = data.string.fromHex
# data will have 0x prefix
result = "0x" & $keccak_256.digest(rawData)
result = hexDataStr "0x" & $keccak_256.digest(rawData)
server.rpc("net_version") do() -> string:
## Returns string of the current network id:

View File

@ -3,11 +3,11 @@
allow unchecked and unformatted calls.
]#
import unittest, debugclient, ../rpcserver
import unittest, debugclient, ../rpcsockets
import strformat, chronicles
var server = newRpcServer("localhost", 8547.Port)
var client = newRpcClient()
var server = newRpcStreamServer("localhost", 8547.Port)
var client = newRpcStreamClient()
server.start()
waitFor client.connect("localhost", Port(8547))
@ -15,6 +15,10 @@ waitFor client.connect("localhost", Port(8547))
server.rpc("rpc") do(a: int, b: int):
result = %(&"a: {a}, b: {b}")
server.rpc("makeError"):
if true:
raise newException(ValueError, "Test")
proc testMissingRpc: Future[Response] {.async.} =
var fut = client.call("phantomRpc", %[])
result = await fut
@ -33,22 +37,44 @@ proc testMalformed: Future[Response] {.async.} =
if fut.finished: result = fut.read()
else: result = (true, %"Timeout")
proc testRaise: Future[Response] {.async.} =
var fut = client.call("rpcMakeError", %[])
result = await fut
suite "RPC Errors":
# Note: We don't expect a exceptions for most of the tests,
# because the server should respond with the error in json
test "Missing RPC":
let res = waitFor testMissingRpc()
check res.error == true and
res.result["message"] == %"Method not found" and
res.result["data"] == %"phantomRpc is not a registered method."
#expect ValueError:
try:
let res = waitFor testMissingRpc()
check res.error == true and
res.result["message"] == %"Method not found" and
res.result["data"] == %"phantomRpc is not a registered method."
except:
echo "Error ", getCurrentExceptionMsg()
test "Incorrect json version":
let res = waitFor testInvalidJsonVer()
check res.error == true and res.result["message"] == %"JSON 2.0 required"
#expect ValueError:
try:
let res = waitFor testInvalidJsonVer()
check res.error == true and res.result["message"] == %"JSON 2.0 required"
except:
echo "Error ", getCurrentExceptionMsg()
test "Raising exceptions":
#expect ValueError:
try:
let res = waitFor testRaise()
except:
echo "Error ", getCurrentExceptionMsg()
test "Malformed json":
# TODO: We time out here because the server won't be able to
# find an id to return to us, so we cannot complete the future.
let res = waitFor testMalformed()
check res.error == true and res.result == %"Timeout"
try:
let res = waitFor testMalformed()
check res.error == true and res.result == %"Timeout"
except:
echo "Error ", getCurrentExceptionMsg()

View File

@ -1,14 +1,14 @@
import unittest, json, tables
import ../rpcclient, ../rpcserver
import stint, ethtypes, ethprocs, stintjson
import ../rpcclient, ../rpcsockets
import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles
from os import getCurrentDir, DirSep
from strutils import rsplit
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
var
server = newRpcServer("localhost", Port(8546))
client = newRpcClient()
server = newRpcStreamServer("localhost", Port(8546))
client = newRpcStreamClient()
## Generate Ethereum server RPCs
server.addEthRpcs()

View File

@ -1,5 +1,5 @@
import unittest, json, tables
import ../rpcserver
import unittest, json, tables, chronicles
import ../rpcsockets
type
# some nested types to check object parsing
@ -27,7 +27,7 @@ let
},
"c": %1.23}
var s = newRpcServer(["localhost:8545"])
var s = newRpcStreamServer(["localhost:8545"])
# RPC definitions
@ -67,14 +67,14 @@ s.rpc("rpc.testreturns") do() -> int:
suite "Server types":
test "On macro registration":
check s.procs.hasKey("rpc.simplepath")
check s.procs.hasKey("rpc.differentparams")
check s.procs.hasKey("rpc.arrayparam")
check s.procs.hasKey("rpc.seqparam")
check s.procs.hasKey("rpc.objparam")
check s.procs.hasKey("rpc.returntypesimple")
check s.procs.hasKey("rpc.returntypecomplex")
check s.procs.hasKey("rpc.testreturns")
check s.hasMethod("rpc.simplepath")
check s.hasMethod("rpc.differentparams")
check s.hasMethod("rpc.arrayparam")
check s.hasMethod("rpc.seqparam")
check s.hasMethod("rpc.objparam")
check s.hasMethod("rpc.returntypesimple")
check s.hasMethod("rpc.returntypecomplex")
check s.hasMethod("rpc.testreturns")
test "Simple paths":
let r = waitFor rpcSimplePath(%[])

View File

@ -1,8 +1,8 @@
import unittest, json
import ../rpcclient, ../rpcserver
import unittest, json, chronicles
import ../rpcclient, ../rpcsockets
var srv = newRpcServer(["localhost:8545"])
var client = newRpcClient()
var srv = newRpcStreamServer(["localhost:8545"])
var client = newRpcStreamClient()
# Create RPC on server
srv.rpc("myProc") do(input: string, data: array[0..3, int]):