nim-json-rpc/json_rpc/clients/websocketclient.nim

102 lines
2.7 KiB
Nim
Raw Normal View History

import
2021-06-23 18:43:02 +07:00
std/[strtabs, tables, uri, strutils],
2021-06-27 13:41:21 +07:00
pkg/[chronos, websock/websock, chronicles],
websock/extensions/compression/deflate,
stew/byteutils,
../client
export client
2021-06-23 18:43:02 +07:00
logScope:
topics = "JSONRPC-WS-CLIENT"
type
RpcWebSocketClient* = ref object of RpcClient
2021-06-23 18:43:02 +07:00
transport*: WSSession
uri*: Uri
loop*: Future[void]
proc new*(T: type RpcWebSocketClient): T =
T()
proc newRpcWebSocketClient*: RpcWebSocketClient =
## Creates a new client instance.
RpcWebSocketClient.new()
method call*(self: RpcWebSocketClient, name: string,
params: JsonNode): Future[Response] {.
async, gcsafe, raises: [Defect, CatchableError].} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var value = $rpcCallNode(name, params, id) & "\r\n"
if self.transport.isNil:
raise newException(ValueError,
"Transport is not initialised (missing a call to connect?)")
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
await self.transport.send(value)
return await newFut
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref CatchableError
2021-06-23 18:43:02 +07:00
let ws = client.transport
2019-11-22 14:37:54 +02:00
try:
2021-06-23 18:43:02 +07:00
while ws.readystate != ReadyState.Closed:
var value = await ws.recv()
if value.len == 0:
# transmission ends
break
client.processMessage(string.fromBytes(value))
2019-11-22 14:37:54 +02:00
except CatchableError as e:
error = e
2021-06-23 18:43:02 +07:00
await client.transport.close()
2019-11-22 14:37:54 +02:00
client.transport = nil
if client.awaiting.len != 0:
if error.isNil:
error = newException(IOError, "Transport was closed while waiting for response")
for k, v in client.awaiting:
v.fail(error)
client.awaiting.clear()
if not client.onDisconnect.isNil:
client.onDisconnect()
2021-06-23 18:43:02 +07:00
proc connect*(client: RpcWebSocketClient, uri: string,
compression: bool = false,
2021-06-23 18:43:02 +07:00
flags: set[TLSFlags] = {
NoVerifyHost, NoVerifyServerName}) {.async.} =
var ext: seq[ExtFactory] = if compression:
@[deflateFactory()]
else:
@[]
let uri = parseUri(uri)
let secure = uri.scheme == "wss"
let port = parseInt(uri.port)
let ws = await WebSocket.connect(
host = uri.hostname,
port = Port(port),
path = uri.path,
secure=secure,
flags=flags,
factories=ext
)
client.transport = ws
client.uri = uri
2021-06-23 18:43:02 +07:00
client.loop = processData(client)
2019-06-17 19:56:19 +03:00
method close*(client: RpcWebSocketClient) {.async.} =
2021-04-06 21:48:32 +03:00
await client.loop.cancelAndWait()
2019-11-22 14:37:54 +02:00
if not client.transport.isNil:
2021-06-23 18:43:02 +07:00
await client.transport.close()
2021-04-06 21:48:32 +03:00
client.transport = nil