Fix ws and socket client error handling and add test to #212 (#213)

This commit is contained in:
andri lim 2024-02-19 08:53:10 +07:00 committed by GitHub
parent 47cfc8916f
commit 514049a287
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 276 additions and 15 deletions

View File

@ -161,7 +161,7 @@ method call*(client: RpcHttpClient, name: string,
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
error "Failed to process POST Response for JSON-RPC", msg = msgRes.error
let exc = newException(JsonRpcError, msgRes.error)
newFut.fail(exc)
client.awaiting.del(id)

View File

@ -18,6 +18,9 @@ import
export client
logScope:
topics = "JSONRPC-SOCKET-CLIENT"
type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
@ -74,22 +77,47 @@ method callBatch*(client: RpcSocketClient,
return await client.batchFut
proc processData(client: RpcSocketClient) {.async.} =
proc processData(client: RpcSocketClient) {.async: (raises: []).} =
while true:
var localException: ref JsonRpcError
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
try:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
await client.transport.closeWait()
break
let res = client.processMessage(value)
if res.isErr:
error "Error when processing RPC message", msg=res.error
localException = newException(JsonRpcError, res.error)
break
except TransportError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break
except CancelledError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break
let res = client.processMessage(value)
if res.isErr:
error "error when processing message", msg=res.error
raise newException(JsonRpcError, res.error)
if localException.isNil.not:
for _,fut in client.awaiting:
fut.fail(localException)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(localException)
# async loop reconnection and waiting
client.transport = await connect(client.address)
try:
info "Reconnect to server", address=client.address
client.transport = await connect(client.address)
except TransportError as exc:
error "Error when reconnecting to server", msg=exc.msg
break
except CancelledError as exc:
error "Error when reconnecting to server", msg=exc.msg
break
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)

View File

@ -74,6 +74,13 @@ method callBatch*(client: RpcWebSocketClient,
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref CatchableError
template processError() =
for k, v in client.awaiting:
v.fail(error)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(error)
client.awaiting.clear()
let ws = client.transport
try:
while ws.readyState != ReadyState.Closed:
@ -85,7 +92,9 @@ proc processData(client: RpcWebSocketClient) {.async.} =
let res = client.processMessage(string.fromBytes(value))
if res.isErr:
raise newException(JsonRpcError, res.error)
error "Error when processing RPC message", msg=res.error
error = newException(JsonRpcError, res.error)
processError()
except CatchableError as e:
error = e
@ -97,9 +106,7 @@ proc processData(client: RpcWebSocketClient) {.async.} =
if client.awaiting.len != 0:
if error.isNil:
error = newException(IOError, "Transport was closed while waiting for response")
for k, v in client.awaiting:
v.fail(error)
client.awaiting.clear()
processError()
if not client.onDisconnect.isNil:
client.onDisconnect()

View File

@ -8,7 +8,10 @@
# those terms.
import
std/importutils,
unittest2,
chronicles,
websock/websock,
../json_rpc/rpcclient,
../json_rpc/rpcserver
@ -48,7 +51,7 @@ proc setupClientHook(client: RpcClient): Shadow =
return err(exc.msg)
shadow
suite "test callsigs":
suite "test client features":
var server = newRpcHttpServer(["127.0.0.1:0"])
server.installHandlers()
var client = newRpcHttpClient()
@ -74,3 +77,226 @@ suite "test callsigs":
waitFor server.stop()
waitFor server.closeWait()
type
TestSocketServer = ref object of RpcSocketServer
getData: proc(): string {.gcsafe, raises: [].}
proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
## Process transport data to the RPC server
try:
var rpc = getUserData[TestSocketServer](server)
while true:
var
value = await transport.readLine(router.defaultMaxRequestLength)
if value == "":
await transport.closeWait()
break
let res = rpc.getData()
discard await transport.write(res & "\r\n")
except TransportError as ex:
error "Transport closed during processing client", msg=ex.msg
except CatchableError as ex:
error "Error occured during processing client", msg=ex.msg
proc addStreamServer(server: TestSocketServer, address: TransportAddress) =
privateAccess(RpcSocketServer)
try:
info "Starting JSON-RPC socket server", address = $address
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except CatchableError as exc:
error "Failed to create server", address = $address, message = exc.msg
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc new(T: type TestSocketServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
getData: getData,
)
suite "test rpc socket client":
let server = TestSocketServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addStreamServer(serverAddress)
var client = newRpcSocketClient()
server.start()
waitFor client.connect(server.localAddress()[0])
test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res
server.stop()
waitFor server.closeWait()
type
TestHttpServer = ref object of RpcHttpServer
getData: proc(): string {.gcsafe, raises: [].}
proc processClientRpc(rpcServer: TestHttpServer): HttpProcessCallback2 =
return proc (req: RequestFence): Future[HttpResponseRef]
{.async: (raises: [CancelledError]).} =
if not req.isOk():
return defaultResponse()
let
request = req.get()
headers = HttpTable.init([("Content-Type",
"application/json; charset=utf-8")])
try:
let data = rpcServer.getData()
let res = await request.respond(Http200, data, headers)
trace "JSON-RPC result has been sent"
return res
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Internal error while processing JSON-RPC call"
return defaultResponse(exc)
proc addHttpServer(
rpcServer: TestHttpServer,
address: TransportAddress,
socketFlags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr},
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
backlogSize: int = 100,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576) =
let server = HttpServerRef.new(
address,
processClientRpc(rpcServer),
{},
socketFlags,
serverUri, "nim-json-rpc", maxConnections, backlogSize,
bufferSize, httpHeadersTimeout, maxHeadersSize, maxRequestBodySize
).valueOr:
error "Failed to create server", address = $address,
message = error
raise newException(RpcBindError, "Unable to create server: " & $error)
info "Starting JSON-RPC HTTP server", url = "http://" & $address
privateAccess(RpcHttpServer)
rpcServer.httpServers.add server
proc new(T: type TestHttpServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
maxChunkSize: 8192,
getData: getData,
)
suite "test rpc http client":
let server = TestHttpServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addHttpServer(serverAddress)
var client = newRpcHttpClient()
server.start()
waitFor client.connect("http://" & $server.localAddress()[0])
test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res
waitFor server.stop()
waitFor server.closeWait()
type
TestWsServer = ref object of RpcWebSocketServer
getData: proc(): string {.gcsafe, raises: [].}
proc handleRequest(rpc: TestWsServer, request: websock.HttpRequest)
{.async: (raises: [CancelledError]).} =
try:
let server = rpc.wsserver
let ws = await server.handleRequest(request)
if ws.readyState != ReadyState.Open:
error "Failed to open websocket connection"
return
trace "Websocket handshake completed"
while ws.readyState != ReadyState.Closed:
let recvData = await ws.recvMsg()
trace "Client message: ", size = recvData.len, binary = ws.binary
if ws.readyState == ReadyState.Closed:
# if session already terminated by peer,
# no need to send response
break
if recvData.len == 0:
await ws.close(
reason = "cannot process zero length message"
)
break
let data = rpc.getData()
trace "RPC result has been sent", address = $request.uri
await ws.send(data)
except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg
except CancelledError as exc:
raise exc
except CatchableError as exc:
error "Something error", msg=exc.msg
proc newWsServer(address: TransportAddress, getData: proc(): string {.gcsafe, raises: [].}): TestWsServer =
let flags = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr}
var server = new(TestWsServer)
proc processCallback(request: websock.HttpRequest): Future[void] =
handleRequest(server, request)
privateAccess(RpcWebSocketServer)
server.getData = getData
server.wsserver = WSServer.new(rng = HmacDrbgContext.new())
server.server = websock.HttpServer.create(
address,
processCallback,
flags
)
server
suite "test ws http client":
let serverAddress = initTAddress("127.0.0.1:0")
let server = newWsServer(serverAddress, proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
var client = newRpcWebSocketClient()
server.start()
waitFor client.connect("ws://" & $server.localAddress())
test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res
server.stop()
waitFor server.closeWait()