From 35e6d7e6ff1e89b44d2d3fdd8a0a16ac9d2195cf Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 29 Nov 2021 13:17:41 +0200 Subject: [PATCH] Restore the support for using the news package --- json_rpc.nimble | 10 ++- json_rpc/clients/config.nim | 7 ++ json_rpc/clients/websocketclient.nim | 125 ++++++++++++++++++--------- json_rpc/rpcproxy.nim | 14 +-- tests/all.nim | 14 ++- tests/testserverclient.nim | 11 ++- 6 files changed, 129 insertions(+), 52 deletions(-) create mode 100644 json_rpc/clients/config.nim diff --git a/json_rpc.nimble b/json_rpc.nimble index daf3e32..6994e6c 100644 --- a/json_rpc.nimble +++ b/json_rpc.nimble @@ -13,7 +13,8 @@ requires "nim >= 1.2.0", "chronos", "httputils", "chronicles#ba2817f1", - "https://github.com/status-im/nim-websock", + "https://github.com/status-im/news#status", + "websock", "json_serialization" proc getLang(): string = @@ -29,6 +30,11 @@ proc buildBinary(name: string, srcDir = "./", params = "", cmdParams = "", lang task test, "run tests": buildBinary "all", "tests/", - params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR", + params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR -d:json_rpc_websocket_package=websock", + cmdParams = "", + lang = getLang() + + buildBinary "all", "tests/", + params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR -d:json_rpc_websocket_package=news", cmdParams = "", lang = getLang() diff --git a/json_rpc/clients/config.nim b/json_rpc/clients/config.nim new file mode 100644 index 0000000..1f6cb37 --- /dev/null +++ b/json_rpc/clients/config.nim @@ -0,0 +1,7 @@ +const + json_rpc_websocket_package {.strdefine.} = "websock" + useNews* = json_rpc_websocket_package == "news" + +when json_rpc_websocket_package notin ["websock", "news"]: + {.fatal: "json_rpc_websocket_package should be set to either 'websock' or 'news'".} + diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index 5dfb1e4..13f7424 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -1,20 +1,33 @@ import - std/[strtabs, tables, uri, strutils], - pkg/[chronos, websock/websock, chronicles], - websock/extensions/compression/deflate, + std/[strtabs, tables], + pkg/[chronos, chronicles], stew/byteutils, - ../client + ../client, ./config export client logScope: topics = "JSONRPC-WS-CLIENT" -type - RpcWebSocketClient* = ref object of RpcClient - transport*: WSSession - uri*: Uri - loop*: Future[void] +when useNews: + const newsUseChronos = true + include pkg/news + + type + RpcWebSocketClient* = ref object of RpcClient + transport*: WebSocket + uri*: string + loop*: Future[void] + +else: + import std/[uri, strutils] + import pkg/websock/[websock, extensions/compression/deflate] + + type + RpcWebSocketClient* = ref object of RpcClient + transport*: WSSession + uri*: Uri + loop*: Future[void] proc new*(T: type RpcWebSocketClient): T = T() @@ -43,20 +56,36 @@ method call*(self: RpcWebSocketClient, name: string, proc processData(client: RpcWebSocketClient) {.async.} = var error: ref CatchableError - let ws = client.transport - try: - while ws.readystate != ReadyState.Closed: - var value = await ws.recvMsg() - if value.len == 0: - # transmission ends - break + when useNews: + try: + while true: + var value = await client.transport.receiveString() + if value == "": + # transmission ends + break - client.processMessage(string.fromBytes(value)) - except CatchableError as e: - error = e + client.processMessage(value) + except CatchableError as e: + error = e + + client.transport.close() + else: + let ws = client.transport + try: + while ws.readystate != ReadyState.Closed: + var value = await ws.recvMsg() + + if value.len == 0: + # transmission ends + break + + client.processMessage(string.fromBytes(value)) + except CatchableError as e: + error = e + + await client.transport.close() - await client.transport.close() client.transport = nil if client.awaiting.len != 0: @@ -68,28 +97,46 @@ proc processData(client: RpcWebSocketClient) {.async.} = if not client.onDisconnect.isNil: client.onDisconnect() -proc connect*(client: RpcWebSocketClient, uri: string, - compression: bool = false, - flags: set[TLSFlags] = { - NoVerifyHost, NoVerifyServerName}) {.async.} = +when useNews: + proc connect*( + client: RpcWebSocketClient, + uri: string, + headers: StringTableRef = nil, + compression = false) {.async.} = + if compression: + warn "compression is not supported with the news back-end" + var headers = headers + if headers.isNil: + headers = newStringTable({"Origin": "http://localhost"}) + elif "Origin" notin headers: + # TODO: This is a hack, because the table might be case sensitive. Ideally strtabs module has + # to be extended with case insensitive accessors. + headers["Origin"] = "http://localhost" + client.transport = await newWebSocket(uri, headers) + client.uri = uri +else: + proc connect*( + client: RpcWebSocketClient, uri: string, + compression = false, + flags: set[TLSFlags] = {NoVerifyHost, NoVerifyServerName}) {.async.} = + var ext: seq[ExtFactory] = if compression: @[deflateFactory()] + else: @[] + let uri = parseUri(uri) + let ws = await WebSocket.connect( + uri=uri, + factories=ext, + flags=flags + ) + client.transport = ws + client.uri = uri - var ext: seq[ExtFactory] = if compression: - @[deflateFactory()] - else: - @[] - let uri = parseUri(uri) - let ws = await WebSocket.connect( - uri=uri, - factories=ext, - flags=flags - ) - client.transport = ws - client.uri = uri - - client.loop = processData(client) + client.loop = processData(client) method close*(client: RpcWebSocketClient) {.async.} = await client.loop.cancelAndWait() if not client.transport.isNil: - await client.transport.close() + when useNews: + client.transport.close() + else: + await client.transport.close() client.transport = nil diff --git a/json_rpc/rpcproxy.nim b/json_rpc/rpcproxy.nim index ec253ba..f22b68e 100644 --- a/json_rpc/rpcproxy.nim +++ b/json_rpc/rpcproxy.nim @@ -42,7 +42,7 @@ proc getWebSocketClientConfig*( flags: set[TLSFlags] = { NoVerifyHost, NoVerifyServerName}): ClientConfig = ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags) - + proc proxyCall(client: RpcClient, name: string): RpcProc = return proc (params: JsonNode): Future[StringOfJson] {.async.} = let res = await client.call(name, params) @@ -63,14 +63,14 @@ proc new*(T: type RpcProxy, server: RpcHttpServer, cfg: ClientConfig): T = of WebSocket: let client = newRpcWebSocketClient() return T( - rpcHttpServer: server, - kind: WebSocket, - wsUri: cfg.wsUri, - webSocketClient: client, - compression: cfg.compression, + rpcHttpServer: server, + kind: WebSocket, + wsUri: cfg.wsUri, + webSocketClient: client, + compression: cfg.compression, flags: cfg.flags ) - + proc new*(T: type RpcProxy, listenAddresses: openArray[TransportAddress], cfg: ClientConfig): T {.raises: [Defect, CatchableError].} = RpcProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()), cfg) diff --git a/tests/all.nim b/tests/all.nim index eb0270d..215aee7 100644 --- a/tests/all.nim +++ b/tests/all.nim @@ -1,4 +1,16 @@ {. warning[UnusedImport]:off .} import - testrpcmacro, testserverclient, testethcalls, testhttp, testproxy + ../json_rpc/clients/config + +import + testrpcmacro, testethcalls, testhttp + +when not useNews: + # TODO The websock server doesn't interop properly + # with the news client at the moment + import testserverclient + +when not useNews: + # The proxy implementation is based on websock + import testproxy diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index fec36f7..4845a27 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -1,6 +1,9 @@ import unittest, json, chronicles, - ../json_rpc/[rpcclient, rpcserver] + ../json_rpc/[rpcclient, rpcserver, clients/config] + +const + compressionSupported = useNews # Create RPC on server proc setupServer*(srv: RpcServer) = @@ -57,12 +60,14 @@ suite "Websocket Server/Client RPC": waitFor srv.closeWait() suite "Websocket Server/Client RPC with Compression": - var srv = newRpcWebSocketServer("127.0.0.1", Port(8545), compression = true) + var srv = newRpcWebSocketServer("127.0.0.1", Port(8545), + compression = compressionSupported) var client = newRpcWebSocketClient() srv.setupServer() srv.start() - waitFor client.connect("ws://127.0.0.1:8545/", compression = true) + waitFor client.connect("ws://127.0.0.1:8545/", + compression = compressionSupported) test "Successful RPC call": let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])