mirror of
https://github.com/logos-storage/nim-json-rpc.git
synced 2026-02-20 05:33:10 +00:00
Restore the support for using the news package
This commit is contained in:
parent
7ff4559cc0
commit
35e6d7e6ff
@ -13,7 +13,8 @@ requires "nim >= 1.2.0",
|
||||
"chronos",
|
||||
"httputils",
|
||||
"chronicles#ba2817f1",
|
||||
"https://github.com/status-im/nim-websock",
|
||||
"https://github.com/status-im/news#status",
|
||||
"websock",
|
||||
"json_serialization"
|
||||
|
||||
proc getLang(): string =
|
||||
@ -29,6 +30,11 @@ proc buildBinary(name: string, srcDir = "./", params = "", cmdParams = "", lang
|
||||
|
||||
task test, "run tests":
|
||||
buildBinary "all", "tests/",
|
||||
params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR",
|
||||
params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR -d:json_rpc_websocket_package=websock",
|
||||
cmdParams = "",
|
||||
lang = getLang()
|
||||
|
||||
buildBinary "all", "tests/",
|
||||
params = "-r -f --hints:off --debuginfo --path:'.' --threads:on -d:chronicles_log_level=ERROR -d:json_rpc_websocket_package=news",
|
||||
cmdParams = "",
|
||||
lang = getLang()
|
||||
|
||||
7
json_rpc/clients/config.nim
Normal file
7
json_rpc/clients/config.nim
Normal file
@ -0,0 +1,7 @@
|
||||
const
|
||||
json_rpc_websocket_package {.strdefine.} = "websock"
|
||||
useNews* = json_rpc_websocket_package == "news"
|
||||
|
||||
when json_rpc_websocket_package notin ["websock", "news"]:
|
||||
{.fatal: "json_rpc_websocket_package should be set to either 'websock' or 'news'".}
|
||||
|
||||
@ -1,20 +1,33 @@
|
||||
import
|
||||
std/[strtabs, tables, uri, strutils],
|
||||
pkg/[chronos, websock/websock, chronicles],
|
||||
websock/extensions/compression/deflate,
|
||||
std/[strtabs, tables],
|
||||
pkg/[chronos, chronicles],
|
||||
stew/byteutils,
|
||||
../client
|
||||
../client, ./config
|
||||
|
||||
export client
|
||||
|
||||
logScope:
|
||||
topics = "JSONRPC-WS-CLIENT"
|
||||
|
||||
type
|
||||
RpcWebSocketClient* = ref object of RpcClient
|
||||
transport*: WSSession
|
||||
uri*: Uri
|
||||
loop*: Future[void]
|
||||
when useNews:
|
||||
const newsUseChronos = true
|
||||
include pkg/news
|
||||
|
||||
type
|
||||
RpcWebSocketClient* = ref object of RpcClient
|
||||
transport*: WebSocket
|
||||
uri*: string
|
||||
loop*: Future[void]
|
||||
|
||||
else:
|
||||
import std/[uri, strutils]
|
||||
import pkg/websock/[websock, extensions/compression/deflate]
|
||||
|
||||
type
|
||||
RpcWebSocketClient* = ref object of RpcClient
|
||||
transport*: WSSession
|
||||
uri*: Uri
|
||||
loop*: Future[void]
|
||||
|
||||
proc new*(T: type RpcWebSocketClient): T =
|
||||
T()
|
||||
@ -43,20 +56,36 @@ method call*(self: RpcWebSocketClient, name: string,
|
||||
|
||||
proc processData(client: RpcWebSocketClient) {.async.} =
|
||||
var error: ref CatchableError
|
||||
let ws = client.transport
|
||||
try:
|
||||
while ws.readystate != ReadyState.Closed:
|
||||
var value = await ws.recvMsg()
|
||||
|
||||
if value.len == 0:
|
||||
# transmission ends
|
||||
break
|
||||
when useNews:
|
||||
try:
|
||||
while true:
|
||||
var value = await client.transport.receiveString()
|
||||
if value == "":
|
||||
# transmission ends
|
||||
break
|
||||
|
||||
client.processMessage(string.fromBytes(value))
|
||||
except CatchableError as e:
|
||||
error = e
|
||||
client.processMessage(value)
|
||||
except CatchableError as e:
|
||||
error = e
|
||||
|
||||
client.transport.close()
|
||||
else:
|
||||
let ws = client.transport
|
||||
try:
|
||||
while ws.readystate != ReadyState.Closed:
|
||||
var value = await ws.recvMsg()
|
||||
|
||||
if value.len == 0:
|
||||
# transmission ends
|
||||
break
|
||||
|
||||
client.processMessage(string.fromBytes(value))
|
||||
except CatchableError as e:
|
||||
error = e
|
||||
|
||||
await client.transport.close()
|
||||
|
||||
await client.transport.close()
|
||||
client.transport = nil
|
||||
|
||||
if client.awaiting.len != 0:
|
||||
@ -68,28 +97,46 @@ proc processData(client: RpcWebSocketClient) {.async.} =
|
||||
if not client.onDisconnect.isNil:
|
||||
client.onDisconnect()
|
||||
|
||||
proc connect*(client: RpcWebSocketClient, uri: string,
|
||||
compression: bool = false,
|
||||
flags: set[TLSFlags] = {
|
||||
NoVerifyHost, NoVerifyServerName}) {.async.} =
|
||||
when useNews:
|
||||
proc connect*(
|
||||
client: RpcWebSocketClient,
|
||||
uri: string,
|
||||
headers: StringTableRef = nil,
|
||||
compression = false) {.async.} =
|
||||
if compression:
|
||||
warn "compression is not supported with the news back-end"
|
||||
var headers = headers
|
||||
if headers.isNil:
|
||||
headers = newStringTable({"Origin": "http://localhost"})
|
||||
elif "Origin" notin headers:
|
||||
# TODO: This is a hack, because the table might be case sensitive. Ideally strtabs module has
|
||||
# to be extended with case insensitive accessors.
|
||||
headers["Origin"] = "http://localhost"
|
||||
client.transport = await newWebSocket(uri, headers)
|
||||
client.uri = uri
|
||||
else:
|
||||
proc connect*(
|
||||
client: RpcWebSocketClient, uri: string,
|
||||
compression = false,
|
||||
flags: set[TLSFlags] = {NoVerifyHost, NoVerifyServerName}) {.async.} =
|
||||
var ext: seq[ExtFactory] = if compression: @[deflateFactory()]
|
||||
else: @[]
|
||||
let uri = parseUri(uri)
|
||||
let ws = await WebSocket.connect(
|
||||
uri=uri,
|
||||
factories=ext,
|
||||
flags=flags
|
||||
)
|
||||
client.transport = ws
|
||||
client.uri = uri
|
||||
|
||||
var ext: seq[ExtFactory] = if compression:
|
||||
@[deflateFactory()]
|
||||
else:
|
||||
@[]
|
||||
let uri = parseUri(uri)
|
||||
let ws = await WebSocket.connect(
|
||||
uri=uri,
|
||||
factories=ext,
|
||||
flags=flags
|
||||
)
|
||||
client.transport = ws
|
||||
client.uri = uri
|
||||
|
||||
client.loop = processData(client)
|
||||
client.loop = processData(client)
|
||||
|
||||
method close*(client: RpcWebSocketClient) {.async.} =
|
||||
await client.loop.cancelAndWait()
|
||||
if not client.transport.isNil:
|
||||
await client.transport.close()
|
||||
when useNews:
|
||||
client.transport.close()
|
||||
else:
|
||||
await client.transport.close()
|
||||
client.transport = nil
|
||||
|
||||
@ -42,7 +42,7 @@ proc getWebSocketClientConfig*(
|
||||
flags: set[TLSFlags] = {
|
||||
NoVerifyHost, NoVerifyServerName}): ClientConfig =
|
||||
ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags)
|
||||
|
||||
|
||||
proc proxyCall(client: RpcClient, name: string): RpcProc =
|
||||
return proc (params: JsonNode): Future[StringOfJson] {.async.} =
|
||||
let res = await client.call(name, params)
|
||||
@ -63,14 +63,14 @@ proc new*(T: type RpcProxy, server: RpcHttpServer, cfg: ClientConfig): T =
|
||||
of WebSocket:
|
||||
let client = newRpcWebSocketClient()
|
||||
return T(
|
||||
rpcHttpServer: server,
|
||||
kind: WebSocket,
|
||||
wsUri: cfg.wsUri,
|
||||
webSocketClient: client,
|
||||
compression: cfg.compression,
|
||||
rpcHttpServer: server,
|
||||
kind: WebSocket,
|
||||
wsUri: cfg.wsUri,
|
||||
webSocketClient: client,
|
||||
compression: cfg.compression,
|
||||
flags: cfg.flags
|
||||
)
|
||||
|
||||
|
||||
proc new*(T: type RpcProxy, listenAddresses: openArray[TransportAddress], cfg: ClientConfig): T {.raises: [Defect, CatchableError].} =
|
||||
RpcProxy.new(newRpcHttpServer(listenAddresses, RpcRouter.init()), cfg)
|
||||
|
||||
|
||||
@ -1,4 +1,16 @@
|
||||
{. warning[UnusedImport]:off .}
|
||||
|
||||
import
|
||||
testrpcmacro, testserverclient, testethcalls, testhttp, testproxy
|
||||
../json_rpc/clients/config
|
||||
|
||||
import
|
||||
testrpcmacro, testethcalls, testhttp
|
||||
|
||||
when not useNews:
|
||||
# TODO The websock server doesn't interop properly
|
||||
# with the news client at the moment
|
||||
import testserverclient
|
||||
|
||||
when not useNews:
|
||||
# The proxy implementation is based on websock
|
||||
import testproxy
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
import
|
||||
unittest, json, chronicles,
|
||||
../json_rpc/[rpcclient, rpcserver]
|
||||
../json_rpc/[rpcclient, rpcserver, clients/config]
|
||||
|
||||
const
|
||||
compressionSupported = useNews
|
||||
|
||||
# Create RPC on server
|
||||
proc setupServer*(srv: RpcServer) =
|
||||
@ -57,12 +60,14 @@ suite "Websocket Server/Client RPC":
|
||||
waitFor srv.closeWait()
|
||||
|
||||
suite "Websocket Server/Client RPC with Compression":
|
||||
var srv = newRpcWebSocketServer("127.0.0.1", Port(8545), compression = true)
|
||||
var srv = newRpcWebSocketServer("127.0.0.1", Port(8545),
|
||||
compression = compressionSupported)
|
||||
var client = newRpcWebSocketClient()
|
||||
|
||||
srv.setupServer()
|
||||
srv.start()
|
||||
waitFor client.connect("ws://127.0.0.1:8545/", compression = true)
|
||||
waitFor client.connect("ws://127.0.0.1:8545/",
|
||||
compression = compressionSupported)
|
||||
|
||||
test "Successful RPC call":
|
||||
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user