From b245a237450e9cdd6a8f66a07a0994cc1cf42c25 Mon Sep 17 00:00:00 2001 From: coffeepots Date: Thu, 21 Jun 2018 18:15:21 +0100 Subject: [PATCH] Server now allows defining read, write and close code directly --- rpc/server.nim | 255 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 177 insertions(+), 78 deletions(-) diff --git a/rpc/server.nim b/rpc/server.nim index 9591f73..f830ff9 100644 --- a/rpc/server.nim +++ b/rpc/server.nim @@ -2,7 +2,7 @@ import json, tables, strutils, options, macros, chronicles import asyncdispatch2 import jsonmarshal -export asyncdispatch2, json, jsonmarshal +export asyncdispatch2, json, jsonmarshal, options logScope: topics = "RpcServer" @@ -48,7 +48,7 @@ const INTERNAL_ERROR* = -32603 SERVER_ERROR* = -32000 - maxRequestLength = 1024 * 128 + defaultMaxRequestLength = 1024 * 128 jsonErrorMessages*: array[RpcJsonError, (int, string)] = [ @@ -58,8 +58,8 @@ const (INVALID_REQUEST, "No id specified") ] -proc newRpcServer*[T]: RpcServer[T] = - result = RpcServer[T]() +proc newRpcServer*[S](): RpcServer[S] = + new result result.procs = newTable[string, RpcProc]() result.servers = @[] @@ -101,71 +101,166 @@ proc wrapReply*(id: JsonNode, value: JsonNode, error: JsonNode): string = let node = %{"jsonrpc": %"2.0", "result": value, "error": error, "id": id} return $node & "\c\l" -proc sendError*(client: RpcClientTransport, code: int, msg: string, id: JsonNode, - data: JsonNode = newJNull()) {.async.} = - ## Send error message to client - let error = %{"code": %(code), "id": id, "message": %msg, "data": data} - debug "Error generated", error = error, id = id - var res = wrapReply(id, newJNull(), error) - result = client.write(res) +proc addErrorSending(name, writeCode: NimNode): NimNode = + let + res = newIdentNode("result") + sendJsonErr = newIdentNode($name & "Json") + result = quote do: + proc `name`*[T: RpcClientTransport](clientTrans: T, code: int, msg: string, id: JsonNode, + data: JsonNode = newJNull()) {.async.} = + ## Send error message to client + let error = %{"code": %(code), "id": id, "message": %msg, "data": data} + debug "Error generated", error = error, id = id + var + value {.inject.} = wrapReply(id, newJNull(), error) + client {.inject.}: T + shallowCopy(client, clientTrans) + `res` = `writeCode` -proc sendJsonError*(state: RpcJsonError, client: RpcClientTransport, id: JsonNode, - data = newJNull()) {.async.} = - ## Send client response for invalid json state - let errMsgs = jsonErrorMessages[state] - await client.sendError(errMsgs[0], errMsgs[1], id, data) + proc `sendJsonErr`*(state: RpcJsonError, clientTrans: RpcClientTransport, id: JsonNode, + data = newJNull()) {.async.} = + ## Send client response for invalid json state + let errMsgs = jsonErrorMessages[state] + await clientTrans.`name`(errMsgs[0], errMsgs[1], id, data) # Server message processing -proc processMessage[T](server: RpcServer[T], client: RpcClientTransport, - line: string) {.async.} = + +proc genProcessMessages(name, sendErrorName, writeCode: NimNode): NimNode = + let idSendErrJson = newIdentNode($sendErrorName & "Json") + result = quote do: + proc `name`[T: RpcClientTransport](server: RpcServer, clientTrans: T, + line: string) {.async.} = + var + node: JsonNode + # set up node and/or flag errors + jsonErrorState = checkJsonErrors(line, node) + + if jsonErrorState.isSome: + let errState = jsonErrorState.get + var id = + if errState.err == rjeInvalidJson or errState.err == rjeNoId: + newJNull() + else: + node["id"] + await errState.err.`idSendErrJson`(clientTrans, id, %errState.msg) + else: + let + methodName = node["method"].str + id = node["id"] + + if not server.procs.hasKey(methodName): + await clientTrans.`sendErrorName`(METHOD_NOT_FOUND, "Method not found", %id, + %(methodName & " is not a registered method.")) + else: + let callRes = await server.procs[methodName](node["params"]) + var + value {.inject.} = wrapReply(id, callRes, newJNull()) + client {.inject.}: T + shallowCopy(client, clientTrans) + asyncCheck `writeCode` + +proc genProcessClient(nameIdent, procMessagesIdent, sendErrIdent, readCode, closeCode: NimNode): NimNode = + # This generates the processClient proc to match transport. + # processClient is compatible with createStreamServer and thus StreamCallback. + # However the constraints are conceptualised so you only need to match it's interface + # Note: https://github.com/nim-lang/Nim/issues/644 + result = quote do: + proc `nameIdent`[S: RpcServerTransport, C: RpcClientTransport](server: S, clientTrans: C) {.async, gcsafe.} = + var rpc = getUserData[RpcServer[S]](server) + while true: + var + client {.inject}: C + maxRequestLength {.inject.} = defaultMaxRequestLength + shallowCopy(client, clientTrans) + let line = await `readCode` + if line == "": + `closeCode` + break + + debug "Processing message", address = clientTrans.remoteAddress(), line = line + + let future = `procMessagesIdent`(rpc, clientTrans, line) + yield future + if future.failed: + if future.readError of RpcProcError: + let err = future.readError.RpcProcError + await clientTrans.`sendErrIdent`(err.code, err.msg, err.data) + elif future.readError of ValueError: + let err = future.readError[].ValueError + await clientTrans.`sendErrIdent`(INVALID_PARAMS, err.msg, %"") + else: + await clientTrans.`sendErrIdent`(SERVER_ERROR, + "Error: Unknown error occurred", %"") + echo "$$", result.repr + + +#[ + New API: + For custom RpcServers that do their own work upon getting/sending data. + + newServer = defineRpcServer[StreamTransport, StreamServer]: + write: + mySpecialWriter(server, client, line) + # Note: Anything not defined here will use the default code to operate + + Code is directly inserted into processMessages. You can still define your + own transports, but this lets you define operations for existing transports + without needing to rework them. +]# + +import random + +macro defineRpcTransport*(procClientName: untyped, body: untyped = nil): untyped = + ## Build an rpcServer type that inlines data access operations + #[ + Injects: + line: to be populated by the transport + + Example: + defineRpcTransport(myServer): + write: + write("http://" & value) + read: + readLine + ]# + procClientName.expectKind nnkIdent var - node: JsonNode - # set up node and/or flag errors - jsonErrorState = checkJsonErrors(line, node) + writeCode = quote do: + client.write(value) + readCode = quote do: + client.readLine(defaultMaxRequestLength) + closeCode = quote do: + client.close - if jsonErrorState.isSome: - let errState = jsonErrorState.get - var id = - if errState.err == rjeInvalidJson or errState.err == rjeNoId: - newJNull() - else: - node["id"] - await errState.err.sendJsonError(client, id, %errState.msg) - else: - let - methodName = node["method"].str - id = node["id"] + if body != nil: + body.expectKind nnkStmtList + for item in body: + item.expectKind nnkCall + item[0].expectKind nnkIdent + item[1].expectKind nnkStmtList + let + verb = $item[0] + code = item[1] - if not server.procs.hasKey(methodName): - await client.sendError(METHOD_NOT_FOUND, "Method not found", %id, - %(methodName & " is not a registered method.")) - else: - let callRes = await server.procs[methodName](node["params"]) - var res = wrapReply(id, callRes, newJNull()) - discard await client.write(res) + case verb.toLowerAscii + of "write": + writeCode = item[1] + of "read": + readCode = item[1] + of "close": + closeCode = item[1] + else: error("Unknown verb \"" & verb & "\"") + + result = newStmtList() -proc processClient*[S: RpcServerTransport, C: RpcClientTransport](server: S, client: C) {.async, gcsafe.} = - var rpc = getUserData[RpcServer[S]](server) - while true: - let line = await client.readLine(maxRequestLength) - if line == "": - client.close() - break - - debug "Processing client", address = client.remoteAddress(), line - - let future = processMessage(rpc, client, line) - yield future - if future.failed: - if future.readError of RpcProcError: - let err = future.readError.RpcProcError - await client.sendError(err.code, err.msg, err.data) - elif future.readError of ValueError: - let err = future.readError[].ValueError - await client.sendError(INVALID_PARAMS, err.msg, %"") - else: - await client.sendError(SERVER_ERROR, - "Error: Unknown error occurred", %"") + let + sendErr = newIdentNode($procClientName & "sendError") + procMsgs = newIdentNode($procClientName & "processMessages") + result.add(addErrorSending(sendErr, writeCode)) + result.add(genProcessMessages(procMsgs, sendErr, writeCode)) + result.add(genProcessClient(procClientName, procMsgs, sendErr, readCode, closeCode)) + + echo "defineRpc:\n", result.repr proc start*(server: RpcServer) = ## Start the RPC server. @@ -270,12 +365,16 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = when defined(nimDumpRpcs): echo "\n", pathStr, ": ", result.repr -# Utility functions for setting up servers using transport addresses +# Utility functions for setting up servers using stream transport addresses -proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, streamCallback: StreamCallback) = +# Create a default transport that's suitable for createStreamServer +defineRpcTransport(processStreamClient) + +proc addStreamServer*[S](server: RpcServer[S], address: TransportAddress, callBack: StreamCallback = processStreamClient) = + #makeProcessClient(processClient, StreamTransport) try: info "Creating server on ", address = $address - var transportServer = createStreamServer(address, streamCallback, {ReuseAddr}, udata = server) + var transportServer = createStreamServer(address, callBack, {ReuseAddr}, udata = server) server.servers.add(transportServer) except: error "Failed to create server", address = $address, message = getCurrentExceptionMsg() @@ -284,11 +383,11 @@ proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, stream # Server was not bound, critical error. raise newException(RpcBindError, "Unable to create server!") -proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], streamCallback: StreamCallback) = +proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], callBack: StreamCallback = processStreamClient) = for item in addresses: - server.addStreamServer(item, streamCallback) + server.addStreamServer(item, callBack) -proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback: StreamCallback) = +proc addStreamServer*[T: RpcServer](server: T, address: string, callBack: StreamCallback = processStreamClient) = ## Create new server and assign it to addresses ``addresses``. var tas4: seq[TransportAddress] @@ -308,21 +407,21 @@ proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback: discard for r in tas4: - server.addStreamServer(r, streamCallback) + server.addStreamServer(r, callBack) added.inc for r in tas6: - server.addStreamServer(r, streamCallback) + server.addStreamServer(r, callBack) added.inc if added == 0: # Addresses could not be resolved, critical error. raise newException(RpcAddressUnresolvableError, "Unable to get address!") -proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], streamCallback: StreamCallback) = +proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], callBack: StreamCallback = processStreamClient) = for address in addresses: - server.addStreamServer(address, streamCallback) + server.addStreamServer(address, callBack) -proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, streamCallback: StreamCallback) = +proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, callBack: StreamCallback = processStreamClient) = var tas4: seq[TransportAddress] tas6: seq[TransportAddress] @@ -346,10 +445,10 @@ proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, stre "Address " & address & " could not be resolved!") for r in tas4: - server.addStreamServer(r, streamCallback) + server.addStreamServer(r, callBack) added.inc for r in tas6: - server.addStreamServer(r, streamCallback) + server.addStreamServer(r, callBack) added.inc if len(server.servers) == 0: @@ -362,17 +461,17 @@ type RpcStreamServer* = RpcServer[StreamServer] proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer = ## Create new server and assign it to addresses ``addresses``. result = newRpcServer[StreamServer]() - result.addStreamServers(addresses, processClient) + result.addStreamServers(addresses) proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer = ## Create new server and assign it to addresses ``addresses``. result = newRpcServer[StreamServer]() - result.addStreamServers(addresses, processClient) + result.addStreamServers(addresses) proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer = # Create server on specified port result = newRpcServer[StreamServer]() - result.addStreamServer(address, port, processClient) + result.addStreamServer(address, port) # TODO: Allow cross checking between client signatures and server calls