Make client usable with different transports

This commit is contained in:
coffeepots 2018-07-12 18:36:40 +01:00
parent 6466b7e778
commit 1ca1847600
6 changed files with 83 additions and 70 deletions

View File

@ -5,43 +5,26 @@ import jsonmarshal
export asyncdispatch2
type
RpcClient*[T, A] = ref object
awaiting: Table[string, Future[Response]]
transport: T
address: A
nextId: int64
ClientId* = int64
RpcClient* = ref object of RootRef#
awaiting*: Table[ClientId, Future[Response]]
nextId: ClientId
Response* = tuple[error: bool, result: JsonNode]
const defaultMaxRequestLength* = 1024 * 128
proc initRpcClient*[T: RpcClient](client: var T) =
client.awaiting = initTable[ClientId, Future[Response]]()
client.nextId = 1
proc newRpcClient*[T, A]: RpcClient[T, A] =
## Creates a new ``RpcClient`` instance.
result = RpcClient[T, A](awaiting: initTable[string, Future[Response]](), nextId: 1)
proc getNextId*(client: RpcClient): ClientId =
result = client.nextId
client.nextId.inc
proc call*(self: RpcClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = $self.nextId
self.nextId.inc
var
value =
$ %{"jsonrpc": %"2.0",
"method": %name,
"params": params,
"id": %id} & "\c\l"
if self.transport.isNil:
var connectStr = ""
raise newException(ValueError, "Transport is not initialised (missing a call to connect?)")
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0",
"method": %path,
"params": params,
"id": %id}
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
@ -65,11 +48,11 @@ macro checkGet(node: JsonNode, fieldName: string,
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage(self: RpcClient, line: string) =
proc processMessage*[T: RpcClient](self: T, line: string) =
# Note: this doesn't use any transport code so doesn't need to be differentiated.
let
node = parseJson(line) # TODO: Check errors
id = checkGet(node, "id", JString)
node = parseJson(line)
id = checkGet(node, "id", JInt)
if not self.awaiting.hasKey(id):
raise newException(ValueError,
@ -91,30 +74,6 @@ proc processMessage(self: RpcClient, line: string) =
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
proc processData(client: RpcClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
type RpcSocketClient* = RpcClient[StreamTransport, TransportAddress]
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)
proc newRpcSocketClient*(): RpcSocketClient =
## Create new server and assign it to addresses ``addresses``.
result = newRpcClient[StreamTransport, TransportAddress]()
# Signature processing
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
@ -140,7 +99,7 @@ proc toJsonArray(parameters: NimNode): NimNode =
items.add(nnkPrefix.newTree(ident"%", curParam))
result = nnkPrefix.newTree(bindSym("%", brForceOpen), items)
proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode =
# Each input parameter in the rpc signature is converted
# to json with `%`.
# Return types are then converted back to native Nim types.
@ -161,7 +120,7 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
customReturnType = returnType != iJsonNode
# insert rpc client as first parameter
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident"RpcClient",
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident($clientType),
newEmptyNode()))
let
@ -208,17 +167,17 @@ proc createRpcFromSig*(rpcDecl: NimNode): NimNode =
when defined(nimDumpRpcs):
echo pathStr, ":\n", result.repr
proc processRpcSigs(parsedCode: NimNode): NimNode =
proc processRpcSigs(clientType, parsedCode: NimNode): NimNode =
result = newStmtList()
for line in parsedCode:
if line.kind == nnkProcDef:
var procDef = createRpcFromSig(line)
var procDef = createRpcFromSig(clientType, line)
result.add(procDef)
macro createRpcSigs*(filePath: static[string]): untyped =
macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped =
## Takes a file of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
result = processRpcSigs(staticRead($filePath).parseStmt())
result = processRpcSigs(clientType, staticRead($filePath).parseStmt())

View File

@ -0,0 +1,52 @@
import ../client, asyncdispatch2, tables, json
type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
address*: TransportAddress
const defaultMaxRequestLength* = 1024 * 128
proc newRpcSocketClient*: RpcSocketClient =
## Creates a new client instance.
new result
result.initRpcClient()
proc call*(self: RpcSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var
value =
$rpcCallNode(name, params, id) & "\c\l"
if self.transport.isNil:
var connectStr = ""
raise newException(ValueError, "Transport is not initialised (missing a call to connect?)")
let res = await self.transport.write(value)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
assert(res == len(value))
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
proc processData(client: RpcSocketClient) {.async.} =
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
client.transport.close
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
client.transport = await connect(addresses[0])
client.address = addresses[0]
asyncCheck processData(client)

View File

@ -1,3 +1,5 @@
import json_rpc / client
export client
import
json_rpc / client,
json_rpc / clients / socketclient
export client, socketclient

View File

@ -1,2 +1,2 @@
import json_rpc / server, json_rpc / transports / socket
export server, socket
import json_rpc / server, json_rpc / servers / socketserver
export server, socketserver

View File

@ -14,7 +14,7 @@ var
server.addEthRpcs()
## Generate client convenience marshalling wrappers from forward declarations
createRpcSigs(sourceDir & DirSep & "ethcallsigs.nim")
createRpcSigs(RpcSocketClient, sourceDir & DirSep & "ethcallsigs.nim")
## Create custom RPC with StUint input parameter
server.rpc("rpc.uint256param") do(i: UInt256):