implement json rpc websocket server

- both ws and wss mode

fixes #76
This commit is contained in:
jangko 2021-06-23 17:41:31 +07:00
parent a79f10f10f
commit c6be913c69
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
2 changed files with 152 additions and 2 deletions

View File

@ -1,3 +1,3 @@
import server
import servers/[socketserver, httpserver]
export server, socketserver, httpserver
import servers/[socketserver, httpserver, websocketserver]
export server, socketserver, httpserver, websocketserver

View File

@ -0,0 +1,150 @@
import
chronicles, httputils, chronos, ws/ws,
".."/[errors, server]
export server
logScope:
topics = "JSONRPC-WS-SERVER"
type
RpcWebSocketServer* = ref object of RpcServer
server: StreamServer
wsserver: WSServer
proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) {.async.} =
trace "Handling request:", uri = request.uri.path
trace "Initiating web socket connection."
try:
let server = rpc.wsserver
let ws = await server.handleRequest(request)
if ws.readyState != Open:
error "Failed to open websocket connection"
return
trace "Websocket handshake completed"
while ws.readyState != ReadyState.Closed:
let recvData = await ws.recv()
trace "Client message: ", size = recvData.len, binary = ws.binary
if ws.readyState == ReadyState.Closed:
# if session already terminated by peer,
# no need to send response
break
if recvData.len == 0:
await ws.close(
reason = "cannot process zero length message"
)
break
let future = rpc.route(cast[string](recvData))
yield future
if future.failed:
debug "Internal error, while processing RPC call",
address = $request.uri
await ws.close(
reason = "Internal error, while processing RPC call"
)
break
var data = future.read()
trace "RPC result has been sent", address = $request.uri
await ws.send(data)
except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg
proc newRpcWebSocketServer*(
address: TransportAddress,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr}): RpcWebSocketServer =
var server = new(RpcWebSocketServer)
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.wsserver = WSServer.new()
server.server = HttpServer.create(
address,
processCallback,
flags
)
server
proc newRpcWebSocketServer*(
host: string,
port: Port,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr}): RpcWebSocketServer =
newRpcWebSocketServer(initTAddress(host, port), flags)
proc newRpcWebSocketServer*(
address: TransportAddress,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr},
tlsFlags: set[TLSFlags] = {},
tlsMinVersion = TLSVersion.TLS12,
tlsMaxVersion = TLSVersion.TLS12): RpcWebSocketServer =
var server = new(RpcWebSocketServer)
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.wsserver = WSServer.new()
server.server = TlsHttpServer.create(
address,
tlsPrivateKey,
tlsCertificate,
processCallback,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion
)
server
proc newRpcWebSocketServer*(
host: string,
port: Port,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,
ServerFlags.ReuseAddr},
tlsFlags: set[TLSFlags] = {},
tlsMinVersion = TLSVersion.TLS12,
tlsMaxVersion = TLSVersion.TLS12): RpcWebSocketServer =
newRpcWebSocketServer(
initTAddress(host, port),
tlsPrivateKey,
tlsCertificate,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion
)
proc start*(server: RpcWebSocketServer) =
## Start the RPC server.
notice "WS RPC server started", address = server.server.local
server.server.start()
proc stop*(server: RpcWebSocketServer) =
## Stop the RPC server.
notice "WS RPC server stopped", address = server.server.local
server.server.stop()
proc close*(server: RpcWebSocketServer) =
## Cleanup resources of RPC server.
server.server.close()
proc closeWait*(server: RpcWebSocketServer) {.async.} =
## Cleanup resources of RPC server.
await server.server.closeWait()