Remove DSL, add router and simplify server

This commit is contained in:
coffeepots 2018-07-06 17:47:43 +01:00
parent b968d96923
commit cf44cc552d
13 changed files with 396 additions and 599 deletions

View File

@ -6,8 +6,8 @@ export asyncdispatch2
type
RpcClient*[T, A] = ref object
transp*: T
awaiting: Table[string, Future[Response]]
transport: T
address: A
nextId: int64
@ -15,34 +15,30 @@ type
const defaultMaxRequestLength* = 1024 * 128
proc newRpcClient*[T, A](): RpcClient[T, A] =
proc newRpcClient*[T, A]: RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc genCall(rpcType, callName, writeCode: NimNode): NimNode =
let res = newIdentNode("result")
result = quote do:
proc `callName`*(self: `rpcType`, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var
value {.inject.} =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
template client: untyped = self
let res = await `writeCode`
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
proc call*(self: RpcClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var
value =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
`res` = await newFut
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
@ -66,12 +62,12 @@ macro checkGet(node: JsonNode, fieldName: string,
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage[T, A](self: RpcClient[T, A], line: string) =
proc processMessage(self: RpcClient, line: string) =
# Note: this doesn't use any transport code so doesn't need to be differentiated.
let node = parseJson(line) # TODO: Check errors
let
node = parseJson(line) # TODO: Check errors
id = checkGet(node, "id", JString)
# TODO: Use more appropriate exception objects
let id = checkGet(node, "id", JString)
if not self.awaiting.hasKey(id):
raise newException(ValueError,
"Cannot find message id \"" & node["id"].str & "\"")
@ -92,108 +88,26 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode: NimNode): NimNode =
result = quote do:
proc `processDataName`(clientTransport: `rpcType`) {.async.} =
while true:
var maxRequestLength {.inject.} = defaultMaxRequestLength
template client: untyped = clientTransport
var value {.inject.} = await `readCode`
`afterReadCode`
if value == "":
# transmission ends
`closeCode`
break
proc processData(client: RpcClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
processMessage(clientTransport, value)
# async loop reconnection and waiting
clientTransport.transp = await connect(clientTransport.address)
proc genConnect(rpcType, connectName, processDataName, connectCode: NimNode): NimNode =
result = quote do:
proc `connectName`*(clientTransport: `rpcType`, address: string, port: Port) {.async.} =
var
address {.inject.} = address
port {.inject.} = port
template client: untyped = clientTransport
`connectCode`
asyncCheck `processDataName`(clientTransport)
macro defineRpcClientTransport*(transType, addrType: untyped, prefix: string = "", body: untyped = nil): untyped =
var
writeCode = quote do:
client.transp.write(value)
readCode = quote do:
client.transp.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.transp.close
connectCode = quote do:
# TODO: `address` hostname can be resolved to many IP addresses, we are using
# first one, but maybe it would be better to iterate over all IP addresses
# and try to establish connection until it will not be established.
let addresses = resolveTAddress(address, port)
client.transp = await connect(addresses[0])
client.address = addresses[0]
afterReadCode = newStmtList()
if body != nil:
body.expectKind nnkStmtList
for item in body:
item.expectKind nnkCall
item[0].expectKind nnkIdent
item[1].expectKind nnkStmtList
let
verb = $item[0]
code = item[1]
case verb.toLowerAscii
of "write":
# `client`, the RpcClient
# `value`, the data to be sent to the server
# Note: Update `value` so it's length can be sent afterwards
writeCode = code
of "read":
# `client`, the RpcClient
# `maxRequestLength`, initially set to defaultMaxRequestLength
readCode = code
of "close":
# `client`, the RpcClient
# `value`, the data returned from the server
# `maxRequestLength`, initially set to defaultMaxRequestLength
closeCode = code
of "connect":
# `client`, the RpcClient
# `address`, server destination address string
# `port`, server destination port
connectCode = code
of "afterread":
# `client`, the RpcClient
# `value`, the data returned from the server
# `maxRequestLength`, initially set to defaultMaxRequestLength
afterReadCode = code
else: error("Unknown RPC verb \"" & verb & "\"")
result = newStmtList()
let
rpcType = quote: RpcClient[`transType`, `addrType`]
processDataName = newIdentNode(prefix.strVal & "processData")
connectName = newIdentNode(prefix.strVal & "connect")
callName = newIdentNode(prefix.strVal & "call")
result.add(genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode))
result.add(genConnect(rpcType, connectName, processDataName, connectCode))
result.add(genCall(rpcType, callName, writeCode))
when defined(nimDumpRpcs):
echo "defineClient:\n", result.repr
# Define default stream server
# TODO: Move this into a separate unit so users can define 'connect', 'call' etc without requiring a prefix?
defineRpcClientTransport(StreamTransport, TransportAddress)
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress]
proc connect*(client: RpcStreamClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)
proc newRpcStreamClient*(): RpcStreamClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()

138
json_rpc/router.nim Normal file
View File

@ -0,0 +1,138 @@
import json, tables, asyncdispatch2, jsonmarshal, strutils, macros
export asyncdispatch2, json, jsonmarshal
type
# Procedure signature accepted as an RPC call by server
RpcProc* = proc(input: JsonNode): Future[JsonNode]
RpcRouter* = object
procs*: TableRef[string, RpcProc]
const
methodField = "method"
paramsField = "params"
proc newRpcRouter*: RpcRouter =
result.procs = newTable[string, RpcProc]()
proc register*(router: var RpcRouter, path: string, call: RpcProc) =
router.procs.add(path, call)
proc clear*(router: var RpcRouter) = router.procs.clear
proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasKey(methodName)
template isEmpty(node: JsonNode): bool = node.isNil or node.kind == JNull
proc route*(router: RpcRouter, data: JsonNode): Future[JsonNode] {.async, gcsafe.} =
## Route to RPC, raises exceptions on missing data
let jPath = data{methodField}
if jPath.isEmpty:
raise newException(ValueError, "No " & methodField & " field found")
let jParams = data{paramsField}
if jParams.isEmpty:
raise newException(ValueError, "No " & paramsField & " field found")
let
path = jPath.getStr
rpc = router.procs.getOrDefault(path)
# TODO: not GC-safe as it accesses 'rpc' which is a global using GC'ed memory!
if rpc != nil:
result = await rpc(jParams)
else:
raise newException(ValueError, "Method \"" & path & "\" not found")
proc ifRoute*(router: RpcRouter, data: JsonNode, fut: var Future[JsonNode]): bool =
## Route to RPC, returns false if the method or params cannot be found
# TODO: This is already checked in processMessages, but allows safer use externally
let
jPath = data{methodField}
jParams = data{paramsField}
if jPath.isEmpty or jParams.isEmpty:
return false
let
path = jPath.getStr
rpc = router.procs.getOrDefault(path)
if rpc != nil:
fut = rpc(jParams)
return true
proc makeProcName(s: string): string =
result = ""
for c in s:
if c.isAlphaNumeric: result.add c
proc hasReturnType(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true
macro rpc*(server: RpcRouter, path: string, body: untyped): untyped =
## Define a remote procedure call.
## Input and return parameters are defined using the ``do`` notation.
## For example:
## .. code-block:: nim
## myServer.rpc("path") do(param1: int, param2: float) -> string:
## result = $param1 & " " & $param2
## ```
## Input parameters are automatically marshalled from json to Nim types,
## and output parameters are automatically marshalled to json for transport.
result = newStmtList()
let
parameters = body.findChild(it.kind == nnkFormalParams)
# all remote calls have a single parameter: `params: JsonNode`
paramsIdent = newIdentNode"params"
# procs are generated from the stripped path
pathStr = $path
# strip non alphanumeric
procNameStr = pathStr.makeProcName
# public rpc proc
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
errTrappedBody = quote do:
try:
`procBody`
except:
debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg()
if parameters.hasReturnType:
let returnType = parameters[0]
# delegate async proc allows return and setting of result as native type
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} =
`setup`
`errTrappedBody`
)
if returnType == ident"JsonNode":
# `JsonNode` results don't need conversion
result.add( quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = await `doMain`(`paramsIdent`)
)
else:
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = %await `doMain`(`paramsIdent`)
)
else:
# no return types, inline contents
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`setup`
`errTrappedBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)
)
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr

View File

@ -1,37 +1,20 @@
import json, tables, strutils, options, macros, chronicles
import asyncdispatch2
import json, tables, options, macros, chronicles
import asyncdispatch2, router
import jsonmarshal
export asyncdispatch2, json, jsonmarshal, options
export asyncdispatch2, json, jsonmarshal, router
logScope:
topics = "RpcServer"
type
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId, rjeNoParams
RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string]
# Procedure signature accepted as an RPC call by server
RpcProc* = proc (params: JsonNode): Future[JsonNode]
RpcClientTransport* = concept t
t.write(var string) is Future[int]
t.readLine(int) is Future[string]
t.close
t.remoteAddress() # Required for logging
t.localAddress()
RpcServerTransport* = concept t
t.start
t.stop
t.close
RpcProcessClient* = proc (server: RpcServerTransport, client: RpcClientTransport): Future[void] {.gcsafe.}
RpcServer*[S: RpcServerTransport] = ref object
RpcServer*[S] = ref object
servers*: seq[S]
procs*: TableRef[string, RpcProc]
router*: RpcRouter
RpcProcError* = ref object of Exception
code*: int
@ -49,23 +32,28 @@ const
SERVER_ERROR* = -32000
defaultMaxRequestLength* = 1024 * 128
jsonErrorMessages*: array[RpcJsonError, (int, string)] =
[
(JSON_PARSE_ERROR, "Invalid JSON"),
(INVALID_REQUEST, "JSON 2.0 required"),
(INVALID_REQUEST, "No method requested"),
(INVALID_REQUEST, "No id specified")
(INVALID_REQUEST, "No id specified"),
(INVALID_PARAMS, "No parameters specified")
]
proc newRpcServer*[S](): RpcServer[S] =
new result
result.procs = newTable[string, RpcProc]()
result.router = newRpcRouter()
result.servers = @[]
# Utility functions
# TODO: Move outside server
func `%`*(p: Port): JsonNode = %(p.int)
# TODO: Move outside server?
#func `%`*(p: Port): JsonNode = %(p.int)
template rpc*(server: RpcServer, path: string, body: untyped): untyped =
server.router.rpc(path, body)
template hasMethod*(server: RpcServer, methodName: string): bool = server.router.hasMethod(methodName)
# Json state checking
@ -80,7 +68,7 @@ template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
debug "Cannot process json", json = jsonString, msg = msg
(valid, msg)
proc checkJsonErrors*(line: string,
proc checkJsonState*(line: string,
node: var JsonNode): Option[RpcJsonErrorContainer] =
## Tries parsing line into node, if successful checks required fields
## Returns: error state or none
@ -89,10 +77,13 @@ proc checkJsonErrors*(line: string,
return some((rjeInvalidJson, res[1]))
if not node.hasKey("id"):
return some((rjeNoId, ""))
if node{"jsonrpc"} != %"2.0":
let jVer = node{"jsonrpc"}
if jVer != nil and jVer.kind != JNull and jVer != %"2.0":
return some((rjeVersionError, ""))
if not node.hasKey("method"):
return some((rjeNoMethod, ""))
if not node.hasKey("params"):
return some((rjeNoParams, ""))
return none(RpcJsonErrorContainer)
# Json reply wrappers
@ -101,170 +92,47 @@ proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string =
let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id}
return $node & "\c\l"
proc genErrorSending(name, writeCode, errorCode: NimNode): NimNode =
let
res = newIdentNode("result")
sendJsonErr = newIdentNode($name & "Json")
result = quote do:
proc `name`*[T: RpcClientTransport](clientTrans: T, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client
let error = %{"code": %(code), "id": id, "message": %msg, "data": data}
debug "Error generated", error = error, id = id
template transport: untyped = clientTrans
var value {.inject.} = wrapReply(id, newJNull(), error)
`errorCode`
`res` = `writeCode`
proc `sendJsonErr`*(state: RpcJsonError, clientTrans: RpcClientTransport, id: JsonNode,
data = newJNull()) {.async.} =
## Send client response for invalid json state
let errMsgs = jsonErrorMessages[state]
await clientTrans.`name`(errMsgs[0], errMsgs[1], id, data)
proc wrapError*(code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()): JsonNode =
# Create standardised error json
result = %{"code": %(code), "id": id, "message": %msg, "data": data}
debug "Error generated", error = result, id = id
# Server message processing
proc genProcessMessages(name, sendErrorName, writeCode: NimNode): NimNode =
let idSendErrJson = newIdentNode($sendErrorName & "Json")
result = quote do:
proc `name`[T: RpcClientTransport](server: RpcServer, clientTrans: T,
line: string) {.async.} =
var
node: JsonNode
# set up node and/or flag errors
jsonErrorState = checkJsonErrors(line, node)
if jsonErrorState.isSome:
let errState = jsonErrorState.get
var id =
if errState.err == rjeInvalidJson or errState.err == rjeNoId:
newJNull()
else:
node["id"]
await errState.err.`idSendErrJson`(clientTrans, id, %errState.msg)
else:
let
methodName = node["method"].str
id = node["id"]
if not server.procs.hasKey(methodName):
await clientTrans.`sendErrorName`(METHOD_NOT_FOUND, "Method not found", %id,
%(methodName & " is not a registered method."))
else:
let callRes = await server.procs[methodName](node["params"])
template transport: untyped = clientTrans
var value {.inject.} = wrapReply(id, callRes, newJNull())
asyncCheck `writeCode`
proc genProcessClient(nameIdent, procMessagesIdent, sendErrIdent, readCode, afterReadCode, closeCode: NimNode): NimNode =
# This generates the processClient proc to match transport.
# processClient is compatible with createStreamServer and thus StreamCallback.
# However the constraints are conceptualised so you only need to match it's interface
# Note: https://github.com/nim-lang/Nim/issues/644
result = quote do:
proc `nameIdent`[S: RpcServerTransport, C: RpcClientTransport](server: S, clientTrans: C) {.async, gcsafe.} =
var rpc = getUserData[RpcServer[S]](server)
while true:
var maxRequestLength {.inject.} = defaultMaxRequestLength
template transport: untyped = clientTrans
var value {.inject.} = await `readCode`
`afterReadCode`
if value == "":
`closeCode`
break
debug "Processing message", address = clientTrans.remoteAddress(), line = value
let future = `procMessagesIdent`(rpc, clientTrans, value)
yield future
if future.failed:
if future.readError of RpcProcError:
let err = future.readError.RpcProcError
await clientTrans.`sendErrIdent`(err.code, err.msg, err.data)
elif future.readError of ValueError:
let err = future.readError[].ValueError
await clientTrans.`sendErrIdent`(INVALID_PARAMS, err.msg, %"")
else:
await clientTrans.`sendErrIdent`(SERVER_ERROR,
"Error: Unknown error occurred", %"")
macro defineRpcServerTransport*(procClientName: untyped, body: untyped = nil): untyped =
## Build an rpcServer type that inlines data access operations
#[
Injects:
client: RpcClientTransport type
maxRequestLength: optional bytes to read
value: Json string to be written to transport
Example:
defineRpcTransport(myServer):
write:
client.write(value)
read:
client.readLine(maxRequestLength)
close:
client.close
]#
procClientName.expectKind nnkIdent
proc processMessages*[T](server: RpcServer[T], line: string): Future[string] {.async, gcsafe.} =
var
writeCode = quote do:
transport.write(value)
readCode = quote do:
transport.readLine(defaultMaxRequestLength)
closeCode = quote do:
transport.close
afterReadCode = newStmtList()
errorCode = newStmtList()
node: JsonNode
# parse json node and/or flag missing fields and errors
jsonErrorState = checkJsonState(line, node)
if body != nil:
body.expectKind nnkStmtList
for item in body:
item.expectKind nnkCall
item[0].expectKind nnkIdent
item[1].expectKind nnkStmtList
if jsonErrorState.isSome:
let errState = jsonErrorState.get
var id =
if errState.err == rjeInvalidJson or errState.err == rjeNoId:
newJNull()
else:
node["id"]
let errMsg = jsonErrorMessages[errState.err]
# return error state as json
result = $wrapError(
code = errMsg[0],
msg = errMsg[1],
id = id)
else:
let
methodName = node["method"].str
id = node["id"]
var callRes: Future[JsonNode]
if server.router.ifRoute(node, callRes):
let res = await callRes
result = $wrapReply(id, res, newJNull())
else:
let
verb = $item[0]
code = item[1]
case verb.toLowerAscii
of "write":
# `transport`, the client transport
# `value`, the data returned from the invoked RPC
# Note: Update `value` so it's length can be sent afterwards
writeCode = code
of "read":
# `transport`, the client transport
# `maxRequestLength`, set to defaultMaxRequestLength
# Note: Result of expression is awaited
readCode = code
of "close":
# `transport`, the client transport
# `value`, which contains the data read by `readCode`
closeCode = code
of "afterread":
# `transport`, the client transport
# `value`, which contains the data read by `readCode`
afterReadCode = code
of "error":
# `transport`, the client transport
# `value`, the data returned from the invoked RPC
# Note: Update `value` so it's length can be sent afterwards
errorCode = code
else: error("Unknown RPC verb \"" & verb & "\"")
result = newStmtList()
let
sendErr = newIdentNode($procClientName & "SendError")
procMsgs = newIdentNode($procClientName & "ProcessMessages")
result.add(genErrorSending(sendErr, writeCode, errorCode))
result.add(genProcessMessages(procMsgs, sendErr, writeCode))
result.add(genProcessClient(procClientName, procMsgs, sendErr, readCode, afterReadCode, closeCode))
when defined(nimDumpRpcs):
echo "defineServer:\n", result.repr
methodNotFound = %(methodName & " is not a registered method.")
error = wrapError(METHOD_NOT_FOUND, "Method not found", id, methodNotFound)
result = $wrapReply(id, newJNull(), error)
proc start*(server: RpcServer) =
## Start the RPC server.
@ -281,201 +149,14 @@ proc close*(server: RpcServer) =
for item in server.servers:
item.close()
# Server registration and RPC generation
# Server registration
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
## Add a name/code pair to the RPC server.
server.procs[name] = rpc
server.router.addRoute(name, rpc)
proc unRegisterAll*(server: RpcServer) =
# Remove all remote procedure calls from this server.
server.procs.clear
proc makeProcName(s: string): string =
result = ""
for c in s:
if c.isAlphaNumeric: result.add c
proc hasReturnType(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true
macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
## Define a remote procedure call.
## Input and return parameters are defined using the ``do`` notation.
## For example:
## .. code-block:: nim
## myServer.rpc("path") do(param1: int, param2: float) -> string:
## result = $param1 & " " & $param2
## ```
## Input parameters are automatically marshalled from json to Nim types,
## and output parameters are automatically marshalled to json for transport.
result = newStmtList()
let
parameters = body.findChild(it.kind == nnkFormalParams)
# all remote calls have a single parameter: `params: JsonNode`
paramsIdent = newIdentNode"params"
# procs are generated from the stripped path
pathStr = $path
# strip non alphanumeric
procNameStr = pathStr.makeProcName
# public rpc proc
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
errTrappedBody = quote do:
try:
`procBody`
except:
debug "Error occurred within RPC ", path = `path`, errorMessage = getCurrentExceptionMsg()
if parameters.hasReturnType:
let returnType = parameters[0]
# delegate async proc allows return and setting of result as native type
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} =
`setup`
`errTrappedBody`
)
if returnType == ident"JsonNode":
# `JsonNode` results don't need conversion
result.add( quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = await `doMain`(`paramsIdent`)
)
else:
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`res` = %await `doMain`(`paramsIdent`)
)
else:
# no return types, inline contents
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`setup`
`errTrappedBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)
)
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr
# Utility functions for setting up servers using stream transport addresses
# Create a default transport that's suitable for createStreamServer
defineRpcServerTransport(processStreamClient)
proc addStreamServer*[S](server: RpcServer[S], address: TransportAddress, callBack: StreamCallback = processStreamClient) =
#makeProcessClient(processClient, StreamTransport)
try:
info "Creating server on ", address = $address
var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except:
error "Failed to create server", address = $address, message = getCurrentExceptionMsg()
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], callBack: StreamCallback = processStreamClient) =
for item in addresses:
server.addStreamServer(item, callBack)
proc addStreamServer*[T: RpcServer](server: T, address: string, callBack: StreamCallback = processStreamClient) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, callBack)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], callBack: StreamCallback = processStreamClient) =
for address in addresses:
server.addStreamServer(address, callBack)
proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, callBack: StreamCallback = processStreamClient) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, callBack)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
type RpcStreamServer* = RpcServer[StreamServer]
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses)
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses)
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
# Create server on specified port
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port)
server.router.clear
# TODO: Allow cross checking between client signatures and server calls

View File

@ -0,0 +1,142 @@
import server, json, chronicles
proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client
let error = wrapError(code, msg, id, data)
var value = wrapReply(id, newJNull(), error)
result = transport.write(value)
proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} =
## Process transport data to the RPC server
var rpc = getUserData[RpcServer[StreamTransport]](server)
while true:
var
maxRequestLength = defaultMaxRequestLength
value = await transport.readLine(defaultMaxRequestLength)
if value == "":
transport.close
break
debug "Processing message", address = transport.remoteAddress(), line = value
let future = processMessages(rpc, value)
yield future
if future.failed:
if future.readError of RpcProcError:
let err = future.readError.RpcProcError
await transport.sendError(err.code, err.msg, err.data)
elif future.readError of ValueError:
let err = future.readError[].ValueError
await transport.sendError(INVALID_PARAMS, err.msg, %"")
else:
await transport.sendError(SERVER_ERROR,
"Error: Unknown error occurred", %"")
else:
let res = await future
result = transport.write(res)
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress, callBack: StreamCallback) =
try:
info "Creating server on ", address = $address
var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except:
error "Failed to create server", address = $address, message = getCurrentExceptionMsg()
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[TransportAddress], callBack: StreamCallback) =
for item in addresses:
server.addStreamServer(item, callBack)
proc addStreamServer*(server: RpcServer[StreamServer], address: string, callBack: StreamCallback) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, callBack)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[string], callBack: StreamCallback) =
for address in addresses:
server.addStreamServer(address, callBack)
proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Port, callBack: StreamCallback) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, callBack)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
type RpcStreamServer* = RpcServer[StreamServer]
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
# Create server on specified port
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port, processClient)

View File

@ -1,60 +0,0 @@
import rpcserver, rpcclient, tables, chronicles, strformat, strutils
export rpcserver, rpcclient
proc extractJsonStr(msgSource: string, value: string): string =
result = ""
let p1 = find(value, '{')
if p1 > -1:
let p2 = rFind(value, '}')
if p2 == -1:
info "Cannot find json end brace", source = msgSource, msg = value
else:
result = value[p1 .. p2]
debug "Extracted json", source = msgSource, json = result
else:
info "Cannot find json start brace", source = msgSource, msg = value
type
RpcHttpServer* = RpcServer[StreamServer]
defineRpcServerTransport(httpProcessClient):
write:
const contentType = "Content-Type: application/json-rpc"
value = &"Host: {$transport.localAddress} {contentType} Content-Length: {$value.len} {value}"
debug "HTTP server: write", msg = value
transport.write(value)
afterRead:
debug "HTTP server: read", msg = value
value = "HTTP Server".extractJsonStr(value)
proc newRpcHttpServer*(addresses: openarray[TransportAddress]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, httpProcessClient)
proc newRpcHttpServer*(addresses: openarray[string]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, httpProcessClient)
proc newRpcHttpServer*(address = "localhost", port: Port = Port(8545)): RpcHttpServer =
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port, httpProcessClient)
type RpcHttpClient* = RpcClient[StreamTransport, TransportAddress]
defineRpcClientTransport(StreamTransport, TransportAddress, "http"):
write:
const contentType = "Content-Type: application/json-rpc"
value = &"Host: {$client.transp.localAddress} {contentType} Content-Length: {$value.len} {value}"
debug "HTTP client: write", msg = value
client.transp.write(value)
afterRead:
# Strip out http header
# TODO: Performance
debug "HTTP client: read", msg = value
value = "HTTP Client".extractJsonStr(value)
proc newRpcHttpClient*(): RpcHttpClient =
result = newRpcClient[StreamTransport, TransportAddress]()

2
rpcsockets.nim Normal file
View File

@ -0,0 +1,2 @@
import json_rpc / [server, sockettransport]
export server, sockettransport

View File

@ -1,3 +1,3 @@
import
testrpcmacro, testserverclient, testethcalls, testhttp #, testerrors
testrpcmacro, testserverclient, testethcalls #, testerrors

View File

@ -9,7 +9,7 @@ proc rawCall*(self: RpcClient, name: string,
self.nextId.inc
var s = msg & "\c\l"
let res = await self.transp.write(s)
let res = await self.transport.write(s)
assert res == len(s)
# completed by processMessage.

View File

@ -3,7 +3,7 @@
allow unchecked and unformatted calls.
]#
import unittest, debugclient, ../rpcserver
import unittest, debugclient, ../rpcsockets
import strformat, chronicles
var server = newRpcStreamServer("localhost", 8547.Port)

View File

@ -1,5 +1,5 @@
import unittest, json, tables
import ../rpcclient, ../rpcserver
import ../rpcclient, ../rpcsockets
import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles
from os import getCurrentDir, DirSep

View File

@ -1,20 +0,0 @@
import unittest, json, chronicles, unittest
import ../rpchttpservers
var srv = newRpcHttpServer(["localhost:8545"])
var client = newRpcHttpClient()
# Create RPC on server
srv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
srv.start()
waitFor client.httpConnect("localhost", Port(8545))
suite "HTTP RPC transport":
test "Call":
var r = waitFor client.httpcall("myProc", %[%"abc", %[1, 2, 3, 4]])
check r.error == false and r.result == %"Hello abc data: [1, 2, 3, 4]"
srv.stop()
srv.close()

View File

@ -1,5 +1,5 @@
import unittest, json, tables, chronicles
import ../rpcserver
import ../rpcsockets
type
# some nested types to check object parsing
@ -67,14 +67,14 @@ s.rpc("rpc.testreturns") do() -> int:
suite "Server types":
test "On macro registration":
check s.procs.hasKey("rpc.simplepath")
check s.procs.hasKey("rpc.differentparams")
check s.procs.hasKey("rpc.arrayparam")
check s.procs.hasKey("rpc.seqparam")
check s.procs.hasKey("rpc.objparam")
check s.procs.hasKey("rpc.returntypesimple")
check s.procs.hasKey("rpc.returntypecomplex")
check s.procs.hasKey("rpc.testreturns")
check s.hasMethod("rpc.simplepath")
check s.hasMethod("rpc.differentparams")
check s.hasMethod("rpc.arrayparam")
check s.hasMethod("rpc.seqparam")
check s.hasMethod("rpc.objparam")
check s.hasMethod("rpc.returntypesimple")
check s.hasMethod("rpc.returntypecomplex")
check s.hasMethod("rpc.testreturns")
test "Simple paths":
let r = waitFor rpcSimplePath(%[])

View File

@ -1,5 +1,5 @@
import unittest, json, chronicles
import ../rpcclient, ../rpcserver
import ../rpcclient, ../rpcsockets
var srv = newRpcStreamServer(["localhost:8545"])
var client = newRpcStreamClient()