Websocket client support and some fixes

This commit is contained in:
Yuriy Glukhov 2019-06-12 16:44:19 +03:00
parent 707567c7ab
commit f19d66b35c
8 changed files with 148 additions and 77 deletions

View File

@ -11,7 +11,9 @@ requires "nim >= 0.17.3",
"stint",
"chronos",
"httputils",
"chronicles"
"chronicles",
"byteutils",
"news"
proc configForTests() =
--hints: off

View File

@ -9,12 +9,14 @@ type
RpcClient* = ref object of RootRef
awaiting*: Table[ClientId, Future[Response]]
nextId: ClientId
methodHandlers: Table[string, proc(j: JsonNode)]
Response* = tuple[error: bool, result: JsonNode]
proc initRpcClient*[T: RpcClient](client: var T) =
client.awaiting = initTable[ClientId, Future[Response]]()
client.nextId = 1
client.methodHandlers = initTable[string, proc(j: JsonNode)]()
proc getNextId*(client: RpcClient): ClientId =
result = client.nextId
@ -23,6 +25,11 @@ proc getNextId*(client: RpcClient): ClientId =
proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id}
method call*(client: RpcClient, name: string,
params: JsonNode): Future[Response] {.async, base.} = discard
method close*(client: RpcClient) {.base.} = discard
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
@ -45,32 +52,41 @@ macro checkGet(node: JsonNode, fieldName: string,
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage*[T: RpcClient](self: T, line: string) =
proc processMessage*(self: RpcClient, line: string) =
# Note: this doesn't use any transport code so doesn't need to be
# differentiated.
let
node = parseJson(line)
id = checkGet(node, "id", JInt)
let node = parseJson(line)
if not self.awaiting.hasKey(id):
raise newException(ValueError,
"Cannot find message id \"" & node["id"].str & "\"")
if "id" in node:
let id = checkGet(node, "id", JInt)
let version = checkGet(node, "jsonrpc", JString)
if version != "2.0":
self.awaiting[id].asyncRaise(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
if not self.awaiting.hasKey(id):
raise newException(ValueError,
"Cannot find message id \"" & node["id"].str & "\"")
let errorNode = node{"error"}
if errorNode.isNil or errorNode.kind == JNull:
var res = node{"result"}
if not res.isNil:
self.awaiting[id].complete((false, res))
self.awaiting.del(id)
# TODO: actions on unable find result node
let version = checkGet(node, "jsonrpc", JString)
if version != "2.0":
self.awaiting[id].asyncRaise(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
let errorNode = node{"error"}
if errorNode.isNil or errorNode.kind == JNull:
var res = node{"result"}
if not res.isNil:
self.awaiting[id].complete((false, res))
self.awaiting.del(id)
# TODO: actions on unable find result node
else:
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
elif "method" in node:
# This could be subscription notification
let name = node["method"].getStr()
let handler = self.methodHandlers.getOrDefault(name)
if not handler.isNil:
handler(node{"params"})
else:
self.awaiting[id].fail(newException(ValueError, $errorNode))
self.awaiting.del(id)
raise newException(ValueError, "Invalid jsonrpc message: " & $node)
# Signature processing
@ -173,6 +189,12 @@ proc processRpcSigs(clientType, parsedCode: NimNode): NimNode =
var procDef = createRpcFromSig(clientType, line)
result.add(procDef)
proc setMethodHandler*(cl: RpcClient, name: string, callback: proc(j: JsonNode)) =
cl.methodHandlers[name] = callback
proc delMethodHandler*(cl: RpcClient, name: string) =
cl.methodHandlers.del(name)
macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped =
## Takes a file of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.

View File

@ -10,7 +10,6 @@ type
httpMethod: HttpMethod
RpcHttpClient* = ref object of RpcClient
transp*: StreamTransport
loop: Future[void]
addresses: seq[TransportAddress]
options: HttpClientOptions
@ -69,13 +68,11 @@ proc validateResponse*(transp: StreamTransport,
if header["Transfer-Encoding"].toLowerAscii() == "chunked":
debug "Chunked encoding is not supported",
address = transp.remoteAddress()
result = false
return
else:
debug "Content body size could not be calculated",
address = transp.remoteAddress()
result = false
return
result = false
return
result = true
@ -116,7 +113,6 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
error = true
if error or not transp.validateResponse(header):
await transp.closeWait()
result = ""
return
@ -156,13 +152,12 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
error = true
if error:
await transp.closeWait()
result = ""
else:
result = cast[string](buffer)
proc init(opts: var HttpClientOptions) =
opts.httpMethod = MethodGet
opts.httpMethod = MethodPost
proc newRpcHttpClient*(): RpcHttpClient =
## Creates a new HTTP client instance.
@ -176,60 +171,36 @@ proc httpMethod*(client: RpcHttpClient): HttpMethod =
proc httpMethod*(client: RpcHttpClient, m: HttpMethod) =
client.options.httpMethod = m
proc call*(client: RpcHttpClient, name: string,
params: JsonNode, httpMethod: HttpMethod): Future[Response] {.async.} =
method call*(client: RpcHttpClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = client.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
if isNil(client.transp) or client.transp.closed():
raise newException(ValueError,
"Transport is not initialised or already closed")
let res = await client.transp.sendRequest(value, httpMethod)
let transp = await connect(client.addresses[0])
var reqBody = $rpcCallNode(name, params, id)
echo "Sending (", client.httpMethod, "): ", reqBody
let res = await transp.sendRequest(reqBody, client.httpMethod)
if not res:
debug "Failed to send message to RPC server",
address = client.transp.remoteAddress(), msg_len = len(value)
await client.transp.closeWait()
address = transp.remoteAddress(), msg_len = len(reqBody)
transp.close()
raise newException(ValueError, "Transport error")
else:
debug "Message sent to RPC server", address = client.transp.remoteAddress(),
msg_len = len(value)
trace "Message", msg = value
debug "Message sent to RPC server", address = transp.remoteAddress(),
msg_len = len(reqBody)
trace "Message", msg = reqBody
var value = await transp.recvData()
transp.close()
if value.len == 0:
raise newException(ValueError, "Empty response from server")
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
client.awaiting[id] = newFut
client.processMessage(value)
result = await newFut
template call*(client: RpcHttpClient, name: string,
params: JsonNode): untyped =
client.call(name, params, client.httpMethod)
proc processData(client: RpcHttpClient) {.async.} =
while true:
while true:
var value = await client.transp.recvData()
debug "Returned from recvData()", address = client.transp.remoteAddress()
if value == "":
debug "Empty response from RPC server",
address = client.transp.remoteAddress()
break
debug "Received response from RPC server",
address = client.transp.remoteAddress(),
msg_len = len(value)
trace "Message", msg = value
client.processMessage(value)
# async loop reconnection and waiting
try:
client.transp = await connect(client.addresses[0])
except:
debug "Could not establish new connection to RPC server",
address = client.addresses[0]
break
proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} =
client.addresses = resolveTAddress(address, port)
client.transp = await connect(client.addresses[0])
client.loop = processData(client)

View File

@ -13,7 +13,7 @@ proc newRpcSocketClient*: RpcSocketClient =
new result
result.initRpcClient()
proc call*(self: RpcSocketClient, name: string,
method call*(self: RpcSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
@ -49,3 +49,7 @@ proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
client.transport = await connect(addresses[0])
client.address = addresses[0]
client.loop = processData(client)
method close*(client: RpcSocketClient) =
# TODO: Stop the processData loop
client.transport.close()

View File

@ -0,0 +1,54 @@
import ../client, chronos, tables, json
const newsUseChronos = true
include news
type
RpcWebSocketClient* = ref object of RpcClient
transport*: WebSocket
uri*: string
loop*: Future[void]
proc newRpcWebSocketClient*: RpcWebSocketClient =
## Creates a new client instance.
new result
result.initRpcClient()
method call*(self: RpcWebSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
if self.transport.isNil:
raise newException(ValueError,
"Transport is not initialised (missing a call to connect?)")
# echo "Sent msg: ", value
await self.transport.send(value)
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut
proc processData(client: RpcWebSocketClient) {.async.} =
while true:
while true:
var value = await client.transport.receivePacket()
if value == "":
# transmission ends
client.transport.close()
break
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await newWebSocket(client.uri)
proc connect*(client: RpcWebSocketClient, uri: string) {.async.} =
client.transport = await newWebSocket(uri)
client.uri = uri
client.loop = processData(client)
method close*(client: RpcWebSocketClient) =
# TODO: Stop the processData loop
client.transport.close()

View File

@ -1,4 +1,4 @@
import macros, json, options, typetraits
import macros, json, options, strutils, typetraits, byteutils
proc expect*(actual, expected: JsonNodeKind, argName: string) =
if actual != expected: raise newException(ValueError, "Parameter [" & argName & "] expected " & $expected & " but got " & $actual)
@ -36,6 +36,13 @@ proc fromJson(n: JsonNode, argName: string, result: var ref int64)
proc fromJson(n: JsonNode, argName: string, result: var ref int)
proc fromJson[T](n: JsonNode, argName: string, result: var Option[T])
proc parseStringToInt(s: string): int =
## If s starts with '0x' parse as hexadecimal, otherwise parse as decimal.
if s.len > 2 and s[0] == '0' and s[1] in {'x', 'X'}:
result = parseHexInt(s)
else:
result = parseInt(s)
# This can't be forward declared: https://github.com/nim-lang/Nim/issues/7868
proc fromJson[T: enum](n: JsonNode, argName: string, result: var T) =
n.kind.expect(JInt, argName)
@ -62,8 +69,11 @@ proc fromJson(n: JsonNode, argName: string, result: var bool) =
result = n.getBool()
proc fromJson(n: JsonNode, argName: string, result: var int) =
n.kind.expect(JInt, argName)
result = n.getInt()
if n.kind == JString:
result = n.getStr().parseStringToInt()
else:
n.kind.expect(JInt, argName)
result = n.getInt()
proc fromJson[T: ref object](n: JsonNode, argName: string, result: var T) =
n.kind.expect(JObject, argName)
@ -110,6 +120,14 @@ proc fromJson[T](n: JsonNode, argName: string, result: var seq[T]) =
fromJson(n[i], argName, result[i])
proc fromJson[N, T](n: JsonNode, argName: string, result: var array[N, T]) =
when T is byte:
if n.kind == JString:
let s = n.getStr
if s.len >= result.len + 2 and # (2 for 0x prefix)
s[0] == '0' and s[1] in {'x', 'X'}:
hexToByteArray(n.getStr, result)
return
n.kind.expect(JArray, argName)
if n.len > result.len: raise newException(ValueError, "Parameter \"" & argName & "\" item count is too big for array")
for i in 0 ..< n.len:

View File

@ -1,3 +1,3 @@
import client
import clients/[socketclient, httpclient]
export client, socketclient, httpclient
import clients/[socketclient, httpclient, websocketclient]
export client, socketclient, httpclient, websocketclient

View File

@ -66,7 +66,7 @@ proc continuousTest(address: string, port: Port): Future[int] {.async.} =
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]":
result += 1
client.transp.close()
client.close()
proc customMessage(address: TransportAddress,
data: string,