From 69e8be66fb16b7631c60ca0481f178c5ea5cb9c5 Mon Sep 17 00:00:00 2001 From: jangko Date: Wed, 23 Jun 2021 19:08:07 +0700 Subject: [PATCH] add compression support for websocket rpc server-client fixes #62 --- json_rpc/clients/websocketclient.nim | 35 +++++++++++++++------------- json_rpc/servers/websocketserver.nim | 24 ++++++++++++++++--- tests/testserverclient.nim | 23 ++++++++++++++++++ 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index 85f45b3..db701e0 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -1,6 +1,7 @@ import std/[strtabs, tables, uri, strutils], pkg/[chronos, ws/ws, chronicles], + ws/extensions/compression/deflate, ../client export client @@ -67,26 +68,28 @@ proc processData(client: RpcWebSocketClient) {.async.} = client.onDisconnect() proc connect*(client: RpcWebSocketClient, uri: string, + compression: bool = false, flags: set[TLSFlags] = { NoVerifyHost, NoVerifyServerName}) {.async.} = - try: - let uri = parseUri(uri) - let secure = uri.scheme == "wss" - let port = parseInt(uri.port) + var ext: seq[ExtFactory] = if compression: + @[deflateFactory()] + else: + @[] + let uri = parseUri(uri) + let secure = uri.scheme == "wss" + let port = parseInt(uri.port) - let ws = await WebSocket.connect( - host = uri.hostname, - port = Port(port), - path = uri.path, - secure=secure, - flags=flags - ) - client.transport = ws - client.uri = uri - - except WebSocketError as exc: - error "WebSocket error", exception = exc.msg + let ws = await WebSocket.connect( + host = uri.hostname, + port = Port(port), + path = uri.path, + secure=secure, + flags=flags, + factories=ext + ) + client.transport = ws + client.uri = uri client.loop = processData(client) diff --git a/json_rpc/servers/websocketserver.nim b/json_rpc/servers/websocketserver.nim index bad2300..5cf1845 100644 --- a/json_rpc/servers/websocketserver.nim +++ b/json_rpc/servers/websocketserver.nim @@ -1,5 +1,6 @@ import chronicles, httputils, chronos, ws/ws, + ws/extensions/compression/deflate, ".."/[errors, server] export server @@ -56,8 +57,17 @@ proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) {.async.} = except WebSocketError as exc: error "WebSocket error:", exception = exc.msg +proc initWebsocket(rpc: RpcWebSocketServer, compression: bool) = + let deflateFactory = deflateFactory() + + if compression: + rpc.wsserver = WSServer.new(factories = [deflateFactory]) + else: + rpc.wsserver = WSServer.new() + proc newRpcWebSocketServer*( address: TransportAddress, + compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}): RpcWebSocketServer = @@ -65,7 +75,7 @@ proc newRpcWebSocketServer*( proc processCallback(request: HttpRequest): Future[void] = handleRequest(server, request) - server.wsserver = WSServer.new() + server.initWebsocket(compression) server.server = HttpServer.create( address, processCallback, @@ -77,15 +87,21 @@ proc newRpcWebSocketServer*( proc newRpcWebSocketServer*( host: string, port: Port, + compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}): RpcWebSocketServer = - newRpcWebSocketServer(initTAddress(host, port), flags) + newRpcWebSocketServer( + initTAddress(host, port), + compression, + flags + ) proc newRpcWebSocketServer*( address: TransportAddress, tlsPrivateKey: TLSPrivateKey, tlsCertificate: TLSCertificate, + compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, tlsFlags: set[TLSFlags] = {}, @@ -96,7 +112,7 @@ proc newRpcWebSocketServer*( proc processCallback(request: HttpRequest): Future[void] = handleRequest(server, request) - server.wsserver = WSServer.new() + server.initWebsocket(compression) server.server = TlsHttpServer.create( address, tlsPrivateKey, @@ -115,6 +131,7 @@ proc newRpcWebSocketServer*( port: Port, tlsPrivateKey: TLSPrivateKey, tlsCertificate: TLSCertificate, + compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, tlsFlags: set[TLSFlags] = {}, @@ -125,6 +142,7 @@ proc newRpcWebSocketServer*( initTAddress(host, port), tlsPrivateKey, tlsCertificate, + compression, flags, tlsFlags, tlsMinVersion, diff --git a/tests/testserverclient.nim b/tests/testserverclient.nim index ba525c0..fec36f7 100644 --- a/tests/testserverclient.nim +++ b/tests/testserverclient.nim @@ -55,3 +55,26 @@ suite "Websocket Server/Client RPC": srv.stop() waitFor srv.closeWait() + +suite "Websocket Server/Client RPC with Compression": + var srv = newRpcWebSocketServer("127.0.0.1", Port(8545), compression = true) + var client = newRpcWebSocketClient() + + srv.setupServer() + srv.start() + waitFor client.connect("ws://127.0.0.1:8545/", compression = true) + + test "Successful RPC call": + let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]]) + check r.getStr == "Hello abc data: [1, 2, 3, 4]" + + test "Missing params": + expect(CatchableError): + discard waitFor client.call("myProc", %[%"abc"]) + + test "Error RPC call": + expect(CatchableError): # The error type wont be translated + discard waitFor client.call("myError", %[%"abc", %[1, 2, 3, 4]]) + + srv.stop() + waitFor srv.closeWait()