Server now allows defining read, write and close code directly

This commit is contained in:
coffeepots 2018-06-21 18:15:21 +01:00
parent 6b619472f3
commit b245a23745

View File

@ -2,7 +2,7 @@ import json, tables, strutils, options, macros, chronicles
import asyncdispatch2
import jsonmarshal
export asyncdispatch2, json, jsonmarshal
export asyncdispatch2, json, jsonmarshal, options
logScope:
topics = "RpcServer"
@ -48,7 +48,7 @@ const
INTERNAL_ERROR* = -32603
SERVER_ERROR* = -32000
maxRequestLength = 1024 * 128
defaultMaxRequestLength = 1024 * 128
jsonErrorMessages*: array[RpcJsonError, (int, string)] =
[
@ -58,8 +58,8 @@ const
(INVALID_REQUEST, "No id specified")
]
proc newRpcServer*[T]: RpcServer[T] =
result = RpcServer[T]()
proc newRpcServer*[S](): RpcServer[S] =
new result
result.procs = newTable[string, RpcProc]()
result.servers = @[]
@ -101,22 +101,34 @@ proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string =
let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id}
return $node & "\c\l"
proc sendError*(client: RpcClientTransport, code: int, msg: string, id: JsonNode,
proc addErrorSending(name, writeCode: NimNode): NimNode =
let
res = newIdentNode("result")
sendJsonErr = newIdentNode($name & "Json")
result = quote do:
proc `name`*[T: RpcClientTransport](clientTrans: T, code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()) {.async.} =
## Send error message to client
let error = %{"code": %(code), "id": id, "message": %msg, "data": data}
debug "Error generated", error = error, id = id
var res = wrapReply(id, newJNull(), error)
result = client.write(res)
var
value {.inject.} = wrapReply(id, newJNull(), error)
client {.inject.}: T
shallowCopy(client, clientTrans)
`res` = `writeCode`
proc sendJsonError*(state: RpcJsonError, client: RpcClientTransport, id: JsonNode,
proc `sendJsonErr`*(state: RpcJsonError, clientTrans: RpcClientTransport, id: JsonNode,
data = newJNull()) {.async.} =
## Send client response for invalid json state
let errMsgs = jsonErrorMessages[state]
await client.sendError(errMsgs[0], errMsgs[1], id, data)
await clientTrans.`name`(errMsgs[0], errMsgs[1], id, data)
# Server message processing
proc processMessage[T](server: RpcServer[T], client: RpcClientTransport,
proc genProcessMessages(name, sendErrorName, writeCode: NimNode): NimNode =
let idSendErrJson = newIdentNode($sendErrorName & "Json")
result = quote do:
proc `name`[T: RpcClientTransport](server: RpcServer, clientTrans: T,
line: string) {.async.} =
var
node: JsonNode
@ -130,42 +142,125 @@ proc processMessage[T](server: RpcServer[T], client: RpcClientTransport,
newJNull()
else:
node["id"]
await errState.err.sendJsonError(client, id, %errState.msg)
await errState.err.`idSendErrJson`(clientTrans, id, %errState.msg)
else:
let
methodName = node["method"].str
id = node["id"]
if not server.procs.hasKey(methodName):
await client.sendError(METHOD_NOT_FOUND, "Method not found", %id,
await clientTrans.`sendErrorName`(METHOD_NOT_FOUND, "Method not found", %id,
%(methodName & " is not a registered method."))
else:
let callRes = await server.procs[methodName](node["params"])
var res = wrapReply(id, callRes, newJNull())
discard await client.write(res)
var
value {.inject.} = wrapReply(id, callRes, newJNull())
client {.inject.}: T
shallowCopy(client, clientTrans)
asyncCheck `writeCode`
proc processClient*[S: RpcServerTransport, C: RpcClientTransport](server: S, client: C) {.async, gcsafe.} =
proc genProcessClient(nameIdent, procMessagesIdent, sendErrIdent, readCode, closeCode: NimNode): NimNode =
# This generates the processClient proc to match transport.
# processClient is compatible with createStreamServer and thus StreamCallback.
# However the constraints are conceptualised so you only need to match it's interface
# Note: https://github.com/nim-lang/Nim/issues/644
result = quote do:
proc `nameIdent`[S: RpcServerTransport, C: RpcClientTransport](server: S, clientTrans: C) {.async, gcsafe.} =
var rpc = getUserData[RpcServer[S]](server)
while true:
let line = await client.readLine(maxRequestLength)
var
client {.inject}: C
maxRequestLength {.inject.} = defaultMaxRequestLength
shallowCopy(client, clientTrans)
let line = await `readCode`
if line == "":
client.close()
`closeCode`
break
debug "Processing client", address = client.remoteAddress(), line
debug "Processing message", address = clientTrans.remoteAddress(), line = line
let future = processMessage(rpc, client, line)
let future = `procMessagesIdent`(rpc, clientTrans, line)
yield future
if future.failed:
if future.readError of RpcProcError:
let err = future.readError.RpcProcError
await client.sendError(err.code, err.msg, err.data)
await clientTrans.`sendErrIdent`(err.code, err.msg, err.data)
elif future.readError of ValueError:
let err = future.readError[].ValueError
await client.sendError(INVALID_PARAMS, err.msg, %"")
await clientTrans.`sendErrIdent`(INVALID_PARAMS, err.msg, %"")
else:
await client.sendError(SERVER_ERROR,
await clientTrans.`sendErrIdent`(SERVER_ERROR,
"Error: Unknown error occurred", %"")
echo "$$", result.repr
#[
New API:
For custom RpcServers that do their own work upon getting/sending data.
newServer = defineRpcServer[StreamTransport, StreamServer]:
write:
mySpecialWriter(server, client, line)
# Note: Anything not defined here will use the default code to operate
Code is directly inserted into processMessages. You can still define your
own transports, but this lets you define operations for existing transports
without needing to rework them.
]#
import random
macro defineRpcTransport*(procClientName: untyped, body: untyped = nil): untyped =
## Build an rpcServer type that inlines data access operations
#[
Injects:
line: to be populated by the transport
Example:
defineRpcTransport(myServer):
write:
write("http://" & value)
read:
readLine
]#
procClientName.expectKind nnkIdent
var
writeCode = quote do:
client.write(value)
readCode = quote do:
client.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.close
if body != nil:
body.expectKind nnkStmtList
for item in body:
item.expectKind nnkCall
item[0].expectKind nnkIdent
item[1].expectKind nnkStmtList
let
verb = $item[0]
code = item[1]
case verb.toLowerAscii
of "write":
writeCode = item[1]
of "read":
readCode = item[1]
of "close":
closeCode = item[1]
else: error("Unknown verb \"" & verb & "\"")
result = newStmtList()
let
sendErr = newIdentNode($procClientName & "sendError")
procMsgs = newIdentNode($procClientName & "processMessages")
result.add(addErrorSending(sendErr, writeCode))
result.add(genProcessMessages(procMsgs, sendErr, writeCode))
result.add(genProcessClient(procClientName, procMsgs, sendErr, readCode, closeCode))
echo "defineRpc:\n", result.repr
proc start*(server: RpcServer) =
## Start the RPC server.
@ -270,12 +365,16 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr
# Utility functions for setting up servers using transport addresses
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, streamCallback: StreamCallback) =
# Create a default transport that's suitable for createStreamServer
defineRpcTransport(processStreamClient)
proc addStreamServer*[S](server: RpcServer[S], address: TransportAddress, callBack: StreamCallback = processStreamClient) =
#makeProcessClient(processClient, StreamTransport)
try:
info "Creating server on ", address = $address
var transportServer = createStreamServer(address, streamCallback, {ReuseAddr}, udata = server)
var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except:
error "Failed to create server", address = $address, message = getCurrentExceptionMsg()
@ -284,11 +383,11 @@ proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, stream
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], streamCallback: StreamCallback) =
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], callBack: StreamCallback = processStreamClient) =
for item in addresses:
server.addStreamServer(item, streamCallback)
server.addStreamServer(item, callBack)
proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback: StreamCallback) =
proc addStreamServer*[T: RpcServer](server: T, address: string, callBack: StreamCallback = processStreamClient) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
@ -308,21 +407,21 @@ proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback:
discard
for r in tas4:
server.addStreamServer(r, streamCallback)
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, streamCallback)
server.addStreamServer(r, callBack)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], streamCallback: StreamCallback) =
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], callBack: StreamCallback = processStreamClient) =
for address in addresses:
server.addStreamServer(address, streamCallback)
server.addStreamServer(address, callBack)
proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, streamCallback: StreamCallback) =
proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, callBack: StreamCallback = processStreamClient) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
@ -346,10 +445,10 @@ proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, stre
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r, streamCallback)
server.addStreamServer(r, callBack)
added.inc
for r in tas6:
server.addStreamServer(r, streamCallback)
server.addStreamServer(r, callBack)
added.inc
if len(server.servers) == 0:
@ -362,17 +461,17 @@ type RpcStreamServer* = RpcServer[StreamServer]
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
result.addStreamServers(addresses)
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
result.addStreamServers(addresses)
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
# Create server on specified port
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port, processClient)
result.addStreamServer(address, port)
# TODO: Allow cross checking between client signatures and server calls