From c6be913c6941077fe10a986fdc082d77154ebeed Mon Sep 17 00:00:00 2001 From: jangko Date: Wed, 23 Jun 2021 17:41:31 +0700 Subject: [PATCH] implement json rpc websocket server - both ws and wss mode fixes #76 --- json_rpc/rpcserver.nim | 4 +- json_rpc/servers/websocketserver.nim | 150 +++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 2 deletions(-) create mode 100644 json_rpc/servers/websocketserver.nim diff --git a/json_rpc/rpcserver.nim b/json_rpc/rpcserver.nim index 81c3c7e..1f9e27d 100644 --- a/json_rpc/rpcserver.nim +++ b/json_rpc/rpcserver.nim @@ -1,3 +1,3 @@ import server -import servers/[socketserver, httpserver] -export server, socketserver, httpserver +import servers/[socketserver, httpserver, websocketserver] +export server, socketserver, httpserver, websocketserver diff --git a/json_rpc/servers/websocketserver.nim b/json_rpc/servers/websocketserver.nim new file mode 100644 index 0000000..bad2300 --- /dev/null +++ b/json_rpc/servers/websocketserver.nim @@ -0,0 +1,150 @@ +import + chronicles, httputils, chronos, ws/ws, + ".."/[errors, server] + +export server + +logScope: + topics = "JSONRPC-WS-SERVER" + +type + RpcWebSocketServer* = ref object of RpcServer + server: StreamServer + wsserver: WSServer + +proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) {.async.} = + trace "Handling request:", uri = request.uri.path + trace "Initiating web socket connection." + try: + let server = rpc.wsserver + let ws = await server.handleRequest(request) + if ws.readyState != Open: + error "Failed to open websocket connection" + return + + trace "Websocket handshake completed" + while ws.readyState != ReadyState.Closed: + let recvData = await ws.recv() + trace "Client message: ", size = recvData.len, binary = ws.binary + + if ws.readyState == ReadyState.Closed: + # if session already terminated by peer, + # no need to send response + break + + if recvData.len == 0: + await ws.close( + reason = "cannot process zero length message" + ) + break + + let future = rpc.route(cast[string](recvData)) + yield future + if future.failed: + debug "Internal error, while processing RPC call", + address = $request.uri + await ws.close( + reason = "Internal error, while processing RPC call" + ) + break + + var data = future.read() + trace "RPC result has been sent", address = $request.uri + + await ws.send(data) + + except WebSocketError as exc: + error "WebSocket error:", exception = exc.msg + +proc newRpcWebSocketServer*( + address: TransportAddress, + flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, + ServerFlags.ReuseAddr}): RpcWebSocketServer = + + var server = new(RpcWebSocketServer) + proc processCallback(request: HttpRequest): Future[void] = + handleRequest(server, request) + + server.wsserver = WSServer.new() + server.server = HttpServer.create( + address, + processCallback, + flags + ) + + server + +proc newRpcWebSocketServer*( + host: string, + port: Port, + flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, + ServerFlags.ReuseAddr}): RpcWebSocketServer = + + newRpcWebSocketServer(initTAddress(host, port), flags) + +proc newRpcWebSocketServer*( + address: TransportAddress, + tlsPrivateKey: TLSPrivateKey, + tlsCertificate: TLSCertificate, + flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, + ServerFlags.ReuseAddr}, + tlsFlags: set[TLSFlags] = {}, + tlsMinVersion = TLSVersion.TLS12, + tlsMaxVersion = TLSVersion.TLS12): RpcWebSocketServer = + + var server = new(RpcWebSocketServer) + proc processCallback(request: HttpRequest): Future[void] = + handleRequest(server, request) + + server.wsserver = WSServer.new() + server.server = TlsHttpServer.create( + address, + tlsPrivateKey, + tlsCertificate, + processCallback, + flags, + tlsFlags, + tlsMinVersion, + tlsMaxVersion + ) + + server + +proc newRpcWebSocketServer*( + host: string, + port: Port, + tlsPrivateKey: TLSPrivateKey, + tlsCertificate: TLSCertificate, + flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, + ServerFlags.ReuseAddr}, + tlsFlags: set[TLSFlags] = {}, + tlsMinVersion = TLSVersion.TLS12, + tlsMaxVersion = TLSVersion.TLS12): RpcWebSocketServer = + + newRpcWebSocketServer( + initTAddress(host, port), + tlsPrivateKey, + tlsCertificate, + flags, + tlsFlags, + tlsMinVersion, + tlsMaxVersion + ) + +proc start*(server: RpcWebSocketServer) = + ## Start the RPC server. + notice "WS RPC server started", address = server.server.local + server.server.start() + +proc stop*(server: RpcWebSocketServer) = + ## Stop the RPC server. + notice "WS RPC server stopped", address = server.server.local + server.server.stop() + +proc close*(server: RpcWebSocketServer) = + ## Cleanup resources of RPC server. + server.server.close() + +proc closeWait*(server: RpcWebSocketServer) {.async.} = + ## Cleanup resources of RPC server. + await server.server.closeWait()