diff --git a/rpc/client.nim b/rpc/client.nim index c8ef0b2..95a48a9 100644 --- a/rpc/client.nim +++ b/rpc/client.nim @@ -1,37 +1,50 @@ import tables, json, macros import asyncdispatch2 +from strutils import toLowerAscii import jsonmarshal +export asyncdispatch2 type - RpcClient* = ref object - transp: StreamTransport + RpcClient*[T, A] = ref object + transp: T # StreamTransport awaiting: Table[string, Future[Response]] - address: TransportAddress + address: A # TransportAddress nextId: int64 + Response* = tuple[error: bool, result: JsonNode] -const maxRequestLength = 1024 * 128 +const defaultMaxRequestLength* = 1024 * 128 -proc newRpcClient*(): RpcClient = +proc newRpcClient*[T, A](): RpcClient[T, A] = ## Creates a new ``RpcClient`` instance. - result = RpcClient(awaiting: initTable[string, Future[Response]](), nextId: 1) + result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1) -proc call*(self: RpcClient, name: string, - params: JsonNode): Future[Response] {.async.} = - ## Remotely calls the specified RPC method. - let id = $self.nextId - self.nextId.inc - var msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, - "id": %id} & "\c\l" - let res = await self.transp.write(msg) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - assert(res == len(msg)) +proc genCall(writeCode: NimNode): NimNode = + result = quote do: + proc call*[T, A](self: RpcClient[T, A], name: string, + params: JsonNode): Future[Response] {.async.} = + ## Remotely calls the specified RPC method. + let id = $self.nextId + self.nextId.inc + var + msg = $ %{"jsonrpc": %"2.0", "method": %name, "params": params, + "id": %id} & "\c\l" + value {.inject.} = + $ %{"jsonrpc": %"2.0", + "method": %name, + "params": params, + "id": %id} & "\c\l" + client {.inject.}: RpcClient[T, A] + shallowCopy(client, self) + let res = await `writeCode` #let res = await self.transp.write(msg) + # TODO: Add actions when not full packet was send, e.g. disconnect peer. + assert(res == len(msg)) - # completed by processMessage. - var newFut = newFuture[Response]() - # add to awaiting responses - self.awaiting[id] = newFut - result = await newFut + # completed by processMessage. + var newFut = newFuture[Response]() + # add to awaiting responses + self.awaiting[id] = newFut + result = await newFut template handleRaise[T](fut: Future[T], errType: typedesc, msg: string) = # complete future before raising @@ -57,8 +70,8 @@ macro checkGet(node: JsonNode, fieldName: string, of JObject: result.add(quote do: `n`.getObject) else: discard -proc processMessage(self: RpcClient, line: string) = - let node = parseJson(line) +proc processMessage[T, A](self: RpcClient[T, A], line: string) = + let node = parseJson(line) # TODO: Check errors # TODO: Use more appropriate exception objects let id = checkGet(node, "id", JString) @@ -82,28 +95,106 @@ proc processMessage(self: RpcClient, line: string) = self.awaiting[id].fail(newException(ValueError, $errorNode)) self.awaiting.del(id) -proc connect*(self: RpcClient, address: string, port: Port): Future[void] +#proc connect*(self: RpcClient, address: string, port: Port): Future[void] -proc processData(self: RpcClient) {.async.} = - while true: - let line = await self.transp.readLine(maxRequestLength) - if line == "": - # transmission ends - self.transp.close() - break +proc genProcessData(name, readCode, closeCode: NimNode): NimNode = + result = quote do: + proc `name`[T, A](self: RpcClient[T, A]) {.async.} = + while true: + #let line = await self.transp.readLine(maxRequestLength) + var + maxRequestLength {.inject.} = defaultMaxRequestLength + client {.inject.}: RpcClient[T, A] + shallowCopy(client, self) + let line = await `readCode` # TODO: Make it easier for callers to know this is expecting a future + if line == "": + # transmission ends + `closeCode` #self.transp.close() + break - processMessage(self, line) - # async loop reconnection and waiting - self.transp = await connect(self.address) + processMessage(self, line) + # async loop reconnection and waiting + self.transp = await connect(self.address) -proc connect*(self: RpcClient, address: string, port: Port) {.async.} = - # TODO: `address` hostname can be resolved to many IP addresses, we are using - # first one, but maybe it would be better to iterate over all IP addresses - # and try to establish connection until it will not be established. - let addresses = resolveTAddress(address, port) - self.transp = await connect(addresses[0]) - self.address = addresses[0] - asyncCheck processData(self) +proc genConnect(procDataName, connectCode: NimNode): NimNode = + result = quote do: + proc `procDataName`[T, A](self: RpcClient[T, A]) {.async.} + + proc connect*[T, A](self: RpcClient[T, A], address: string, port: Port) {.async.} = + # TODO: `address` hostname can be resolved to many IP addresses, we are using + # first one, but maybe it would be better to iterate over all IP addresses + # and try to establish connection until it will not be established. + var + client {.inject.}: RpcClient[T, A] + address {.inject.} = address + port {.inject.} = port + shallowCopy(client, self) + `connectCode` + #let addresses = resolveTAddress(address, port) + #self.transp = await connect(addresses[0]) + #self.address = addresses[0] + asyncCheck `procDataName`[T, A](self) + +macro defineRpcClientTransport*(procDataName: untyped, body: untyped = nil): untyped = + procDataName.expectKind nnkIdent + var + writeCode = quote do: + client.write(value) + readCode = quote do: + client.readLine(defaultMaxRequestLength) + closeCode = quote do: + client.close + connectCode = quote do: + # TODO: Even as a default this is too tied to StreamServer + let addresses = resolveTAddress(address, port) + client.transp = await connect(addresses[0]) + client.address = addresses[0] + + if body != nil: + body.expectKind nnkStmtList + for item in body: + item.expectKind nnkCall + item[0].expectKind nnkIdent + item[1].expectKind nnkStmtList + let + verb = $item[0] + code = item[1] + + case verb.toLowerAscii + of "write": + writeCode = code + of "read": + readCode = code + of "close": + closeCode = code + of "connect": + connectCode = code + else: error("Unknown RPC verb \"" & verb & "\"") + + result = newStmtList() + + let + procData = newIdentNode($procDataName) + result.add(genConnect(procData, connectCode)) + result.add(genCall(writeCode)) + result.add(genProcessData(procData, readCode, closeCode)) + + when defined(nimDumpRpcs): + echo "defineClient:\n", result.repr + +## + +# Default stream server + +defineRpcClientTransport(processStreamData) + +type RpcStreamClient* = RpcClient[StreamTransport, TransportAddress] + +proc newRpcStreamClient*(): RpcStreamClient = + ## Create new server and assign it to addresses ``addresses``. + result = newRpcClient[StreamTransport, TransportAddress]() + +## proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = # parameters come as a tree