add compression support for websocket rpc server-client

fixes #62
This commit is contained in:
jangko 2021-06-23 19:08:07 +07:00
parent ccb19734e6
commit 69e8be66fb
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
3 changed files with 63 additions and 19 deletions

View File

@ -1,6 +1,7 @@
import
std/[strtabs, tables, uri, strutils],
pkg/[chronos, ws/ws, chronicles],
ws/extensions/compression/deflate,
../client
export client
@ -67,26 +68,28 @@ proc processData(client: RpcWebSocketClient) {.async.} =
client.onDisconnect()
proc connect*(client: RpcWebSocketClient, uri: string,
compression: bool = false,
flags: set[TLSFlags] = {
NoVerifyHost, NoVerifyServerName}) {.async.} =
try:
let uri = parseUri(uri)
let secure = uri.scheme == "wss"
let port = parseInt(uri.port)
var ext: seq[ExtFactory] = if compression:
@[deflateFactory()]
else:
@[]
let uri = parseUri(uri)
let secure = uri.scheme == "wss"
let port = parseInt(uri.port)
let ws = await WebSocket.connect(
host = uri.hostname,
port = Port(port),
path = uri.path,
secure=secure,
flags=flags
)
client.transport = ws
client.uri = uri
except WebSocketError as exc:
error "WebSocket error", exception = exc.msg
let ws = await WebSocket.connect(
host = uri.hostname,
port = Port(port),
path = uri.path,
secure=secure,
flags=flags,
factories=ext
)
client.transport = ws
client.uri = uri
client.loop = processData(client)

View File

@ -1,5 +1,6 @@
import
chronicles, httputils, chronos, ws/ws,
ws/extensions/compression/deflate,
".."/[errors, server]
export server
@ -56,8 +57,17 @@ proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) {.async.} =
except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg
proc initWebsocket(rpc: RpcWebSocketServer, compression: bool) =
let deflateFactory = deflateFactory()
if compression:
rpc.wsserver = WSServer.new(factories = [deflateFactory])
else:
rpc.wsserver = WSServer.new()
proc newRpcWebSocketServer*(
address: TransportAddress,
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr}): RpcWebSocketServer =
@ -65,7 +75,7 @@ proc newRpcWebSocketServer*(
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.wsserver = WSServer.new()
server.initWebsocket(compression)
server.server = HttpServer.create(
address,
processCallback,
@ -77,15 +87,21 @@ proc newRpcWebSocketServer*(
proc newRpcWebSocketServer*(
host: string,
port: Port,
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr}): RpcWebSocketServer =
newRpcWebSocketServer(initTAddress(host, port), flags)
newRpcWebSocketServer(
initTAddress(host, port),
compression,
flags
)
proc newRpcWebSocketServer*(
address: TransportAddress,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr},
tlsFlags: set[TLSFlags] = {},
@ -96,7 +112,7 @@ proc newRpcWebSocketServer*(
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.wsserver = WSServer.new()
server.initWebsocket(compression)
server.server = TlsHttpServer.create(
address,
tlsPrivateKey,
@ -115,6 +131,7 @@ proc newRpcWebSocketServer*(
port: Port,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr},
tlsFlags: set[TLSFlags] = {},
@ -125,6 +142,7 @@ proc newRpcWebSocketServer*(
initTAddress(host, port),
tlsPrivateKey,
tlsCertificate,
compression,
flags,
tlsFlags,
tlsMinVersion,

View File

@ -55,3 +55,26 @@ suite "Websocket Server/Client RPC":
srv.stop()
waitFor srv.closeWait()
suite "Websocket Server/Client RPC with Compression":
var srv = newRpcWebSocketServer("127.0.0.1", Port(8545), compression = true)
var client = newRpcWebSocketClient()
srv.setupServer()
srv.start()
waitFor client.connect("ws://127.0.0.1:8545/", compression = true)
test "Successful RPC call":
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
check r.getStr == "Hello abc data: [1, 2, 3, 4]"
test "Missing params":
expect(CatchableError):
discard waitFor client.call("myProc", %[%"abc"])
test "Error RPC call":
expect(CatchableError): # The error type wont be translated
discard waitFor client.call("myError", %[%"abc", %[1, 2, 3, 4]])
srv.stop()
waitFor srv.closeWait()