mirror of
https://github.com/logos-storage/nim-json-rpc.git
synced 2026-01-04 06:33:10 +00:00
add support for websocket in proxy (#112)
This commit is contained in:
parent
318949a401
commit
eda5e8554f
@ -1,54 +1,106 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
import
|
||||
pkg/websock/websock,
|
||||
./servers/[httpserver],
|
||||
./clients/[httpclient]
|
||||
./clients/[httpclient, websocketclient]
|
||||
|
||||
type RpcHttpProxy* = ref object of RootRef
|
||||
rpcHttpClient*: RpcHttpClient
|
||||
rpcHttpServer*: RpcHttpServer
|
||||
type
|
||||
ClientKind* = enum
|
||||
Http,
|
||||
WebSocket
|
||||
|
||||
proc proxyCall(client: RpcHttpClient, name: string): RpcProc =
|
||||
ClientConfig = object
|
||||
case kind: ClientKind
|
||||
of Http:
|
||||
httpUri: string
|
||||
of WebSocket:
|
||||
wsUri: string
|
||||
compression: bool
|
||||
flags: set[TLSFlags]
|
||||
|
||||
RpcProxy* = ref object of RootRef
|
||||
rpcHttpServer*: RpcHttpServer
|
||||
case kind*: ClientKind
|
||||
of Http:
|
||||
httpUri*: string
|
||||
httpClient*: RpcHttpClient
|
||||
of WebSocket:
|
||||
wsUri*: string
|
||||
webSocketClient*: RpcWebSocketClient
|
||||
compression*: bool
|
||||
flags*: set[TLSFlags]
|
||||
|
||||
# TODO Add validations that provided uri-s are correct https/wss uri and retrun
|
||||
# Result[string, ClientConfig]
|
||||
proc getHttpClientConfig*(uri: string): ClientConfig =
|
||||
ClientConfig(kind: Http, httpUri: uri)
|
||||
|
||||
proc getWebSocketClientConfig*(
|
||||
uri: string,
|
||||
compression: bool = false,
|
||||
flags: set[TLSFlags] = {
|
||||
NoVerifyHost, NoVerifyServerName}): ClientConfig =
|
||||
ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags)
|
||||
|
||||
proc proxyCall(client: RpcClient, name: string): RpcProc =
|
||||
return proc (params: JsonNode): Future[StringOfJson] {.async.} =
|
||||
let res = await client.call(name, params)
|
||||
return StringOfJson($res)
|
||||
|
||||
proc new*(T: type RpcHttpProxy, server: RpcHttpServer): T =
|
||||
let client = newRpcHttpClient()
|
||||
T(rpcHttpClient: client, rpcHttpServer: server)
|
||||
proc getClient(proxy: RpcProxy): RpcClient =
|
||||
case proxy.kind
|
||||
of Http:
|
||||
proxy.httpClient
|
||||
of WebSocket:
|
||||
proxy.webSocketClient
|
||||
|
||||
proc new*(T: type RpcProxy, server: RpcHttpServer, cfg: ClientConfig): T =
|
||||
case cfg.kind
|
||||
of Http:
|
||||
let client = newRpcHttpClient()
|
||||
return T(rpcHttpServer: server, kind: Http, httpUri: cfg.httpUri, httpClient: client)
|
||||
of WebSocket:
|
||||
let client = newRpcWebSocketClient()
|
||||
return T(
|
||||
rpcHttpServer: server,
|
||||
kind: WebSocket,
|
||||
wsUri: cfg.wsUri,
|
||||
webSocketClient: client,
|
||||
compression: cfg.compression,
|
||||
flags: cfg.flags
|
||||
)
|
||||
|
||||
proc new*(T: type RpcHttpProxy, listenAddresses: openArray[TransportAddress]): T {.raises: [Defect, CatchableError].} =
|
||||
RpcHttpProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()))
|
||||
proc new*(T: type RpcProxy, listenAddresses: openArray[TransportAddress], cfg: ClientConfig): T {.raises: [Defect, CatchableError].} =
|
||||
RpcProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()), cfg)
|
||||
|
||||
proc new*(T: type RpcHttpProxy, listenAddresses: openArray[string]): T {.raises: [Defect, CatchableError].} =
|
||||
RpcHttpProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()))
|
||||
proc new*(T: type RpcProxy, listenAddresses: openArray[string], cfg: ClientConfig): T {.raises: [Defect, CatchableError].} =
|
||||
RpcProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()), cfg)
|
||||
|
||||
proc newRpcHttpProxy*(listenAddresses: openArray[string]): RpcHttpProxy {.raises: [Defect, CatchableError].} =
|
||||
RpcHttpProxy.new(listenAddresses)
|
||||
proc connectToProxy(proxy: RpcProxy): Future[void] =
|
||||
case proxy.kind
|
||||
of Http:
|
||||
return proxy.httpClient.connect(proxy.httpUri)
|
||||
of WebSocket:
|
||||
return proxy.webSocketClient.connect(proxy.wsUri, proxy.compression, proxy.flags)
|
||||
|
||||
proc newRpcHttpProxy*(listenAddresses: openArray[TransportAddress]): RpcHttpProxy {.raises: [Defect, CatchableError].} =
|
||||
RpcHttpProxy.new(listenAddresses)
|
||||
|
||||
proc start*(proxy:RpcHttpProxy, proxyServerUrl: string) {.async.} =
|
||||
proc start*(proxy: RpcProxy) {.async.} =
|
||||
proxy.rpcHttpServer.start()
|
||||
await proxy.rpcHttpClient.connect(proxyServerUrl)
|
||||
await proxy.connectToProxy()
|
||||
|
||||
proc start*(proxy:RpcHttpProxy, proxyServerAddress: string, proxyServerPort: Port) {.async.} =
|
||||
proxy.rpcHttpServer.start()
|
||||
await proxy.rpcHttpClient.connect(proxyServerAddress, proxyServerPort)
|
||||
|
||||
template rpc*(server: RpcHttpProxy, path: string, body: untyped): untyped =
|
||||
template rpc*(server: RpcProxy, path: string, body: untyped): untyped =
|
||||
server.rpcHttpServer.rpc(path, body)
|
||||
|
||||
proc registerProxyMethod*(proxy: var RpcHttpProxy, methodName: string) =
|
||||
proc registerProxyMethod*(proxy: var RpcProxy, methodName: string) =
|
||||
try:
|
||||
proxy.rpcHttpServer.register(methodName, proxyCall(proxy.rpcHttpClient, methodName))
|
||||
proxy.rpcHttpServer.register(methodName, proxyCall(proxy.getClient(), methodName))
|
||||
except CatchableError as err:
|
||||
# Adding proc type to table gives invalid exception tracking, see Nim bug: https://github.com/nim-lang/Nim/issues/18376
|
||||
raiseAssert err.msg
|
||||
|
||||
proc stop*(rpcHttpProxy: RpcHttpProxy) {.raises: [Defect, CatchableError].} =
|
||||
rpcHttpProxy.rpcHttpServer.stop()
|
||||
proc stop*(proxy: RpcProxy) {.async.} =
|
||||
await proxy.getClient().close()
|
||||
proxy.rpcHttpServer.stop()
|
||||
|
||||
proc closeWait*(rpcHttpProxy: RpcHttpProxy) {.async.} =
|
||||
await rpcHttpProxy.rpcHttpServer.closeWait()
|
||||
proc closeWait*(proxy: RpcProxy) {.async.} =
|
||||
await proxy.rpcHttpServer.closeWait()
|
||||
|
||||
@ -2,31 +2,31 @@ import
|
||||
unittest, json, chronicles,
|
||||
../json_rpc/[rpcclient, rpcserver, rpcproxy]
|
||||
|
||||
let srvAddress = "localhost:8545"
|
||||
var srv = newRpcHttpServer([srvAddress])
|
||||
|
||||
let srvAddress = initTAddress("127.0.0.1", Port(8545))
|
||||
let proxySrvAddress = "localhost:8546"
|
||||
var proxy = newRpcHttpProxy([proxySrvAddress])
|
||||
let proxySrvAddressForClient = "http://"&proxySrvAddress
|
||||
|
||||
template registerMethods(srv: RpcServer, proxy: RpcProxy) =
|
||||
srv.rpc("myProc") do(input: string, data: array[0..3, int]):
|
||||
return %("Hello " & input & " data: " & $data)
|
||||
# Create RPC on proxy server
|
||||
proxy.registerProxyMethod("myProc")
|
||||
|
||||
var client = newRpcHttpClient()
|
||||
let duplicatedProcedureName = "duplicated"
|
||||
# Create standard handler on server
|
||||
proxy.rpc("myProc1") do(input: string, data: array[0..3, int]):
|
||||
return %("Hello " & input & " data: " & $data)
|
||||
|
||||
# Create RPC on server
|
||||
srv.rpc("myProc") do(input: string, data: array[0..3, int]):
|
||||
return %("Hello " & input & " data: " & $data)
|
||||
suite "Proxy RPC through http":
|
||||
var srv = newRpcHttpServer([srvAddress])
|
||||
var proxy = RpcProxy.new([proxySrvAddress], getHttpClientConfig("http://127.0.0.1:8545"))
|
||||
var client = newRpcHttpClient()
|
||||
|
||||
# Create RPC on proxy server
|
||||
proxy.registerProxyMethod("myProc")
|
||||
registerMethods(srv, proxy)
|
||||
|
||||
# Create standard handler on server
|
||||
proxy.rpc("myProc1") do(input: string, data: array[0..3, int]):
|
||||
return %("Hello " & input & " data: " & $data)
|
||||
srv.start()
|
||||
waitFor proxy.start()
|
||||
waitFor client.connect(proxySrvAddressForClient)
|
||||
|
||||
srv.start()
|
||||
waitFor proxy.start("localhost", Port(8545))
|
||||
waitFor client.connect("localhost", Port(8546))
|
||||
|
||||
suite "Proxy RPC":
|
||||
test "Successful RPC call thorugh proxy":
|
||||
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
|
||||
check r.getStr == "Hello abc data: [1, 2, 3, 4]"
|
||||
@ -40,8 +40,36 @@ suite "Proxy RPC":
|
||||
expect(CatchableError):
|
||||
discard waitFor client.call("missingMethod", %[%"abc"])
|
||||
|
||||
srv.stop()
|
||||
waitFor srv.closeWait()
|
||||
waitFor proxy.stop()
|
||||
waitFor proxy.closeWait()
|
||||
|
||||
srv.stop()
|
||||
waitFor srv.closeWait()
|
||||
proxy.stop()
|
||||
waitFor proxy.closeWait()
|
||||
suite "Proxy RPC through websockets":
|
||||
var srv = newRpcWebSocketServer(srvAddress)
|
||||
var proxy = RpcProxy.new([proxySrvAddress], getWebSocketClientConfig("ws://127.0.0.1:8545"))
|
||||
var client = newRpcHttpClient()
|
||||
|
||||
registerMethods(srv, proxy)
|
||||
|
||||
srv.start()
|
||||
waitFor proxy.start()
|
||||
waitFor client.connect(proxySrvAddressForClient)
|
||||
|
||||
test "Successful RPC call thorugh proxy":
|
||||
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
|
||||
check r.getStr == "Hello abc data: [1, 2, 3, 4]"
|
||||
test "Successful RPC call no proxy":
|
||||
let r = waitFor client.call("myProc1", %[%"abc", %[1, 2, 3, 4]])
|
||||
check r.getStr == "Hello abc data: [1, 2, 3, 4]"
|
||||
test "Missing params":
|
||||
expect(CatchableError):
|
||||
discard waitFor client.call("myProc", %[%"abc"])
|
||||
test "Method missing on server and proxy server":
|
||||
expect(CatchableError):
|
||||
discard waitFor client.call("missingMethod", %[%"abc"])
|
||||
|
||||
srv.stop()
|
||||
waitFor srv.closeWait()
|
||||
waitFor proxy.stop()
|
||||
waitFor proxy.closeWait()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user