diff --git a/rpc/client.nim b/rpc/client.nim index 95a48a9..1f7c345 100644 --- a/rpc/client.nim +++ b/rpc/client.nim @@ -6,9 +6,9 @@ export asyncdispatch2 type RpcClient*[T, A] = ref object - transp: T # StreamTransport + transp*: T awaiting: Table[string, Future[Response]] - address: A # TransportAddress + address: A nextId: int64 Response* = tuple[error: bool, result: JsonNode] @@ -19,37 +19,33 @@ proc newRpcClient*[T, A](): RpcClient[T, A] = ## Creates a new ``RpcClient`` instance. result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) -proc genCall(writeCode: NimNode): NimNode = +proc genCall(rpcType, writeCode: NimNode): NimNode = + let res = newIdentNode("result") result = quote do: - proc call*[T, A](self: RpcClient[T, A], name: string, + proc call*(self: `rpcType`, name: string, params: JsonNode): Future[Response] {.async.} = ## Remotely calls the specified RPC method. let id = $self.nextId self.nextId.inc var - msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, - "id": %id} & "\c\l" value {.inject.} = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, "id": %id} & "\c\l" - client {.inject.}: RpcClient[T, A] - shallowCopy(client, self) - let res = await `writeCode` #let res = await self.transp.write(msg) + template client: untyped = self + let res = await `writeCode` # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(msg)) + assert(res == len(value)) # completed by processMessage. var newFut = newFuture[Response]() # add to awaiting responses self.awaiting[id] = newFut - result = await newFut + `res` = await newFut -template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) = - # complete future before raising - fut.complete((true, %msg)) - raise newException(errType, msg) +template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) = + fut.fail(newException(errType, msg)) macro checkGet(node: JsonNode, fieldName: string, jKind: static[JsonNodeKind]): untyped = @@ -81,7 +77,7 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) = let version = checkGet(node, "jsonrpc", JString) if version != "2.0": - self.awaiting[id].handleRaise(ValueError, + self.awaiting[id].asyncRaise(ValueError, "Unsupported version of JSON, expected 2.0, received \"" & version & "\"") let errorNode = node{"error"} @@ -95,56 +91,46 @@ proc processMessage[T, A](self: RpcClient[T, A], line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -#proc connect*(self: RpcClient, address: string, port: Port): Future[void] - -proc genProcessData(name, readCode, closeCode: NimNode): NimNode = +proc genProcessData(rpcType, processDataName, readCode, closeCode: NimNode): NimNode = result = quote do: - proc `name`[T, A](self: RpcClient[T, A]) {.async.} = + proc `processDataName`(clientTransport: `rpcType`) {.async.} = while true: - #let line = await self.transp.readLine(maxRequestLength) - var - maxRequestLength {.inject.} = defaultMaxRequestLength - client {.inject.}: RpcClient[T, A] - shallowCopy(client, self) - let line = await `readCode` # TODO: Make it easier for callers to know this is expecting a future + var maxRequestLength {.inject.} = defaultMaxRequestLength + template client: untyped = clientTransport + let line = await `readCode` if line == "": # transmission ends - `closeCode` #self.transp.close() + `closeCode` break - processMessage(self, line) + processMessage(clientTransport, line) # async loop reconnection and waiting - self.transp = await connect(self.address) + clientTransport.transp = await connect(clientTransport.address) -proc genConnect(procDataName, connectCode: NimNode): NimNode = +proc genConnectAndProcess(rpcType, processDataName, connectCode, readCode, closeCode: NimNode): NimNode = result = quote do: - proc `procDataName`[T, A](self: RpcClient[T, A]) {.async.} + proc connect*(clientTransport: `rpcType`, address: string, port: Port) {.async.} = + var + address {.inject.} = address + port {.inject.} = port + template client: untyped = clientTransport + `connectCode` + asyncCheck `processDataName`(clientTransport) - proc connect*[T, A](self: RpcClient[T, A], address: string, port: Port) {.async.} = +macro defineRpcClientTransport*(transType, addrType: untyped, body: untyped = nil): untyped = + let processDataName = newIdentNode("processData") + var + writeCode = quote do: + client.transp.write(value) + readCode = quote do: + # looks like this is being checked before insertion + client.transp.readLine(defaultMaxRequestLength) + closeCode = quote do: + client.transp.close + connectCode = quote do: # TODO: `address` hostname can be resolved to many IP addresses, we are using # first one, but maybe it would be better to iterate over all IP addresses # and try to establish connection until it will not be established. - var - client {.inject.}: RpcClient[T, A] - address {.inject.} = address - port {.inject.} = port - shallowCopy(client, self) - `connectCode` - #let addresses = resolveTAddress(address, port) - #self.transp = await connect(addresses[0]) - #self.address = addresses[0] - asyncCheck `procDataName`[T, A](self) - -macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): untyped = - procDataName.expectKind nnkIdent - var - writeCode = quote do: - client.write(value) - readCode = quote do: - client.readLine(defaultMaxRequestLength) - closeCode = quote do: - client.close - connectCode = quote do: # TODO: Even as a default this is too tied to StreamServer let addresses = resolveTAddress(address, port) client.transp = await connect(addresses[0]) @@ -172,21 +158,19 @@ macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): unt else: error("Unknown RPC verb \"" & verb & "\"") result = newStmtList() + let rpcType = quote: RpcClient[`transType`, `addrType`] - let - procData = newIdentNode($procDataName) - result.add(genConnect(procData, connectCode)) - result.add(genCall(writeCode)) - result.add(genProcessData(procData, readCode, closeCode)) + let procData = newIdentNode($processDataName) + result.add(genProcessData(rpcType, procData, readCode, closeCode)) + result.add(genConnectAndProcess(rpcType, procData, connectCode, readCode, closeCode)) + result.add(genCall(rpcType, writeCode)) when defined(nimDumpRpcs): echo "defineClient:\n", result.repr -## +# Define default stream server -# Default stream server - -defineRpcClientTransport(processStreamData) +defineRpcClientTransport(StreamTransport, TransportAddress) type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress] @@ -194,7 +178,7 @@ proc newRpcStreamClient*(): RpcStreamClient = ## Create new server and assign it to addresses ``addresses``. result = newRpcClient[StreamTransport, TransportAddress]() -## +# Signature processing proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = # parameters come as a tree