Allow custom client transports (WIP)

This commit is contained in:
coffeepots 2018-06-25 17:54:28 +01:00
parent 13d560738a
commit ac9964370e

View File

@ -6,9 +6,9 @@ export asyncdispatch2
type
RpcClient*[T, A] = ref object
transp: T # StreamTransport
transp*: T
awaiting: Table[string, Future[Response]]
address: A # TransportAddress
address: A
nextId: int64
Response* = tuple[error: bool, result: JsonNode]
@ -19,37 +19,33 @@ proc newRpcClient*[T, A](): RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc genCall(writeCode: NimNode): NimNode =
proc genCall(rpcType, writeCode: NimNode): NimNode =
let res = newIdentNode("result")
result = quote do:
proc call*[T, A](self: RpcClient[T, A], name: string,
proc call*(self: `rpcType`, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var
msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params,
"id": %id} & "\c\l"
value {.inject.} =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
client {.inject.}: RpcClient[T, A]
shallowCopy(client, self)
let res = await `writeCode` #let res = await self.transp.write(msg)
template client: untyped = self
let res = await `writeCode`
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(msg))
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
`res` = await newFut
template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) =
# complete future before raising
fut.complete((true, %msg))
raise newException(errType, msg)
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
macro checkGet(node: JsonNode, fieldName: string,
jKind: static[JsonNodeKind]): untyped =
@ -81,7 +77,7 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) =
let version = checkGet(node, "jsonrpc", JString)
if version != "2.0":
self.awaiting[id].handleRaise(ValueError,
self.awaiting[id].asyncRaise(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
let errorNode = node{"error"}
@ -95,56 +91,46 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
#proc connect*(self: RpcClient, address: string, port: Port): Future[void]
proc genProcessData(name, readCode, closeCode: NimNode): NimNode =
proc genProcessData(rpcType, processDataName, readCode, closeCode: NimNode): NimNode =
result = quote do:
proc `name`[T, A](self: RpcClient[T, A]) {.async.} =
proc `processDataName`(clientTransport: `rpcType`) {.async.} =
while true:
#let line = await self.transp.readLine(maxRequestLength)
var
maxRequestLength {.inject.} = defaultMaxRequestLength
client {.inject.}: RpcClient[T, A]
shallowCopy(client, self)
let line = await `readCode` # TODO: Make it easier for callers to know this is expecting a future
var maxRequestLength {.inject.} = defaultMaxRequestLength
template client: untyped = clientTransport
let line = await `readCode`
if line == "":
# transmission ends
`closeCode` #self.transp.close()
`closeCode`
break
processMessage(self, line)
processMessage(clientTransport, line)
# async loop reconnection and waiting
self.transp = await connect(self.address)
clientTransport.transp = await connect(clientTransport.address)
proc genConnect(procDataName, connectCode: NimNode): NimNode =
proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, closeCode: NimNode): NimNode =
result = quote do:
proc `procDataName`[T, A](self: RpcClient[T, A]) {.async.}
proc connect*(clientTransport: `rpcType`, address: string, port: Port) {.async.} =
var
address {.inject.} = address
port {.inject.} = port
template client: untyped = clientTransport
`connectCode`
asyncCheck `processDataName`(clientTransport)
proc connect*[T, A](self: RpcClient[T, A], address: string, port: Port) {.async.} =
macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = nil): untyped =
let processDataName = newIdentNode("processData")
var
writeCode = quote do:
client.transp.write(value)
readCode = quote do:
# looks like this is being checked before insertion
client.transp.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.transp.close
connectCode = quote do:
# TODO: `address` hostname can be resolved to many IP addresses, we are using
# first one, but maybe it would be better to iterate over all IP addresses
# and try to establish connection until it will not be established.
var
client {.inject.}: RpcClient[T, A]
address {.inject.} = address
port {.inject.} = port
shallowCopy(client, self)
`connectCode`
#let addresses = resolveTAddress(address, port)
#self.transp = await connect(addresses[0])
#self.address = addresses[0]
asyncCheck `procDataName`[T, A](self)
macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): untyped =
procDataName.expectKind nnkIdent
var
writeCode = quote do:
client.write(value)
readCode = quote do:
client.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.close
connectCode = quote do:
# TODO: Even as a default this is too tied to StreamServer
let addresses = resolveTAddress(address, port)
client.transp = await connect(addresses[0])
@ -172,21 +158,19 @@ macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): unt
else: error("Unknown RPC verb \"" & verb & "\"")
result = newStmtList()
let rpcType = quote: RpcClient[`transType`, `addrType`]
let
procData = newIdentNode($procDataName)
result.add(genConnect(procData, connectCode))
result.add(genCall(writeCode))
result.add(genProcessData(procData, readCode, closeCode))
let procData = newIdentNode($processDataName)
result.add(genProcessData(rpcType, procData, readCode, closeCode))
result.add(genConnectAndProcess(rpcType, procData, connectCode, readCode, closeCode))
result.add(genCall(rpcType, writeCode))
when defined(nimDumpRpcs):
echo "defineClient:\n", result.repr
##
# Define default stream server
# Default stream server
defineRpcClientTransport(processStreamData)
defineRpcClientTransport(StreamTransport, TransportAddress)
type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress]
@ -194,7 +178,7 @@ proc newRpcStreamClient*(): RpcStreamClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()
##
# Signature processing
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# parameters come as a tree