159 lines
4.6 KiB
Nim
159 lines
4.6 KiB
Nim
# json-rpc
|
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
# at your option.
|
|
# This file may not be copied, modified, or distributed except according to
|
|
# those terms.
|
|
|
|
{.push raises: [], gcsafe.}
|
|
|
|
import
|
|
std/uri,
|
|
pkg/websock/[websock, extensions/compression/deflate],
|
|
pkg/[chronos, chronos/apps/http/httptable, chronicles],
|
|
stew/byteutils,
|
|
../errors
|
|
|
|
# avoid clash between Json.encode and Base64Pad.encode
|
|
import ../client except encode
|
|
|
|
logScope:
|
|
topics = "JSONRPC-WS-CLIENT"
|
|
|
|
type
|
|
RpcWebSocketClient* = ref object of RpcClient
|
|
transport*: WSSession
|
|
uri*: Uri
|
|
loop*: Future[void]
|
|
getHeaders*: GetJsonRpcRequestHeaders
|
|
|
|
proc new*(
|
|
T: type RpcWebSocketClient, getHeaders: GetJsonRpcRequestHeaders = nil): T =
|
|
T(getHeaders: getHeaders)
|
|
|
|
proc newRpcWebSocketClient*(
|
|
getHeaders: GetJsonRpcRequestHeaders = nil): RpcWebSocketClient =
|
|
## Creates a new client instance.
|
|
RpcWebSocketClient.new(getHeaders)
|
|
|
|
method call*(client: RpcWebSocketClient, name: string,
|
|
params: RequestParamsTx): Future[JsonString] {.async.} =
|
|
## Remotely calls the specified RPC method.
|
|
if client.transport.isNil:
|
|
raise newException(JsonRpcError,
|
|
"Transport is not initialised (missing a call to connect?)")
|
|
|
|
let
|
|
id = client.getNextId()
|
|
reqBody = requestTxEncode(name, params, id) & "\r\n"
|
|
newFut = newFuture[JsonString]() # completed by processMessage
|
|
# add to awaiting responses
|
|
client.awaiting[id] = newFut
|
|
|
|
debug "Sending JSON-RPC request",
|
|
address = $client.uri, len = len(reqBody), name
|
|
|
|
await client.transport.send(reqBody)
|
|
return await newFut
|
|
|
|
method callBatch*(client: RpcWebSocketClient,
|
|
calls: RequestBatchTx): Future[ResponseBatchRx]
|
|
{.async.} =
|
|
if client.transport.isNil:
|
|
raise newException(JsonRpcError,
|
|
"Transport is not initialised (missing a call to connect?)")
|
|
|
|
if client.batchFut.isNil or client.batchFut.finished():
|
|
client.batchFut = newFuture[ResponseBatchRx]()
|
|
|
|
let reqBody = requestBatchEncode(calls) & "\r\n"
|
|
debug "Sending JSON-RPC batch",
|
|
address = $client.uri, len = len(reqBody)
|
|
await client.transport.send(reqBody)
|
|
|
|
return await client.batchFut
|
|
|
|
proc processData(client: RpcWebSocketClient) {.async.} =
|
|
var error: ref CatchableError
|
|
|
|
template processError() =
|
|
for k, v in client.awaiting:
|
|
v.fail(error)
|
|
if client.batchFut.isNil.not and not client.batchFut.completed():
|
|
client.batchFut.fail(error)
|
|
client.awaiting.clear()
|
|
|
|
let ws = client.transport
|
|
try:
|
|
while ws.readyState != ReadyState.Closed:
|
|
var value = await ws.recvMsg(MaxMessageBodyBytes)
|
|
|
|
if value.len == 0:
|
|
# transmission ends
|
|
break
|
|
|
|
let res = client.processMessage(string.fromBytes(value))
|
|
if res.isErr:
|
|
error = newException(JsonRpcError, res.error)
|
|
processError()
|
|
|
|
except CatchableError as e:
|
|
error = e
|
|
|
|
await client.transport.close()
|
|
|
|
client.transport = nil
|
|
|
|
if client.awaiting.len != 0:
|
|
if error.isNil:
|
|
error = newException(IOError, "Transport was closed while waiting for response")
|
|
processError()
|
|
if not client.onDisconnect.isNil:
|
|
client.onDisconnect()
|
|
|
|
proc addExtraHeaders(
|
|
headers: var HttpTable,
|
|
client: RpcWebSocketClient,
|
|
extraHeaders: HttpTable) =
|
|
# Apply client instance overrides
|
|
if client.getHeaders != nil:
|
|
for header in client.getHeaders():
|
|
headers.add(header[0], header[1])
|
|
|
|
# Apply call specific overrides
|
|
for header in extraHeaders.stringItems:
|
|
headers.add(header.key, header.value)
|
|
|
|
# Apply default origin
|
|
discard headers.hasKeyOrPut("Origin", "http://localhost")
|
|
|
|
proc connect*(
|
|
client: RpcWebSocketClient,
|
|
uri: string,
|
|
extraHeaders: HttpTable = default(HttpTable),
|
|
compression = false,
|
|
hooks: seq[Hook] = @[],
|
|
flags: set[TLSFlags] = {}) {.async.} =
|
|
proc headersHook(ctx: Hook, headers: var HttpTable): Result[void, string] =
|
|
headers.addExtraHeaders(client, extraHeaders)
|
|
ok()
|
|
var ext: seq[ExtFactory] = if compression: @[deflateFactory()]
|
|
else: @[]
|
|
let uri = parseUri(uri)
|
|
let ws = await WebSocket.connect(
|
|
uri=uri,
|
|
factories=ext,
|
|
hooks=hooks & Hook(append: headersHook),
|
|
flags=flags)
|
|
client.transport = ws
|
|
client.uri = uri
|
|
client.loop = processData(client)
|
|
|
|
method close*(client: RpcWebSocketClient) {.async.} =
|
|
await client.loop.cancelAndWait()
|
|
if not client.transport.isNil:
|
|
await client.transport.close()
|
|
client.transport = nil
|