diff --git a/rpc/client.nim b/rpc/client.nim index 1f7c345..58e4904 100644 --- a/rpc/client.nim +++ b/rpc/client.nim @@ -19,10 +19,10 @@ proc newRpcClient*[T, A](): RpcClient[T, A] = ## Creates a new ``RpcClient`` instance. result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) -proc genCall(rpcType, writeCode: NimNode): NimNode = +proc genCall(rpcType, callName, writeCode: NimNode): NimNode = let res = newIdentNode("result") result = quote do: - proc call*(self: `rpcType`, name: string, + proc `callName`*(self: `rpcType`, name: string, params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = $self.nextId @@ -67,6 +67,7 @@ macro checkGet(node: JsonNode, fieldName: string, else: discard proc processMessage[T, A](self: RpcClient[T, A], line: string) = + # Note: this doesn't use any transport code so doesn't need to be differentiated. let node = parseJson(line) # TODO: Check errors # TODO: Use more appropriate exception objects @@ -91,25 +92,26 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc genProcessData(rpcType, processDataName, readCode, closeCode: NimNode): NimNode = +proc genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode: NimNode): NimNode = result = quote do: proc `processDataName`(clientTransport: `rpcType`) {.async.} = while true: var maxRequestLength {.inject.} = defaultMaxRequestLength template client: untyped = clientTransport - let line = await `readCode` - if line == "": + var value {.inject.} = await `readCode` + `afterReadCode` + if value == "": # transmission ends `closeCode` break - processMessage(clientTransport, line) + processMessage(clientTransport, value) # async loop reconnection and waiting clientTransport.transp = await connect(clientTransport.address) -proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, closeCode: NimNode): NimNode = +proc genConnect(rpcType, connectName, processDataName, connectCode: NimNode): NimNode = result = quote do: - proc connect*(clientTransport: `rpcType`, address: string, port: Port) {.async.} = + proc `connectName`*(clientTransport: `rpcType`, address: string, port: Port) {.async.} = var address {.inject.} = address port {.inject.} = port @@ -117,13 +119,11 @@ proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, close `connectCode` asyncCheck `processDataName`(clientTransport) -macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = nil): untyped = - let processDataName = newIdentNode("processData") +macro defineRpcClientTransport*(transType, addrType: untyped, prefix: string = "", body: untyped = nil): untyped = var writeCode = quote do: client.transp.write(value) readCode = quote do: - # looks like this is being checked before insertion client.transp.readLine(defaultMaxRequestLength) closeCode = quote do: client.transp.close @@ -131,10 +131,10 @@ macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = ni # TODO: `address` hostname can be resolved to many IP addresses, we are using # first one, but maybe it would be better to iterate over all IP addresses # and try to establish connection until it will not be established. - # TODO: Even as a default this is too tied to StreamServer let addresses = resolveTAddress(address, port) client.transp = await connect(addresses[0]) client.address = addresses[0] + afterReadCode = newStmtList() if body != nil: body.expectKind nnkStmtList @@ -148,27 +148,46 @@ macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = ni case verb.toLowerAscii of "write": + # `client`, the RpcClient + # `value`, the data to be sent to the server writeCode = code of "read": + # `client`, the RpcClient + # `maxRequestLength`, initially set to defaultMaxRequestLength readCode = code of "close": + # `client`, the RpcClient + # `value`, the data returned from the server + # `maxRequestLength`, initially set to defaultMaxRequestLength closeCode = code of "connect": + # `client`, the RpcClient + # `address`, server destination address string + # `port`, server destination port connectCode = code + of "afterread": + # `client`, the RpcClient + # `value`, the data returned from the server + # `maxRequestLength`, initially set to defaultMaxRequestLength + afterReadCode = code else: error("Unknown RPC verb \"" & verb & "\"") result = newStmtList() - let rpcType = quote: RpcClient[`transType`, `addrType`] + let + rpcType = quote: RpcClient[`transType`, `addrType`] + processDataName = newIdentNode(prefix.strVal & "processData") + connectName = newIdentNode(prefix.strVal & "connect") + callName = newIdentNode(prefix.strVal & "call") - let procData = newIdentNode($processDataName) - result.add(genProcessData(rpcType, procData, readCode, closeCode)) - result.add(genConnectAndProcess(rpcType, procData, connectCode, readCode, closeCode)) - result.add(genCall(rpcType, writeCode)) + result.add(genProcessData(rpcType, processDataName, readCode, afterReadCode, closeCode)) + result.add(genConnect(rpcType, connectName, processDataName, connectCode)) + result.add(genCall(rpcType, callName, writeCode)) when defined(nimDumpRpcs): echo "defineClient:\n", result.repr # Define default stream server +# TODO: Move this into a separate unit so users can define 'connect', 'call' etc without requiring a prefix? defineRpcClientTransport(StreamTransport, TransportAddress)