nim-json-rpc/json_rpc/clients/websocketclientimpl.nim

159 lines
4.6 KiB
Nim

# json-rpc
# Copyright (c) 2019-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/uri,
pkg/websock/[websock, extensions/compression/deflate],
pkg/[chronos, chronos/apps/http/httptable, chronicles],
stew/byteutils,
../errors
# avoid clash between Json.encode and Base64Pad.encode
import ../client except encode
logScope:
topics = "JSONRPC-WS-CLIENT"
type
RpcWebSocketClient* = ref object of RpcClient
transport*: WSSession
uri*: Uri
loop*: Future[void]
getHeaders*: GetJsonRpcRequestHeaders
proc new*(
T: type RpcWebSocketClient, getHeaders: GetJsonRpcRequestHeaders = nil): T =
T(getHeaders: getHeaders)
proc newRpcWebSocketClient*(
getHeaders: GetJsonRpcRequestHeaders = nil): RpcWebSocketClient =
## Creates a new client instance.
RpcWebSocketClient.new(getHeaders)
method call*(client: RpcWebSocketClient, name: string,
params: RequestParamsTx): Future[JsonString] {.async.} =
## Remotely calls the specified RPC method.
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
let
id = client.getNextId()
reqBody = requestTxEncode(name, params, id) & "\r\n"
newFut = newFuture[JsonString]() # completed by processMessage
# add to awaiting responses
client.awaiting[id] = newFut
debug "Sending JSON-RPC request",
address = $client.uri, len = len(reqBody), name
await client.transport.send(reqBody)
return await newFut
method callBatch*(client: RpcWebSocketClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.async.} =
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
if client.batchFut.isNil or client.batchFut.finished():
client.batchFut = newFuture[ResponseBatchRx]()
let reqBody = requestBatchEncode(calls) & "\r\n"
debug "Sending JSON-RPC batch",
address = $client.uri, len = len(reqBody)
await client.transport.send(reqBody)
return await client.batchFut
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref CatchableError
template processError() =
for k, v in client.awaiting:
v.fail(error)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(error)
client.awaiting.clear()
let ws = client.transport
try:
while ws.readyState != ReadyState.Closed:
var value = await ws.recvMsg(MaxMessageBodyBytes)
if value.len == 0:
# transmission ends
break
let res = client.processMessage(string.fromBytes(value))
if res.isErr:
error = newException(JsonRpcError, res.error)
processError()
except CatchableError as e:
error = e
await client.transport.close()
client.transport = nil
if client.awaiting.len != 0:
if error.isNil:
error = newException(IOError, "Transport was closed while waiting for response")
processError()
if not client.onDisconnect.isNil:
client.onDisconnect()
proc addExtraHeaders(
headers: var HttpTable,
client: RpcWebSocketClient,
extraHeaders: HttpTable) =
# Apply client instance overrides
if client.getHeaders != nil:
for header in client.getHeaders():
headers.add(header[0], header[1])
# Apply call specific overrides
for header in extraHeaders.stringItems:
headers.add(header.key, header.value)
# Apply default origin
discard headers.hasKeyOrPut("Origin", "http://localhost")
proc connect*(
client: RpcWebSocketClient,
uri: string,
extraHeaders: HttpTable = default(HttpTable),
compression = false,
hooks: seq[Hook] = @[],
flags: set[TLSFlags] = {}) {.async.} =
proc headersHook(ctx: Hook, headers: var HttpTable): Result[void, string] =
headers.addExtraHeaders(client, extraHeaders)
ok()
var ext: seq[ExtFactory] = if compression: @[deflateFactory()]
else: @[]
let uri = parseUri(uri)
let ws = await WebSocket.connect(
uri=uri,
factories=ext,
hooks=hooks & Hook(append: headersHook),
flags=flags)
client.transport = ws
client.uri = uri
client.loop = processData(client)
method close*(client: RpcWebSocketClient) {.async.} =
await client.loop.cancelAndWait()
if not client.transport.isNil:
await client.transport.close()
client.transport = nil