diff --git a/rpc/server.nim b/rpc/server.nim index 0bb7504..2f79de0 100644 --- a/rpc/server.nim +++ b/rpc/server.nim @@ -1,16 +1,11 @@ -import json, tables, strutils, options, macros #, chronicles +import json, tables, strutils, options, macros, chronicles import asyncdispatch2 import jsonmarshal export asyncdispatch2, json, jsonmarshal -# Temporarily disable logging -macro debug(body: varargs[untyped]): untyped = newStmtList() -macro info(body: varargs[untyped]): untyped = newStmtList() -macro error(body: varargs[untyped]): untyped = newStmtList() - -#logScope: -# topics = "RpcServer" +logScope: + topics = "RpcServer" type RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId @@ -23,12 +18,17 @@ type RpcClientTransport* = concept t t.write(var string) is Future[int] t.readLine(int) is Future[string] + t.close + t.remoteAddress() # Required for logging + t.localAddress() RpcServerTransport* = concept t t.start t.stop t.close + RpcProcessClient* = proc (server: RpcServerTransport, client: RpcClientTransport): Future[void] {.gcsafe.} + RpcServer*[S: RpcServerTransport] = ref object servers*: seq[S] procs*: TableRef[string, RpcProc] @@ -152,7 +152,7 @@ proc processClient*[S: RpcServerTransport, C: RpcClientTransport](server: S, cli client.close() break - debug "Processing client", addresss = client.remoteAddress(), line + debug "Processing client", address = client.remoteAddress(), line let future = processMessage(rpc, client, line) yield future @@ -230,7 +230,11 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = var setup = jsonToNim(parameters, paramsIdent) procBody = if body.kind == nnkStmtList: body else: body.body - + errTrappedBody = quote do: + try: + `procBody` + except: + debug "Error occurred within RPC " & `path` & ": ", getCurrentExceptionMsg() if parameters.hasReturnType: let returnType = parameters[0] @@ -238,7 +242,7 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = result.add(quote do: proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} = `setup` - `procBody` + `errTrappedBody` ) if returnType == ident"JsonNode": @@ -257,7 +261,7 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = result.add(quote do: proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} = `setup` - `procBody` + `errTrappedBody` ) result.add( quote do: `server`.register(`path`, `procName`) @@ -266,4 +270,109 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped = when defined(nimDumpRpcs): echo "\n", pathStr, ": ", result.repr +# Utility functions for setting up servers using transport addresses + +proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, streamCallback: StreamCallback) = + try: + info "Creating server on ", address = $address + var transportServer = createStreamServer(address, streamCallback, {ReuseAddr}, udata = server) + server.servers.add(transportServer) + except: + error "Failed to create server", address = $address, message = getCurrentExceptionMsg() + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, "Unable to create server!") + +proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], streamCallback: StreamCallback) = + for item in addresses: + server.addStreamServer(item, streamCallback) + +proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback: StreamCallback) = + ## Create new server and assign it to addresses ``addresses``. + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, IpAddressFamily.IPv6) + except: + discard + + for r in tas4: + server.addStreamServer(r, streamCallback) + added.inc + for r in tas6: + server.addStreamServer(r, streamCallback) + added.inc + + if added == 0: + # Addresses could not be resolved, critical error. + raise newException(RpcAddressUnresolvableError, "Unable to get address!") + +proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], streamCallback: StreamCallback) = + for address in addresses: + server.addStreamServer(address, streamCallback) + +proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, streamCallback: StreamCallback) = + var + tas4: seq[TransportAddress] + tas6: seq[TransportAddress] + added = 0 + + # Attempt to resolve `address` for IPv4 address space. + try: + tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4) + except: + discard + + # Attempt to resolve `address` for IPv6 address space. + try: + tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6) + except: + discard + + if len(tas4) == 0 and len(tas6) == 0: + # Address was not resolved, critical error. + raise newException(RpcAddressUnresolvableError, + "Address " & address & " could not be resolved!") + + for r in tas4: + server.addStreamServer(r, streamCallback) + added.inc + for r in tas6: + server.addStreamServer(r, streamCallback) + added.inc + + if len(server.servers) == 0: + # Server was not bound, critical error. + raise newException(RpcBindError, + "Could not setup server on " & address & ":" & $int(port)) + +type RpcStreamServer* = RpcServer[StreamServer] + +proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses, processClient) + +proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcServer[StreamServer]() + result.addStreamServers(addresses, processClient) + +proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer = + # Create server on specified port + result = newRpcServer[StreamServer]() + result.addStreamServer(address, port, processClient) + + # TODO: Allow cross checking between client signatures and server calls