From 98a5efba4de26ac852d0715656f6b0c52a203a75 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 22 Oct 2024 21:58:46 +0200 Subject: [PATCH] Various error handling and processing fixes (#228) * remove redundant gcsafe/raises * rework async raises to chronos 4.0 where this was not yet done * streamline logging between http/socket/ws * don't log error when raising exceptions (whoever handles should log) * debug-log requests in all variants of server and client * unify ipv4/ipv6 address resolution, with preference for ipv6 * fix server start so that it consistently raises only when no addresses could be bound --- json_rpc.nimble | 2 +- json_rpc/client.nim | 31 ++-- json_rpc/clients/httpclient.nim | 23 +-- json_rpc/clients/socketclient.nim | 36 ++-- json_rpc/clients/websocketclient.nim | 2 + json_rpc/clients/websocketclientimpl.nim | 31 ++-- json_rpc/errors.nim | 2 +- json_rpc/jsonmarshal.nim | 4 +- json_rpc/private/client_handler_wrapper.nim | 11 +- json_rpc/private/jrpc_sys.nim | 4 +- json_rpc/private/server_handler_wrapper.nim | 6 +- json_rpc/private/shared_wrapper.nim | 2 + json_rpc/private/utils.nim | 62 +++++++ json_rpc/router.nim | 105 +++--------- json_rpc/rpcproxy.nim | 8 +- json_rpc/server.nim | 33 ++-- json_rpc/servers/httpserver.nim | 181 ++++++++------------ json_rpc/servers/socketserver.nim | 134 +++++---------- json_rpc/servers/websocketserver.nim | 130 ++++++++------ tests/testhook.nim | 5 +- tests/testhttp.nim | 119 ++++++------- tests/testhttps.nim | 91 +++++----- tests/testserverclient.nim | 6 +- 23 files changed, 476 insertions(+), 552 deletions(-) create mode 100644 json_rpc/private/utils.nim diff --git a/json_rpc.nimble b/json_rpc.nimble index 2f9ffc5..9b9e2e7 100644 --- a/json_rpc.nimble +++ b/json_rpc.nimble @@ -57,4 +57,4 @@ task test, "run tests": when not defined(windows): # on windows, socker server build failed - buildOnly "-d:\"chronicles_sinks=textlines[dynamic],json[dynamic]\"", "tests/all" + buildOnly "-d:chronicles_log_level=TRACE -d:\"chronicles_sinks=textlines[dynamic],json[dynamic]\"", "tests/all" diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 114a899..f93487b 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -7,8 +7,11 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/[json, tables, macros], + chronicles, chronos, results, ./jsonmarshal, @@ -53,8 +56,6 @@ type GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].} -{.push gcsafe, raises: [].} - # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -128,22 +129,21 @@ proc getNextId*(client: RpcClient): RequestId = method call*(client: RpcClient, name: string, params: RequestParamsTx): Future[JsonString] - {.base, gcsafe, async.} = - doAssert(false, "`RpcClient.call` not implemented") + {.base, async.} = + raiseAssert("`RpcClient.call` not implemented") -method call*(client: RpcClient, name: string, +proc call*(client: RpcClient, name: string, params: JsonNode): Future[JsonString] - {.base, gcsafe, async.} = + {.async: (raw: true).} = + client.call(name, params.paramsTx) - await client.call(name, params.paramsTx) - -method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} = - doAssert(false, "`RpcClient.close` not implemented") +method close*(client: RpcClient): Future[void] {.base, async.} = + raiseAssert("`RpcClient.close` not implemented") method callBatch*(client: RpcClient, calls: RequestBatchTx): Future[ResponseBatchRx] - {.base, gcsafe, async.} = - doAssert(false, "`RpcClient.callBatch` not implemented") + {.base, async.} = + raiseAssert("`RpcClient.callBatch` not implemented") proc processMessage*(client: RpcClient, line: string): Result[void, string] = if client.onProcessMessage.isNil.not: @@ -152,8 +152,6 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] = if not fallBack: return ok() - # Note: this doesn't use any transport code so doesn't need to be - # differentiated. try: let batch = JrpcSys.decode(line, ResponseBatchRx) if batch.kind == rbkMany: @@ -166,7 +164,7 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] = if response.jsonrpc.isNone: return err("missing or invalid `jsonrpc`") - if response.id.isNone: + let id = response.id.valueOr: if response.error.isSome: let error = JrpcSys.encode(response.error.get) return err(error) @@ -174,7 +172,6 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] = return err("missing or invalid response id") var requestFut: Future[JsonString] - let id = response.id.get if not client.awaiting.pop(id, requestFut): return err("Cannot find message id \"" & $id & "\"") @@ -189,6 +186,8 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] = requestFut.fail(newException(JsonRpcError, msg)) return ok() + debug "Received JSON-RPC response", + len = string(response.result).len, id requestFut.complete(response.result) return ok() diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index d15a5e5..52b5300 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/[tables, uri], stew/byteutils, @@ -33,12 +35,13 @@ type maxBodySize: int getHeaders: GetJsonRpcRequestHeaders -{.push gcsafe, raises: [].} - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ +proc `$`(v: HttpAddress): string = + v.id + proc new( T: type RpcHttpClient, maxBodySize = MaxMessageBodyBytes, secure = false, getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T = @@ -136,14 +139,14 @@ proc newRpcHttpClient*( method call*(client: RpcHttpClient, name: string, params: RequestParamsTx): Future[JsonString] - {.async, gcsafe.} = + {.async.} = let id = client.getNextId() reqBody = requestTxEncode(name, params, id) - debug "Sending message to RPC server", - address = client.httpAddress, msg_len = len(reqBody), name + debug "Sending JSON-RPC request", + address = client.httpAddress, len = len(reqBody), name, id trace "Message", msg = reqBody let resText = await client.callImpl(reqBody) @@ -158,7 +161,6 @@ method call*(client: RpcHttpClient, name: string, let msgRes = client.processMessage(resText) if msgRes.isErr: # Need to clean up in case the answer was invalid - error "Failed to process POST Response for JSON-RPC", msg = msgRes.error let exc = newException(JsonRpcError, msgRes.error) newFut.fail(exc) client.awaiting.del(id) @@ -177,10 +179,11 @@ method call*(client: RpcHttpClient, name: string, method callBatch*(client: RpcHttpClient, calls: RequestBatchTx): Future[ResponseBatchRx] - {.gcsafe, async.} = - let - reqBody = requestBatchEncode(calls) - resText = await client.callImpl(reqBody) + {.async.} = + let reqBody = requestBatchEncode(calls) + debug "Sending JSON-RPC batch", + address = $client.httpAddress, len = len(reqBody) + let resText = await client.callImpl(reqBody) if client.batchFut.isNil or client.batchFut.finished(): client.batchFut = newFuture[ResponseBatchRx]() diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 67f41ab..1f83c31 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/tables, chronicles, @@ -29,8 +31,6 @@ type const defaultMaxRequestLength* = 1024 * 128 -{.push gcsafe, raises: [].} - proc new*(T: type RpcSocketClient): T = T() @@ -39,28 +39,32 @@ proc newRpcSocketClient*: RpcSocketClient = RpcSocketClient.new() method call*(client: RpcSocketClient, name: string, - params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} = + params: RequestParamsTx): Future[JsonString] {.async.} = ## Remotely calls the specified RPC method. - let id = client.getNextId() - var jsonBytes = requestTxEncode(name, params, id) & "\r\n" if client.transport.isNil: raise newException(JsonRpcError, "Transport is not initialised (missing a call to connect?)") - # completed by processMessage. - var newFut = newFuture[JsonString]() + let + id = client.getNextId() + reqBody = requestTxEncode(name, params, id) & "\r\n" + newFut = newFuture[JsonString]() # completed by processMessage + # add to awaiting responses client.awaiting[id] = newFut - let res = await client.transport.write(jsonBytes) + debug "Sending JSON-RPC request", + address = $client.address, len = len(reqBody), name, id + + let res = await client.transport.write(reqBody) # TODO: Add actions when not full packet was send, e.g. disconnect peer. - doAssert(res == jsonBytes.len) + doAssert(res == reqBody.len) return await newFut method callBatch*(client: RpcSocketClient, calls: RequestBatchTx): Future[ResponseBatchRx] - {.gcsafe, async.} = + {.async.} = if client.transport.isNil: raise newException(JsonRpcError, "Transport is not initialised (missing a call to connect?)") @@ -68,12 +72,13 @@ method callBatch*(client: RpcSocketClient, if client.batchFut.isNil or client.batchFut.finished(): client.batchFut = newFuture[ResponseBatchRx]() - let - jsonBytes = requestBatchEncode(calls) & "\r\n" - res = await client.transport.write(jsonBytes) + let reqBody = requestBatchEncode(calls) & "\r\n" + debug "Sending JSON-RPC batch", + address = $client.address, len = len(reqBody) + let res = await client.transport.write(reqBody) # TODO: Add actions when not full packet was send, e.g. disconnect peer. - doAssert(res == jsonBytes.len) + doAssert(res == reqBody.len) return await client.batchFut @@ -90,7 +95,6 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = let res = client.processMessage(value) if res.isErr: - error "Error when processing RPC message", msg=res.error localException = newException(JsonRpcError, res.error) break except TransportError as exc: @@ -116,7 +120,7 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = error "Error when reconnecting to server", msg=exc.msg break except CancelledError as exc: - error "Error when reconnecting to server", msg=exc.msg + debug "Server connection was cancelled", msg=exc.msg break proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index faf5815..20a4612 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import ./websocketclientimpl, ../client diff --git a/json_rpc/clients/websocketclientimpl.nim b/json_rpc/clients/websocketclientimpl.nim index 59fb07e..55bd0c5 100644 --- a/json_rpc/clients/websocketclientimpl.nim +++ b/json_rpc/clients/websocketclientimpl.nim @@ -7,8 +7,10 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import - std/[uri, strutils], + std/uri, pkg/websock/[websock, extensions/compression/deflate], pkg/[chronos, chronos/apps/http/httptable, chronicles], stew/byteutils, @@ -27,8 +29,6 @@ type loop*: Future[void] getHeaders*: GetJsonRpcRequestHeaders -{.push gcsafe, raises: [].} - proc new*( T: type RpcWebSocketClient, getHeaders: GetJsonRpcRequestHeaders = nil): T = T(getHeaders: getHeaders) @@ -39,26 +39,28 @@ proc newRpcWebSocketClient*( RpcWebSocketClient.new(getHeaders) method call*(client: RpcWebSocketClient, name: string, - params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} = + params: RequestParamsTx): Future[JsonString] {.async.} = ## Remotely calls the specified RPC method. if client.transport.isNil: raise newException(JsonRpcError, "Transport is not initialised (missing a call to connect?)") - let id = client.getNextId() - var value = requestTxEncode(name, params, id) & "\r\n" - - # completed by processMessage. - var newFut = newFuture[JsonString]() + let + id = client.getNextId() + reqBody = requestTxEncode(name, params, id) & "\r\n" + newFut = newFuture[JsonString]() # completed by processMessage # add to awaiting responses client.awaiting[id] = newFut - await client.transport.send(value) + debug "Sending JSON-RPC request", + address = $client.uri, len = len(reqBody), name + + await client.transport.send(reqBody) return await newFut method callBatch*(client: RpcWebSocketClient, calls: RequestBatchTx): Future[ResponseBatchRx] - {.gcsafe, async.} = + {.async.} = if client.transport.isNil: raise newException(JsonRpcError, "Transport is not initialised (missing a call to connect?)") @@ -66,8 +68,10 @@ method callBatch*(client: RpcWebSocketClient, if client.batchFut.isNil or client.batchFut.finished(): client.batchFut = newFuture[ResponseBatchRx]() - let jsonBytes = requestBatchEncode(calls) & "\r\n" - await client.transport.send(jsonBytes) + let reqBody = requestBatchEncode(calls) & "\r\n" + debug "Sending JSON-RPC batch", + address = $client.uri, len = len(reqBody) + await client.transport.send(reqBody) return await client.batchFut @@ -92,7 +96,6 @@ proc processData(client: RpcWebSocketClient) {.async.} = let res = client.processMessage(string.fromBytes(value)) if res.isErr: - error "Error when processing RPC message", msg=res.error error = newException(JsonRpcError, res.error) processError() diff --git a/json_rpc/errors.nim b/json_rpc/errors.nim index 7ec2a98..2b9a519 100644 --- a/json_rpc/errors.nim +++ b/json_rpc/errors.nim @@ -7,7 +7,7 @@ # This file may not be copied, modified, or distributed except according to # those terms. -{.push raises: [].} +{.push raises: [], gcsafe.} import results, json_serialization diff --git a/json_rpc/jsonmarshal.nim b/json_rpc/jsonmarshal.nim index 07b9aed..eac5014 100644 --- a/json_rpc/jsonmarshal.nim +++ b/json_rpc/jsonmarshal.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import json_serialization @@ -19,6 +21,6 @@ createJsonFlavor JrpcConv, omitOptionalFields = true, # Skip optional fields==none in Writer allowUnknownFields = true, skipNullFields = true # Skip optional fields==null in Reader - + # JrpcConv is a namespace/flavor for encoding and decoding # parameters and return value of a rpc method. diff --git a/json_rpc/private/client_handler_wrapper.nim b/json_rpc/private/client_handler_wrapper.nim index c65be73..218227d 100644 --- a/json_rpc/private/client_handler_wrapper.nim +++ b/json_rpc/private/client_handler_wrapper.nim @@ -7,28 +7,23 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import macros, ./shared_wrapper, ./jrpc_sys -{.push gcsafe, raises: [].} - proc createRpcProc(procName, parameters, callBody: NimNode): NimNode = # parameters come as a tree var paramList = newSeq[NimNode]() for p in parameters: paramList.add(p) - let body = quote do: - {.gcsafe.}: - `callBody` - # build proc - result = newProc(procName, paramList, body) + result = newProc(procName, paramList, callBody) # make proc async result.addPragma ident"async" - result.addPragma ident"gcsafe" # export this proc result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName)) diff --git a/json_rpc/private/jrpc_sys.nim b/json_rpc/private/jrpc_sys.nim index 28a1e22..60a01cf 100644 --- a/json_rpc/private/jrpc_sys.nim +++ b/json_rpc/private/jrpc_sys.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/hashes, results, @@ -154,8 +156,6 @@ RequestRx.useDefaultReaderIn JrpcSys const JsonRPC2Literal = JsonString("\"2.0\"") -{.push gcsafe, raises: [].} - func hash*(x: RequestId): hashes.Hash = var h = 0.Hash case x.kind: diff --git a/json_rpc/private/server_handler_wrapper.nim b/json_rpc/private/server_handler_wrapper.nim index e7f2bb2..eb2db25 100644 --- a/json_rpc/private/server_handler_wrapper.nim +++ b/json_rpc/private/server_handler_wrapper.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/[macros, typetraits], stew/[byteutils, objects], @@ -27,8 +29,6 @@ type numOptionals: int minLength: int -{.push gcsafe, raises: [].} - # ------------------------------------------------------------------------------ # Optional resolvers # ------------------------------------------------------------------------------ @@ -311,7 +311,7 @@ func wrapServerHandler*(methName: string, params, procBody, procWrapper: NimNode result = newStmtList() result.add handler result.add quote do: - proc `procWrapper`(`paramsIdent`: RequestParamsRx): Future[JsonString] {.async, gcsafe.} = + proc `procWrapper`(`paramsIdent`: RequestParamsRx): Future[JsonString] {.async.} = # Avoid 'yield in expr not lowered' with an intermediate variable. # See: https://github.com/nim-lang/Nim/issues/17849 `setup` diff --git a/json_rpc/private/shared_wrapper.nim b/json_rpc/private/shared_wrapper.nim index be3339c..40ad306 100644 --- a/json_rpc/private/shared_wrapper.nim +++ b/json_rpc/private/shared_wrapper.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/[json, macros], ./jrpc_sys, diff --git a/json_rpc/private/utils.nim b/json_rpc/private/utils.nim new file mode 100644 index 0000000..d6b8f19 --- /dev/null +++ b/json_rpc/private/utils.nim @@ -0,0 +1,62 @@ +import chronos, ../errors + +from std/net import IPv6_any, IPv4_any + +template processResolvedAddresses(what: string) = + if ips.len == 0: + # Addresses could not be resolved, critical error. + raise newException(RpcAddressUnresolvableError, "Unable to resolve " & what) + + var dualStack = Opt.none(Port) + for ip in ips: + # Only yield the "any" address once because we try to use dual stack where + # available + if ip.toIpAddress() == IPv6_any(): + dualStack = Opt.some(ip.port) + elif ip.toIpAddress() == IPv4_any() and dualStack == Opt.some(ip.port): + continue + yield ip + +iterator resolveIP*( + addresses: openArray[string] +): TransportAddress {.raises: [JsonRpcError].} = + var ips: seq[TransportAddress] + # Resolve IPv6 first so that dual stack detection works as expected + for address in addresses: + try: + for resolved in resolveTAddress(address, AddressFamily.IPv6): + if resolved notin ips: + ips.add resolved + except TransportAddressError: + discard + + for address in addresses: + try: + for resolved in resolveTAddress(address, AddressFamily.IPv4): + if resolved notin ips: + ips.add resolved + except TransportAddressError: + discard + + processResolvedAddresses($addresses) + +iterator resolveIP*( + address: string, port: Port +): TransportAddress {.raises: [JsonRpcError].} = + var ips: seq[TransportAddress] + # Resolve IPv6 first so that dual stack detection works as expected + try: + for resolved in resolveTAddress(address, port, AddressFamily.IPv6): + if resolved notin ips: + ips.add resolved + except TransportAddressError: + discard + + try: + for resolved in resolveTAddress(address, port, AddressFamily.IPv4): + if resolved notin ips: + ips.add resolved + except TransportAddressError: + discard + + processResolvedAddresses($address & ":" & $port) diff --git a/json_rpc/router.nim b/json_rpc/router.nim index 81b1401..6b988f5 100644 --- a/json_rpc/router.nim +++ b/json_rpc/router.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/[macros, tables, json], chronicles, @@ -24,7 +26,7 @@ export type # Procedure signature accepted as an RPC call by server RpcProc* = proc(params: RequestParamsRx): Future[JsonString] - {.gcsafe, raises: [CatchableError].} + {.async.} RpcRouter* = object procs*: Table[string, RpcProc] @@ -40,8 +42,6 @@ const defaultMaxRequestLength* = 1024 * 128 -{.push gcsafe, raises: [].} - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -127,12 +127,15 @@ proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasKey(methodName) proc route*(router: RpcRouter, req: RequestRx): - Future[ResponseTx] {.gcsafe, async: (raises: []).} = + Future[ResponseTx] {.async: (raises: []).} = let rpcProc = router.validateRequest(req).valueOr: return wrapError(error, req.id) try: + debug "Processing JSON-RPC request", id = req.id, name = req.`method`.get() let res = await rpcProc(req.params) + debug "Returning JSON-RPC response", + id = req.id, name = req.`method`.get(), len = string(res).len return wrapReply(res, req.id) except ApplicationError as err: return wrapError(applicationError(err.code, err.msg, err.data), req.id) @@ -150,92 +153,34 @@ proc route*(router: RpcRouter, req: RequestRx): escapeJson(err.msg).JsonString). wrapError(req.id) -proc wrapErrorAsync*(code: int, msg: string): - Future[JsonString] {.gcsafe, async: (raises: []).} = - return wrapError(code, msg).JsonString - -proc route*(router: RpcRouter, data: string): - Future[string] {.gcsafe, async: (raises: []).} = +proc route*(router: RpcRouter, data: string|seq[byte]): + Future[string] {.async: (raises: []).} = ## Route to RPC from string data. Data is expected to be able to be ## converted to Json. ## Returns string of Json from RPC result/error node - when defined(nimHasWarnBareExcept): - {.push warning[BareExcept]:off.} - let request = try: JrpcSys.decode(data, RequestBatchRx) except CatchableError as err: return wrapError(JSON_PARSE_ERROR, err.msg) - except Exception as err: - # TODO https://github.com/status-im/nimbus-eth2/issues/2430 - return wrapError(JSON_PARSE_ERROR, err.msg) - - let reply = try: - if request.kind == rbkSingle: - let response = await router.route(request.single) - JrpcSys.encode(response) - elif request.many.len == 0: - wrapError(INVALID_REQUEST, "no request object in request array") - else: - var resFut: seq[Future[ResponseTx]] - for req in request.many: - resFut.add router.route(req) - await noCancel(allFutures(resFut)) - var response = ResponseBatchTx(kind: rbkMany) - for fut in resFut: - response.many.add fut.read() - JrpcSys.encode(response) - except CatchableError as err: - wrapError(JSON_ENCODE_ERROR, err.msg) - except Exception as err: - wrapError(JSON_ENCODE_ERROR, err.msg) - - when defined(nimHasWarnBareExcept): - {.pop warning[BareExcept]:on.} - - return reply - -proc tryRoute*(router: RpcRouter, req: RequestRx, - fut: var Future[JsonString]): Result[void, string] = - ## Route to RPC, returns false if the method or params cannot be found. - ## Expects RequestRx input and returns json output. - when defined(nimHasWarnBareExcept): - {.push warning[BareExcept]:off.} - {.push warning[UnreachableCode]:off.} try: - if req.jsonrpc.isNone: - return err("`jsonrpc` missing or invalid") - - if req.meth.isNone: - return err("`method` missing or invalid") - - let rpc = router.procs.getOrDefault(req.meth.get) - if rpc.isNil: - return err("rpc method not found: " & req.meth.get) - - fut = rpc(req.params) - return ok() - - except CatchableError as ex: - return err(ex.msg) - except Exception as ex: - return err(ex.msg) - - when defined(nimHasWarnBareExcept): - {.pop warning[BareExcept]:on.} - {.pop warning[UnreachableCode]:on.} - -proc tryRoute*(router: RpcRouter, data: JsonString, - fut: var Future[JsonString]): Result[void, string] = - ## Route to RPC, returns false if the method or params cannot be found. - ## Expects json input and returns json output. - try: - let req = JrpcSys.decode(data.string, RequestRx) - return router.tryRoute(req, fut) - except CatchableError as ex: - return err(ex.msg) + if request.kind == rbkSingle: + let response = await router.route(request.single) + JrpcSys.encode(response) + elif request.many.len == 0: + wrapError(INVALID_REQUEST, "no request object in request array") + else: + var resFut: seq[Future[ResponseTx]] + for req in request.many: + resFut.add router.route(req) + await noCancel(allFutures(resFut)) + var response = ResponseBatchTx(kind: rbkMany) + for fut in resFut: + response.many.add fut.read() + JrpcSys.encode(response) + except CatchableError as err: + wrapError(JSON_ENCODE_ERROR, err.msg) macro rpc*(server: RpcRouter, path: static[string], body: untyped): untyped = ## Define a remote procedure call. diff --git a/json_rpc/rpcproxy.nim b/json_rpc/rpcproxy.nim index 8d4c1b8..89e8918 100644 --- a/json_rpc/rpcproxy.nim +++ b/json_rpc/rpcproxy.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import pkg/websock/websock, ./servers/[httpserver], @@ -39,8 +41,6 @@ type compression*: bool flags*: set[TLSFlags] -{.push gcsafe, raises: [].} - # TODO Add validations that provided uri-s are correct https/wss uri and retrun # Result[string, ClientConfig] proc getHttpClientConfig*(uri: string): ClientConfig = @@ -54,7 +54,7 @@ proc getWebSocketClientConfig*( ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags) proc proxyCall(client: RpcClient, name: string): RpcProc = - return proc (params: RequestParamsRx): Future[JsonString] {.gcsafe, async.} = + return proc (params: RequestParamsRx): Future[JsonString] {.async.} = let res = await client.call(name, params.toTx) return res @@ -129,5 +129,5 @@ proc closeWait*(proxy: RpcProxy) {.async.} = func localAddress*(proxy: RpcProxy): seq[TransportAddress] = proxy.rpcHttpServer.localAddress() - + {.pop.} diff --git a/json_rpc/server.nim b/json_rpc/server.nim index ad6b3a9..2c08f1f 100644 --- a/json_rpc/server.nim +++ b/json_rpc/server.nim @@ -7,6 +7,8 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import std/json, chronos, @@ -25,8 +27,6 @@ type RpcServer* = ref object of RootRef router*: RpcRouter -{.push gcsafe, raises: [].} - # ------------------------------------------------------------------------------ # Constructors # ------------------------------------------------------------------------------ @@ -46,28 +46,31 @@ template hasMethod*(server: RpcServer, methodName: string): bool = proc executeMethod*(server: RpcServer, methodName: string, - params: RequestParamsTx): Future[JsonString] - {.gcsafe, raises: [JsonRpcError].} = + params: RequestParamsTx): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} = let req = requestTx(methodName, params, RequestId(kind: riNumber, num: 0)) - reqData = JrpcSys.encode(req).JsonString + reqData = JrpcSys.encode(req) + respData = await server.router.route(reqData) + resp = try: + JrpcSys.decode(respData, ResponseRx) + except CatchableError as exc: + raise (ref JsonRpcError)(msg: exc.msg) - server.router.tryRoute(reqData, result).isOkOr: - raise newException(JsonRpcError, error) + if resp.error.isSome: + raise (ref JsonRpcError)(msg: $resp.error.get) + resp.result proc executeMethod*(server: RpcServer, methodName: string, - args: JsonNode): Future[JsonString] - {.gcsafe, raises: [JsonRpcError].} = + args: JsonNode): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError], raw: true).} = let params = paramsTx(args) server.executeMethod(methodName, params) proc executeMethod*(server: RpcServer, methodName: string, - args: JsonString): Future[JsonString] - {.gcsafe, raises: [JsonRpcError].} = + args: JsonString): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} = let params = try: let x = JrpcSys.decode(args.string, RequestParamsRx) @@ -75,16 +78,18 @@ proc executeMethod*(server: RpcServer, except SerializationError as exc: raise newException(JsonRpcError, exc.msg) - server.executeMethod(methodName, params) + await server.executeMethod(methodName, params) # Wrapper for message processing -proc route*(server: RpcServer, line: string): Future[string] {.gcsafe.} = +proc route*(server: RpcServer, line: string): Future[string] {.async: (raises: [], raw: true).} = + server.router.route(line) +proc route*(server: RpcServer, line: seq[byte]): Future[string] {.async: (raises: [], raw: true).} = server.router.route(line) # Server registration -proc register*(server: RpcServer, name: string, rpc: RpcProc) {.gcsafe, raises: [CatchableError].} = +proc register*(server: RpcServer, name: string, rpc: RpcProc) = ## Add a name/code pair to the RPC server. server.router.register(name, rpc) diff --git a/json_rpc/servers/httpserver.nim b/json_rpc/servers/httpserver.nim index 47b88b0..6bf9bc5 100644 --- a/json_rpc/servers/httpserver.nim +++ b/json_rpc/servers/httpserver.nim @@ -7,10 +7,13 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import - stew/byteutils, + std/sequtils, chronicles, httputils, chronos, chronos/apps/http/[httpserver, shttpserver], + ../private/utils, ../errors, ../server @@ -24,15 +27,14 @@ const JsonRpcIdent = "nim-json-rpc" type - # HttpAuthHook: handle CORS, JWT auth, etc. in HTTP header # before actual request processed # return value: # - nil: auth success, continue execution # - HttpResponse: could not authenticate, stop execution # and return the response - HttpAuthHook* = proc(request: HttpRequestRef): Future[HttpResponseRef] - {.gcsafe, async: (raises: [CatchableError]).} + HttpAuthHook* = + proc(request: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CatchableError]).} # This inheritance arrangement is useful for # e.g. combo HTTP server @@ -45,47 +47,48 @@ type proc serveHTTP*(rpcServer: RpcHttpHandler, request: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = - let - headers = HttpTable.init([("Content-Type", - "application/json; charset=utf-8")]) - chunkSize = rpcServer.maxChunkSize - try: + let req = await request.getBody() + debug "Received JSON-RPC request", + address = request.remote().valueOr(default(TransportAddress)), + len = req.len + let - body = await request.getBody() - data = await rpcServer.route(string.fromBytes(body)) + data = await rpcServer.route(req) + chunkSize = rpcServer.maxChunkSize + streamType = + if data.len <= chunkSize: + HttpResponseStreamType.Plain + else: + HttpResponseStreamType.Chunked + response = request.getResponse() - if data.len <= chunkSize: - let res = await request.respond(Http200, data, headers) - trace "JSON-RPC result has been sent" - return res + response.addHeader("Content-Type", "application/json") - let response = request.getResponse() - response.status = Http200 - response.addHeader("Content-Type", "application/json; charset=utf-8") - - await response.prepare() + await response.prepare(streamType) let maxLen = data.len var len = data.len while len > chunkSize: - await response.sendChunk(data[maxLen - len].unsafeAddr, chunkSize) + await response.send(data[maxLen - len].unsafeAddr, chunkSize) len -= chunkSize if len > 0: - await response.sendChunk(data[maxLen - len].unsafeAddr, len) + await response.send(data[maxLen - len].unsafeAddr, len) await response.finish() + response except CancelledError as exc: raise exc except CatchableError as exc: debug "Internal error while processing JSON-RPC call" - return defaultResponse(exc) + defaultResponse(exc) proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 = return proc (req: RequestFence): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = if not req.isOk(): + debug "Got invalid request", err = req.error() return defaultResponse() let request = req.get() @@ -96,6 +99,8 @@ proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 = let res = await hook(request) if not res.isNil: return res + except CancelledError as exc: + raise exc except CatchableError as exc: error "Internal error while processing JSON-RPC hook", msg=exc.msg return defaultResponse(exc) @@ -113,7 +118,7 @@ proc addHttpServer*( backlogSize: int = 100, httpHeadersTimeout = 10.seconds, maxHeadersSize: int = 8192, - maxRequestBodySize: int = 1_048_576) = + maxRequestBodySize: int = 1_048_576) {.raises: [JsonRpcError].} = let server = HttpServerRef.new( address, processClientRpc(rpcServer), @@ -125,7 +130,6 @@ proc addHttpServer*( error "Failed to create server", address = $address, message = error raise newException(RpcBindError, "Unable to create server: " & $error) - info "Starting JSON-RPC HTTP server", url = "http://" & $address rpcServer.httpServers.add server @@ -143,7 +147,7 @@ proc addSecureHttpServer*( bufferSize: int = 4096, httpHeadersTimeout = 10.seconds, maxHeadersSize: int = 8192, - maxRequestBodySize: int = 1_048_576) = + maxRequestBodySize: int = 1_048_576) {.raises: [JsonRpcError].} = let server = SecureHttpServerRef.new( address, processClientRpc(rpcServer), @@ -158,106 +162,62 @@ proc addSecureHttpServer*( message = error raise newException(RpcBindError, "Unable to create server: " & $error) - info "Starting JSON-RPC HTTPS server", url = "https://" & $address - rpcServer.httpServers.add server proc addHttpServers*(server: RpcHttpServer, - addresses: openArray[TransportAddress]) = + addresses: openArray[TransportAddress]) {.raises: [JsonRpcError].} = + ## Start a server on at least one of the given addresses, or raise + if addresses.len == 0: + return + + var lastExc: ref JsonRpcError for item in addresses: - # TODO handle partial failures, ie when 1/N addresses fail - server.addHttpServer(item) + try: + server.addHttpServer(item) + except JsonRpcError as exc: + lastExc = exc + if server.httpServers.len == 0: + raise lastExc proc addSecureHttpServers*(server: RpcHttpServer, addresses: openArray[TransportAddress], tlsPrivateKey: TLSPrivateKey, - tlsCertificate: TLSCertificate) = + tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} = + ## Start a server on at least one of the given addresses, or raise + if addresses.len == 0: + return + + var lastExc: ref JsonRpcError for item in addresses: - # TODO handle partial failures, ie when 1/N addresses fail - server.addSecureHttpServer(item, tlsPrivateKey, tlsCertificate) + try: + server.addSecureHttpServer(item, tlsPrivateKey, tlsCertificate) + except JsonRpcError as exc: + lastExc = exc + if server.httpServers.len == 0: + raise lastExc -template processResolvedAddresses = - if tas4.len + tas6.len == 0: - # Addresses could not be resolved, critical error. - raise newException(RpcAddressUnresolvableError, "Unable to get address!") - - for r in tas4: - yield r - - if tas4.len == 0: # avoid ipv4 + ipv6 running together - for r in tas6: - yield r - -iterator resolvedAddresses(address: string): TransportAddress = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, AddressFamily.IPv4) - except CatchableError: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, AddressFamily.IPv6) - except CatchableError: - discard - - processResolvedAddresses() - -iterator resolvedAddresses(address: string, port: Port): TransportAddress = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, port, AddressFamily.IPv4) - except CatchableError: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, port, AddressFamily.IPv6) - except CatchableError: - discard - - processResolvedAddresses() - -proc addHttpServer*(server: RpcHttpServer, address: string) = +proc addHttpServer*(server: RpcHttpServer, address: string) {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. - for a in resolvedAddresses(address): - # TODO handle partial failures, ie when 1/N addresses fail - server.addHttpServer(a) + addHttpServers(server, toSeq(resolveIP([address]))) proc addSecureHttpServer*(server: RpcHttpServer, address: string, tlsPrivateKey: TLSPrivateKey, - tlsCertificate: TLSCertificate) = - for a in resolvedAddresses(address): - # TODO handle partial failures, ie when 1/N addresses fail - server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate) + tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} = + addSecureHttpServers(server, toSeq(resolveIP([address])), tlsPrivateKey, tlsCertificate) -proc addHttpServers*(server: RpcHttpServer, addresses: openArray[string]) = - for address in addresses: - # TODO handle partial failures, ie when 1/N addresses fail - server.addHttpServer(address) +proc addHttpServers*(server: RpcHttpServer, addresses: openArray[string]) {.raises: [JsonRpcError].} = + addHttpServers(server, toSeq(resolveIP(addresses))) -proc addHttpServer*(server: RpcHttpServer, address: string, port: Port) = - for a in resolvedAddresses(address, port): - # TODO handle partial failures, ie when 1/N addresses fail - server.addHttpServer(a) +proc addHttpServer*(server: RpcHttpServer, address: string, port: Port) {.raises: [JsonRpcError].} = + addHttpServers(server, toSeq(resolveIP(address, port))) proc addSecureHttpServer*(server: RpcHttpServer, address: string, port: Port, tlsPrivateKey: TLSPrivateKey, - tlsCertificate: TLSCertificate) = - for a in resolvedAddresses(address, port): - # TODO handle partial failures, ie when 1/N addresses fail - server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate) + tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} = + addSecureHttpServers(server, toSeq(resolveIP(address, port)), tlsPrivateKey, tlsCertificate) proc new*(T: type RpcHttpServer, authHooks: seq[HttpAuthHook] = @[]): T = T(router: RpcRouter.init(), httpServers: @[], authHooks: authHooks, maxChunkSize: 8192) @@ -271,22 +231,22 @@ proc newRpcHttpServer*(authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = proc newRpcHttpServer*(router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = RpcHttpServer.new(router, authHooks) -proc newRpcHttpServer*(addresses: openArray[TransportAddress], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = +proc newRpcHttpServer*(addresses: openArray[TransportAddress], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = newRpcHttpServer(authHooks) result.addHttpServers(addresses) -proc newRpcHttpServer*(addresses: openArray[string], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = +proc newRpcHttpServer*(addresses: openArray[string], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = newRpcHttpServer(authHooks) result.addHttpServers(addresses) -proc newRpcHttpServer*(addresses: openArray[string], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = +proc newRpcHttpServer*(addresses: openArray[string], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = newRpcHttpServer(router, authHooks) result.addHttpServers(addresses) -proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = +proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = newRpcHttpServer(router, authHooks) result.addHttpServers(addresses) @@ -294,15 +254,14 @@ proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter proc start*(server: RpcHttpServer) = ## Start the RPC server. for item in server.httpServers: - # TODO handle partial failures, ie when 1/N addresses fail - debug "HTTP RPC server started" # (todo: fix this), address = item + info "Starting JSON-RPC HTTP server", url = item.baseUri item.start() proc stop*(server: RpcHttpServer) {.async.} = ## Stop the RPC server. for item in server.httpServers: - debug "HTTP RPC server stopped" # (todo: fix this), address = item.local await item.stop() + info "Stopped JSON-RPC HTTP server", url = item.baseUri proc closeWait*(server: RpcHttpServer) {.async.} = ## Cleanup resources of RPC server. diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index ad12f19..34c6597 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -7,9 +7,13 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import + std/sequtils, chronicles, json_serialization/std/net, + ../private/utils, ../errors, ../server @@ -20,20 +24,21 @@ type servers: seq[StreamServer] processClientHook: StreamCallback2 -proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} = +proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []).} = ## Process transport data to the RPC server try: var rpc = getUserData[RpcSocketServer](server) while true: - var - value = await transport.readLine(defaultMaxRequestLength) - if value == "": + let req = await transport.readLine(defaultMaxRequestLength) + if req == "": await transport.closeWait() break - debug "Processing message", address = transport.remoteAddress(), line = value + debug "Received JSON-RPC request", + address = transport.remoteAddress(), + len = req.len - let res = await rpc.route(value) + let res = await rpc.route(req) discard await transport.write(res & "\r\n") except TransportError as ex: error "Transport closed during processing client", msg=ex.msg @@ -42,98 +47,33 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r # Utility functions for setting up servers using stream transport addresses -proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) = +proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) {.raises: [JsonRpcError].} = try: - info "Starting JSON-RPC socket server", address = $address var transportServer = createStreamServer(address, server.processClientHook, {ReuseAddr}, udata = server) server.servers.add(transportServer) except CatchableError as exc: error "Failed to create server", address = $address, message = exc.msg + raise newException(RpcBindError, "Unable to create stream server: " & exc.msg) - if len(server.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, "Unable to create server!") - -proc addStreamServers*(server: RpcSocketServer, addresses: openArray[TransportAddress]) = +proc addStreamServers*(server: RpcSocketServer, addresses: openArray[TransportAddress]) {.raises: [JsonRpcError].} = + var lastExc: ref JsonRpcError for item in addresses: - server.addStreamServer(item) + try: + server.addStreamServer(item) + except JsonRpcError as exc: + lastExc = exc + if server.servers.len == 0: + raise lastExc -proc addStreamServer*(server: RpcSocketServer, address: string) = +proc addStreamServer*(server: RpcSocketServer, address: string) {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - added = 0 + addStreamServers(server, toSeq(resolveIP([address]))) - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, AddressFamily.IPv4) - except CatchableError: - discard - except Defect: - discard +proc addStreamServers*(server: RpcSocketServer, addresses: openArray[string]) {.raises: [JsonRpcError].} = + addStreamServers(server, toSeq(resolveIP(addresses))) - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, AddressFamily.IPv6) - except CatchableError: - discard - except Defect: - discard - - for r in tas4: - server.addStreamServer(r) - added.inc - for r in tas6: - server.addStreamServer(r) - added.inc - - if added == 0: - # Addresses could not be resolved, critical error. - raise newException(RpcAddressUnresolvableError, "Unable to get address!") - -proc addStreamServers*(server: RpcSocketServer, addresses: openArray[string]) = - for address in addresses: - server.addStreamServer(address) - -proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) = - var - tas4: seq[TransportAddress] - tas6: seq[TransportAddress] - added = 0 - - # Attempt to resolve `address` for IPv4 address space. - try: - tas4 = resolveTAddress(address, port, AddressFamily.IPv4) - except CatchableError: - discard - except Defect: - discard - - # Attempt to resolve `address` for IPv6 address space. - try: - tas6 = resolveTAddress(address, port, AddressFamily.IPv6) - except CatchableError: - discard - except Defect: - discard - - if len(tas4) == 0 and len(tas6) == 0: - # Address was not resolved, critical error. - raise newException(RpcAddressUnresolvableError, - "Address " & address & " could not be resolved!") - - for r in tas4: - server.addStreamServer(r) - added.inc - for r in tas6: - server.addStreamServer(r) - added.inc - - if len(server.servers) == 0: - # Server was not bound, critical error. - raise newException(RpcBindError, - "Could not setup server on " & address & ":" & $int(port)) +proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) {.raises: [JsonRpcError].} = + addStreamServers(server, toSeq(resolveIP(address, port))) proc new(T: type RpcSocketServer): T = T(router: RpcRouter.init(), servers: @[], processClientHook: processClient) @@ -141,17 +81,17 @@ proc new(T: type RpcSocketServer): T = proc newRpcSocketServer*(): RpcSocketServer = RpcSocketServer.new() -proc newRpcSocketServer*(addresses: openArray[TransportAddress]): RpcSocketServer = +proc newRpcSocketServer*(addresses: openArray[TransportAddress]): RpcSocketServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = RpcSocketServer.new() result.addStreamServers(addresses) -proc newRpcSocketServer*(addresses: openArray[string]): RpcSocketServer = +proc newRpcSocketServer*(addresses: openArray[string]): RpcSocketServer {.raises: [JsonRpcError].} = ## Create new server and assign it to addresses ``addresses``. result = RpcSocketServer.new() result.addStreamServers(addresses) -proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer = +proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer {.raises: [JsonRpcError].} = # Create server on specified port result = RpcSocketServer.new() result.addStreamServer(address, port) @@ -161,15 +101,23 @@ proc newRpcSocketServer*(processClientHook: StreamCallback2): RpcSocketServer = result = RpcSocketServer.new() result.processClientHook = processClientHook -proc start*(server: RpcSocketServer) = +proc start*(server: RpcSocketServer) {.raises: [JsonRpcError].} = ## Start the RPC server. for item in server.servers: - item.start() + try: + info "Starting JSON-RPC socket server", address = item.localAddress + item.start() + except TransportOsError as exc: + # TODO stop already-started servers + raise (ref RpcBindError)(msg: exc.msg, parent: exc) proc stop*(server: RpcSocketServer) = ## Stop the RPC server. for item in server.servers: - item.stop() + try: + item.stop() + except TransportOsError as exc: + warn "Could not stop transport", err = exc.msg proc close*(server: RpcSocketServer) = ## Cleanup resources of RPC server. diff --git a/json_rpc/servers/websocketserver.nim b/json_rpc/servers/websocketserver.nim index 8bb9a20..f8cf1d8 100644 --- a/json_rpc/servers/websocketserver.nim +++ b/json_rpc/servers/websocketserver.nim @@ -7,13 +7,15 @@ # This file may not be copied, modified, or distributed except according to # those terms. +{.push raises: [], gcsafe.} + import chronicles, chronos, websock/[websock, types], websock/extensions/compression/deflate, stew/byteutils, json_serialization/std/net, - ".."/[server] + ".."/[errors, server] -export server, net +export errors, server, net logScope: topics = "JSONRPC-WS-SERVER" @@ -25,8 +27,8 @@ type # - true: auth success, continue execution # - false: could not authenticate, stop execution # and return the response - WsAuthHook* = proc(request: HttpRequest): Future[bool] - {.gcsafe, raises: [Defect, CatchableError].} + WsAuthHook* = + proc(request: HttpRequest): Future[bool] {.async: (raises: [CatchableError]).} # This inheritance arrangement is useful for # e.g. combo HTTP server @@ -48,22 +50,22 @@ proc serveHTTP*(rpc: RpcWebSocketHandler, request: HttpRequest) trace "Websocket handshake completed" while ws.readyState != ReadyState.Closed: - let recvData = await ws.recvMsg() - trace "Client message: ", size = recvData.len, binary = ws.binary + let req = await ws.recvMsg() + debug "Received JSON-RPC request", len = req.len if ws.readyState == ReadyState.Closed: # if session already terminated by peer, # no need to send response break - if recvData.len == 0: + if req.len == 0: await ws.close( reason = "cannot process zero length message" ) break let data = try: - await rpc.route(string.fromBytes(recvData)) + await rpc.route(req) except CatchableError as exc: debug "Internal error, while processing RPC call", address = $request.uri @@ -82,7 +84,7 @@ proc serveHTTP*(rpc: RpcWebSocketHandler, request: HttpRequest) raise exc except CatchableError as exc: - error "Something error", msg=exc.msg + debug "Something error", msg=exc.msg proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) {.async: (raises: [CancelledError]).} = @@ -96,15 +98,17 @@ proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest) let res = await hook(request) if not res: return + except CancelledError as exc: + raise exc except CatchableError as exc: - error "Internal error while processing JSON-RPC hook", msg=exc.msg + debug "Internal error while processing JSON-RPC hook", msg=exc.msg try: await request.sendResponse(Http503, data = "", content = "Internal error, processing JSON-RPC hook: " & exc.msg) return except CatchableError as exc: - error "Something error", msg=exc.msg + debug "Something error", msg=exc.msg return await rpc.serveHTTP(request) @@ -124,18 +128,21 @@ proc newRpcWebSocketServer*( compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr}, authHooks: seq[WsAuthHook] = @[], - rng = HmacDrbgContext.new()): RpcWebSocketServer = + rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} = var server = new(RpcWebSocketServer) proc processCallback(request: HttpRequest): Future[void] = handleRequest(server, request) server.initWebsocket(compression, authHooks, rng) - server.server = HttpServer.create( - address, - processCallback, - flags - ) + try: + server.server = HttpServer.create( + address, + processCallback, + flags + ) + except CatchableError as exc: + raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc) server @@ -145,15 +152,18 @@ proc newRpcWebSocketServer*( compression: bool = false, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, authHooks: seq[WsAuthHook] = @[], - rng = HmacDrbgContext.new()): RpcWebSocketServer = + rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} = - newRpcWebSocketServer( - initTAddress(host, port), - compression, - flags, - authHooks, - rng - ) + try: + newRpcWebSocketServer( + initTAddress(host, port), + compression, + flags, + authHooks, + rng + ) + except TransportError as exc: + raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc) proc newRpcWebSocketServer*( address: TransportAddress, @@ -166,23 +176,26 @@ proc newRpcWebSocketServer*( tlsMinVersion = TLSVersion.TLS12, tlsMaxVersion = TLSVersion.TLS12, authHooks: seq[WsAuthHook] = @[], - rng = HmacDrbgContext.new()): RpcWebSocketServer = + rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} = var server = new(RpcWebSocketServer) proc processCallback(request: HttpRequest): Future[void] = handleRequest(server, request) server.initWebsocket(compression, authHooks, rng) - server.server = TlsHttpServer.create( - address, - tlsPrivateKey, - tlsCertificate, - processCallback, - flags, - tlsFlags, - tlsMinVersion, - tlsMaxVersion - ) + try: + server.server = TlsHttpServer.create( + address, + tlsPrivateKey, + tlsCertificate, + processCallback, + flags, + tlsFlags, + tlsMinVersion, + tlsMaxVersion + ) + except CatchableError as exc: + raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc) server @@ -198,30 +211,39 @@ proc newRpcWebSocketServer*( tlsMinVersion = TLSVersion.TLS12, tlsMaxVersion = TLSVersion.TLS12, authHooks: seq[WsAuthHook] = @[], - rng = HmacDrbgContext.new()): RpcWebSocketServer = + rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} = - newRpcWebSocketServer( - initTAddress(host, port), - tlsPrivateKey, - tlsCertificate, - compression, - flags, - tlsFlags, - tlsMinVersion, - tlsMaxVersion, - authHooks, - rng - ) + try: + newRpcWebSocketServer( + initTAddress(host, port), + tlsPrivateKey, + tlsCertificate, + compression, + flags, + tlsFlags, + tlsMinVersion, + tlsMaxVersion, + authHooks, + rng + ) + except TransportError as exc: + raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc) -proc start*(server: RpcWebSocketServer) = +proc start*(server: RpcWebSocketServer) {.raises: [JsonRpcError].} = ## Start the RPC server. - notice "WS RPC server started", address = server.server.local - server.server.start() + try: + info "Starting JSON-RPC WebSocket server", address = server.server.local + server.server.start() + except TransportOsError as exc: + raise (ref RpcBindError)(msg: "Unable to start server: " & exc.msg, parent: exc) proc stop*(server: RpcWebSocketServer) = ## Stop the RPC server. - notice "WS RPC server stopped", address = server.server.local - server.server.stop() + try: + server.server.stop() + notice "Stopped JSON-RPC WebSocket server", address = server.server.local + except TransportOsError as exc: + warn "Could not stop JSON-RPC WebSocket server", err = exc.msg proc close*(server: RpcWebSocketServer) = ## Cleanup resources of RPC server. diff --git a/tests/testhook.nim b/tests/testhook.nim index 1fbb020..5585234 100644 --- a/tests/testhook.nim +++ b/tests/testhook.nim @@ -25,8 +25,7 @@ proc authHeaders(): seq[(string, string)] = @[("Auth-Token", "Good Token")] suite "HTTP server hook test": - proc mockAuth(req: HttpRequestRef): Future[HttpResponseRef] {. - gcsafe, async: (raises: [CatchableError]).} = + proc mockAuth(req: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CatchableError]).} = if req.headers.getString("Auth-Token") == "Good Token": return HttpResponseRef(nil) @@ -60,7 +59,7 @@ proc wsAuthHeaders(ctx: Hook, suite "Websocket server hook test": let hook = Hook(append: wsAuthHeaders) - proc mockAuth(req: websock.HttpRequest): Future[bool] {.async.} = + proc mockAuth(req: websock.HttpRequest): Future[bool] {.async: (raises: [CatchableError]).} = if not req.headers.contains("Auth-Token"): await req.sendResponse(code = Http403, data = "Missing Auth-Token") return false diff --git a/tests/testhttp.nim b/tests/testhttp.nim index 2bd0c82..eac1bab 100644 --- a/tests/testhttp.nim +++ b/tests/testhttp.nim @@ -8,80 +8,67 @@ # those terms. import - unittest2, + unittest2, chronos/unittest2/asynctests, ../json_rpc/[rpcserver, rpcclient, jsonmarshal] const TestsCount = 100 - -proc simpleTest(address: string): Future[bool] {.async.} = - var client = newRpcHttpClient() - await client.connect("http://" & address) - var r = await client.call("noParamsProc", %[]) - if r.string == "\"Hello world\"": - result = true - -proc continuousTest(address: string): Future[int] {.async.} = - var client = newRpcHttpClient() - result = 0 - for i in 0.. seq[byte]: + result = newSeq[byte](bigChunkSize) + for i in 0.. seq[byte]: - result = newSeq[byte](bigChunkSize) - for i in 0..