WIP custom clients

This commit is contained in:
coffeepots 2018-06-22 19:05:32 +01:00
parent 99018eede4
commit ef6041ea6a

View File

@ -1,29 +1,42 @@
import tables, json, macros
import asyncdispatch2
from strutils import toLowerAscii
import jsonmarshal
export asyncdispatch2
type
RpcClient* = ref object
transp: StreamTransport
RpcClient*[T, A] = ref object
transp: T # StreamTransport
awaiting: Table[string, Future[Response]]
address: TransportAddress
address: A # TransportAddress
nextId: int64
Response* = tuple[error: bool, result: JsonNode]
const maxRequestLength = 1024 * 128
const defaultMaxRequestLength* = 1024 * 128
proc newRpcClient*(): RpcClient =
proc newRpcClient*[T, A](): RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1)
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc call*(self: RpcClient, name: string,
proc genCall(writeCode: NimNode): NimNode =
result = quote do:
proc call*[T, A](self: RpcClient[T, A], name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params,
var
msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params,
"id": %id} & "\c\l"
let res = await self.transp.write(msg)
value {.inject.} =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
client {.inject.}: RpcClient[T, A]
shallowCopy(client, self)
let res = await `writeCode` #let res = await self.transp.write(msg)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(msg))
@ -57,8 +70,8 @@ macro checkGet(node: JsonNode, fieldName: string,
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage(self: RpcClient, line: string) =
let node = parseJson(line)
proc processMessage[T, A](self: RpcClient[T, A], line: string) =
let node = parseJson(line) # TODO: Check errors
# TODO: Use more appropriate exception objects
let id = checkGet(node, "id", JString)
@ -82,28 +95,106 @@ proc processMessage(self: RpcClient, line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc connect*(self: RpcClient, address: string, port: Port): Future[void]
#proc connect*(self: RpcClient, address: string, port: Port): Future[void]
proc processData(self: RpcClient) {.async.} =
proc genProcessData(name, readCode, closeCode: NimNode): NimNode =
result = quote do:
proc `name`[T, A](self: RpcClient[T, A]) {.async.} =
while true:
let line = await self.transp.readLine(maxRequestLength)
#let line = await self.transp.readLine(maxRequestLength)
var
maxRequestLength {.inject.} = defaultMaxRequestLength
client {.inject.}: RpcClient[T, A]
shallowCopy(client, self)
let line = await `readCode` # TODO: Make it easier for callers to know this is expecting a future
if line == "":
# transmission ends
self.transp.close()
`closeCode` #self.transp.close()
break
processMessage(self, line)
# async loop reconnection and waiting
self.transp = await connect(self.address)
proc connect*(self: RpcClient, address: string, port: Port) {.async.} =
proc genConnect(procDataName, connectCode: NimNode): NimNode =
result = quote do:
proc `procDataName`[T, A](self: RpcClient[T, A]) {.async.}
proc connect*[T, A](self: RpcClient[T, A], address: string, port: Port) {.async.} =
# TODO: `address` hostname can be resolved to many IP addresses, we are using
# first one, but maybe it would be better to iterate over all IP addresses
# and try to establish connection until it will not be established.
var
client {.inject.}: RpcClient[T, A]
address {.inject.} = address
port {.inject.} = port
shallowCopy(client, self)
`connectCode`
#let addresses = resolveTAddress(address, port)
#self.transp = await connect(addresses[0])
#self.address = addresses[0]
asyncCheck `procDataName`[T, A](self)
macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): untyped =
procDataName.expectKind nnkIdent
var
writeCode = quote do:
client.write(value)
readCode = quote do:
client.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.close
connectCode = quote do:
# TODO: Even as a default this is too tied to StreamServer
let addresses = resolveTAddress(address, port)
self.transp = await connect(addresses[0])
self.address = addresses[0]
asyncCheck processData(self)
client.transp = await connect(addresses[0])
client.address = addresses[0]
if body != nil:
body.expectKind nnkStmtList
for item in body:
item.expectKind nnkCall
item[0].expectKind nnkIdent
item[1].expectKind nnkStmtList
let
verb = $item[0]
code = item[1]
case verb.toLowerAscii
of "write":
writeCode = code
of "read":
readCode = code
of "close":
closeCode = code
of "connect":
connectCode = code
else: error("Unknown RPC verb \"" & verb & "\"")
result = newStmtList()
let
procData = newIdentNode($procDataName)
result.add(genConnect(procData, connectCode))
result.add(genCall(writeCode))
result.add(genProcessData(procData, readCode, closeCode))
when defined(nimDumpRpcs):
echo "defineClient:\n", result.repr
##
# Default stream server
defineRpcClientTransport(processStreamData)
type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress]
proc newRpcStreamClient*(): RpcStreamClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()
##
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# parameters come as a tree