Custom client transports operational

This commit is contained in:
coffeepots 2018-06-26 15:38:49 +01:00
parent ac9964370e
commit e508cc077d
1 changed files with 36 additions and 17 deletions

View File

@ -19,10 +19,10 @@ proc newRpcClient*[T, A](): RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc genCall(rpcType, writeCode: NimNode): NimNode =
proc genCall(rpcType, callName, writeCode: NimNode): NimNode =
let res = newIdentNode("result")
result = quote do:
proc call*(self: `rpcType`, name: string,
proc `callName`*(self: `rpcType`, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
@ -67,6 +67,7 @@ macro checkGet(node: JsonNode, fieldName: string,
else: discard
proc processMessage[T, A](self: RpcClient[T, A], line: string) =
# Note: this doesn't use any transport code so doesn't need to be differentiated.
let node = parseJson(line) # TODO: Check errors
# TODO: Use more appropriate exception objects
@ -91,25 +92,26 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc genProcessData(rpcType, processDataName, readCode, closeCode: NimNode): NimNode =
proc genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode: NimNode): NimNode =
result = quote do:
proc `processDataName`(clientTransport: `rpcType`) {.async.} =
while true:
var maxRequestLength {.inject.} = defaultMaxRequestLength
template client: untyped = clientTransport
let line = await `readCode`
if line == "":
var value {.inject.} = await `readCode`
`afterReadCode`
if value == "":
# transmission ends
`closeCode`
break
processMessage(clientTransport, line)
processMessage(clientTransport, value)
# async loop reconnection and waiting
clientTransport.transp = await connect(clientTransport.address)
proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, closeCode: NimNode): NimNode =
proc genConnect(rpcType, connectName, processDataName, connectCode: NimNode): NimNode =
result = quote do:
proc connect*(clientTransport: `rpcType`, address: string, port: Port) {.async.} =
proc `connectName`*(clientTransport: `rpcType`, address: string, port: Port) {.async.} =
var
address {.inject.} = address
port {.inject.} = port
@ -117,13 +119,11 @@ proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, close
`connectCode`
asyncCheck `processDataName`(clientTransport)
macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = nil): untyped =
let processDataName = newIdentNode("processData")
macro defineRpcClientTransport*(transType, addrType: untyped, prefix: string = "", body: untyped = nil): untyped =
var
writeCode = quote do:
client.transp.write(value)
readCode = quote do:
# looks like this is being checked before insertion
client.transp.readLine(defaultMaxRequestLength)
closeCode = quote do:
client.transp.close
@ -131,10 +131,10 @@ macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = ni
# TODO: `address` hostname can be resolved to many IP addresses, we are using
# first one, but maybe it would be better to iterate over all IP addresses
# and try to establish connection until it will not be established.
# TODO: Even as a default this is too tied to StreamServer
let addresses = resolveTAddress(address, port)
client.transp = await connect(addresses[0])
client.address = addresses[0]
afterReadCode = newStmtList()
if body != nil:
body.expectKind nnkStmtList
@ -148,27 +148,46 @@ macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = ni
case verb.toLowerAscii
of "write":
# `client`, the RpcClient
# `value`, the data to be sent to the server
writeCode = code
of "read":
# `client`, the RpcClient
# `maxRequestLength`, initially set to defaultMaxRequestLength
readCode = code
of "close":
# `client`, the RpcClient
# `value`, the data returned from the server
# `maxRequestLength`, initially set to defaultMaxRequestLength
closeCode = code
of "connect":
# `client`, the RpcClient
# `address`, server destination address string
# `port`, server destination port
connectCode = code
of "afterread":
# `client`, the RpcClient
# `value`, the data returned from the server
# `maxRequestLength`, initially set to defaultMaxRequestLength
afterReadCode = code
else: error("Unknown RPC verb \"" & verb & "\"")
result = newStmtList()
let rpcType = quote: RpcClient[`transType`, `addrType`]
let
rpcType = quote: RpcClient[`transType`, `addrType`]
processDataName = newIdentNode(prefix.strVal & "processData")
connectName = newIdentNode(prefix.strVal & "connect")
callName = newIdentNode(prefix.strVal & "call")
let procData = newIdentNode($processDataName)
result.add(genProcessData(rpcType, procData, readCode, closeCode))
result.add(genConnectAndProcess(rpcType, procData, connectCode, readCode, closeCode))
result.add(genCall(rpcType, writeCode))
result.add(genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode))
result.add(genConnect(rpcType, connectName, processDataName, connectCode))
result.add(genCall(rpcType, callName, writeCode))
when defined(nimDumpRpcs):
echo "defineClient:\n", result.repr
# Define default stream server
# TODO: Move this into a separate unit so users can define 'connect', 'call' etc without requiring a prefix?
defineRpcClientTransport(StreamTransport, TransportAddress)