Re-enabled chronicles, extended RpcClientTransport, added stream servers

This commit is contained in:
coffeepots 2018-06-19 18:16:39 +01:00
parent ce94ba8b41
commit c162f24253

View File

@ -1,16 +1,11 @@
import json, tables, strutils, options, macros #, chronicles
import json, tables, strutils, options, macros, chronicles
import asyncdispatch2
import jsonmarshal
export asyncdispatch2, json, jsonmarshal
# Temporarily disable logging
macro debug(body: varargs[untyped]): untyped = newStmtList()
macro info(body: varargs[untyped]): untyped = newStmtList()
macro error(body: varargs[untyped]): untyped = newStmtList()
#logScope:
# topics = "RpcServer"
logScope:
topics = "RpcServer"
type
RpcJsonError* = enum rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId
@ -23,12 +18,17 @@ type
RpcClientTransport* = concept t
t.write(var string) is Future[int]
t.readLine(int) is Future[string]
t.close
t.remoteAddress() # Required for logging
t.localAddress()
RpcServerTransport* = concept t
t.start
t.stop
t.close
RpcProcessClient* = proc (server: RpcServerTransport, client: RpcClientTransport): Future[void] {.gcsafe.}
RpcServer*[S: RpcServerTransport] = ref object
servers*: seq[S]
procs*: TableRef[string, RpcProc]
@ -152,7 +152,7 @@ proc processClient*[S: RpcServerTransport, C: RpcClientTransport](server: S, cli
client.close()
break
debug "Processing client", addresss = client.remoteAddress(), line
debug "Processing client", address = client.remoteAddress(), line
let future = processMessage(rpc, client, line)
yield future
@ -230,7 +230,11 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body
errTrappedBody = quote do:
try:
`procBody`
except:
debug "Error occurred within RPC " & `path` & ": ", getCurrentExceptionMsg()
if parameters.hasReturnType:
let returnType = parameters[0]
@ -238,7 +242,7 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
result.add(quote do:
proc `doMain`(`paramsIdent`: JsonNode): Future[`returnType`] {.async.} =
`setup`
`procBody`
`errTrappedBody`
)
if returnType == ident"JsonNode":
@ -257,7 +261,7 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
result.add(quote do:
proc `procName`(`paramsIdent`: JsonNode): Future[JsonNode] {.async.} =
`setup`
`procBody`
`errTrappedBody`
)
result.add( quote do:
`server`.register(`path`, `procName`)
@ -266,4 +270,109 @@ macro rpc*(server: RpcServer, path: string, body: untyped): untyped =
when defined(nimDumpRpcs):
echo "\n", pathStr, ": ", result.repr
# Utility functions for setting up servers using transport addresses
proc addStreamServer*[T: RpcServer](server: T, address: TransportAddress, streamCallback: StreamCallback) =
try:
info "Creating server on ", address = $address
var transportServer = createStreamServer(address, streamCallback, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except:
error "Failed to create server", address = $address, message = getCurrentExceptionMsg()
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[TransportAddress], streamCallback: StreamCallback) =
for item in addresses:
server.addStreamServer(item, streamCallback)
proc addStreamServer*[T: RpcServer](server: T, address: string, streamCallback: StreamCallback) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
server.addStreamServer(r, streamCallback)
added.inc
for r in tas6:
server.addStreamServer(r, streamCallback)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*[T: RpcServer](server: T, addresses: openarray[string], streamCallback: StreamCallback) =
for address in addresses:
server.addStreamServer(address, streamCallback)
proc addStreamServer*[T: RpcServer](server: T, address: string, port: Port, streamCallback: StreamCallback) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r, streamCallback)
added.inc
for r in tas6:
server.addStreamServer(r, streamCallback)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
type RpcStreamServer* = RpcServer[StreamServer]
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcServer[StreamServer]()
result.addStreamServers(addresses, processClient)
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
# Create server on specified port
result = newRpcServer[StreamServer]()
result.addStreamServer(address, port, processClient)
# TODO: Allow cross checking between client signatures and server calls