Various error handling and processing fixes (#228)

* remove redundant gcsafe/raises
* rework async raises to chronos 4.0 where this was not yet done
* streamline logging between http/socket/ws
  * don't log error when raising exceptions (whoever handles should log)
  * debug-log requests in all variants of server and client
* unify ipv4/ipv6 address resolution, with preference for ipv6
* fix server start so that it consistently raises only when no addresses
could be bound
This commit is contained in:
Jacek Sieka 2024-10-22 21:58:46 +02:00 committed by GitHub
parent 0408795be9
commit 98a5efba4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 476 additions and 552 deletions

View File

@ -57,4 +57,4 @@ task test, "run tests":
when not defined(windows):
# on windows, socker server build failed
buildOnly "-d:\"chronicles_sinks=textlines[dynamic],json[dynamic]\"", "tests/all"
buildOnly "-d:chronicles_log_level=TRACE -d:\"chronicles_sinks=textlines[dynamic],json[dynamic]\"", "tests/all"

View File

@ -7,8 +7,11 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[json, tables, macros],
chronicles,
chronos,
results,
./jsonmarshal,
@ -53,8 +56,6 @@ type
GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].}
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
@ -128,22 +129,21 @@ proc getNextId*(client: RpcClient): RequestId =
method call*(client: RpcClient, name: string,
params: RequestParamsTx): Future[JsonString]
{.base, gcsafe, async.} =
doAssert(false, "`RpcClient.call` not implemented")
{.base, async.} =
raiseAssert("`RpcClient.call` not implemented")
method call*(client: RpcClient, name: string,
proc call*(client: RpcClient, name: string,
params: JsonNode): Future[JsonString]
{.base, gcsafe, async.} =
{.async: (raw: true).} =
client.call(name, params.paramsTx)
await client.call(name, params.paramsTx)
method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} =
doAssert(false, "`RpcClient.close` not implemented")
method close*(client: RpcClient): Future[void] {.base, async.} =
raiseAssert("`RpcClient.close` not implemented")
method callBatch*(client: RpcClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.base, gcsafe, async.} =
doAssert(false, "`RpcClient.callBatch` not implemented")
{.base, async.} =
raiseAssert("`RpcClient.callBatch` not implemented")
proc processMessage*(client: RpcClient, line: string): Result[void, string] =
if client.onProcessMessage.isNil.not:
@ -152,8 +152,6 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
if not fallBack:
return ok()
# Note: this doesn't use any transport code so doesn't need to be
# differentiated.
try:
let batch = JrpcSys.decode(line, ResponseBatchRx)
if batch.kind == rbkMany:
@ -166,7 +164,7 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
if response.jsonrpc.isNone:
return err("missing or invalid `jsonrpc`")
if response.id.isNone:
let id = response.id.valueOr:
if response.error.isSome:
let error = JrpcSys.encode(response.error.get)
return err(error)
@ -174,7 +172,6 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
return err("missing or invalid response id")
var requestFut: Future[JsonString]
let id = response.id.get
if not client.awaiting.pop(id, requestFut):
return err("Cannot find message id \"" & $id & "\"")
@ -189,6 +186,8 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
requestFut.fail(newException(JsonRpcError, msg))
return ok()
debug "Received JSON-RPC response",
len = string(response.result).len, id
requestFut.complete(response.result)
return ok()

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[tables, uri],
stew/byteutils,
@ -33,12 +35,13 @@ type
maxBodySize: int
getHeaders: GetJsonRpcRequestHeaders
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc `$`(v: HttpAddress): string =
v.id
proc new(
T: type RpcHttpClient, maxBodySize = MaxMessageBodyBytes, secure = false,
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =
@ -136,14 +139,14 @@ proc newRpcHttpClient*(
method call*(client: RpcHttpClient, name: string,
params: RequestParamsTx): Future[JsonString]
{.async, gcsafe.} =
{.async.} =
let
id = client.getNextId()
reqBody = requestTxEncode(name, params, id)
debug "Sending message to RPC server",
address = client.httpAddress, msg_len = len(reqBody), name
debug "Sending JSON-RPC request",
address = client.httpAddress, len = len(reqBody), name, id
trace "Message", msg = reqBody
let resText = await client.callImpl(reqBody)
@ -158,7 +161,6 @@ method call*(client: RpcHttpClient, name: string,
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
error "Failed to process POST Response for JSON-RPC", msg = msgRes.error
let exc = newException(JsonRpcError, msgRes.error)
newFut.fail(exc)
client.awaiting.del(id)
@ -177,10 +179,11 @@ method call*(client: RpcHttpClient, name: string,
method callBatch*(client: RpcHttpClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.gcsafe, async.} =
let
reqBody = requestBatchEncode(calls)
resText = await client.callImpl(reqBody)
{.async.} =
let reqBody = requestBatchEncode(calls)
debug "Sending JSON-RPC batch",
address = $client.httpAddress, len = len(reqBody)
let resText = await client.callImpl(reqBody)
if client.batchFut.isNil or client.batchFut.finished():
client.batchFut = newFuture[ResponseBatchRx]()

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/tables,
chronicles,
@ -29,8 +31,6 @@ type
const defaultMaxRequestLength* = 1024 * 128
{.push gcsafe, raises: [].}
proc new*(T: type RpcSocketClient): T =
T()
@ -39,28 +39,32 @@ proc newRpcSocketClient*: RpcSocketClient =
RpcSocketClient.new()
method call*(client: RpcSocketClient, name: string,
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
params: RequestParamsTx): Future[JsonString] {.async.} =
## Remotely calls the specified RPC method.
let id = client.getNextId()
var jsonBytes = requestTxEncode(name, params, id) & "\r\n"
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
# completed by processMessage.
var newFut = newFuture[JsonString]()
let
id = client.getNextId()
reqBody = requestTxEncode(name, params, id) & "\r\n"
newFut = newFuture[JsonString]() # completed by processMessage
# add to awaiting responses
client.awaiting[id] = newFut
let res = await client.transport.write(jsonBytes)
debug "Sending JSON-RPC request",
address = $client.address, len = len(reqBody), name, id
let res = await client.transport.write(reqBody)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
doAssert(res == jsonBytes.len)
doAssert(res == reqBody.len)
return await newFut
method callBatch*(client: RpcSocketClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.gcsafe, async.} =
{.async.} =
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
@ -68,12 +72,13 @@ method callBatch*(client: RpcSocketClient,
if client.batchFut.isNil or client.batchFut.finished():
client.batchFut = newFuture[ResponseBatchRx]()
let
jsonBytes = requestBatchEncode(calls) & "\r\n"
res = await client.transport.write(jsonBytes)
let reqBody = requestBatchEncode(calls) & "\r\n"
debug "Sending JSON-RPC batch",
address = $client.address, len = len(reqBody)
let res = await client.transport.write(reqBody)
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
doAssert(res == jsonBytes.len)
doAssert(res == reqBody.len)
return await client.batchFut
@ -90,7 +95,6 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} =
let res = client.processMessage(value)
if res.isErr:
error "Error when processing RPC message", msg=res.error
localException = newException(JsonRpcError, res.error)
break
except TransportError as exc:
@ -116,7 +120,7 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} =
error "Error when reconnecting to server", msg=exc.msg
break
except CancelledError as exc:
error "Error when reconnecting to server", msg=exc.msg
debug "Server connection was cancelled", msg=exc.msg
break
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
./websocketclientimpl,
../client

View File

@ -7,8 +7,10 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[uri, strutils],
std/uri,
pkg/websock/[websock, extensions/compression/deflate],
pkg/[chronos, chronos/apps/http/httptable, chronicles],
stew/byteutils,
@ -27,8 +29,6 @@ type
loop*: Future[void]
getHeaders*: GetJsonRpcRequestHeaders
{.push gcsafe, raises: [].}
proc new*(
T: type RpcWebSocketClient, getHeaders: GetJsonRpcRequestHeaders = nil): T =
T(getHeaders: getHeaders)
@ -39,26 +39,28 @@ proc newRpcWebSocketClient*(
RpcWebSocketClient.new(getHeaders)
method call*(client: RpcWebSocketClient, name: string,
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
params: RequestParamsTx): Future[JsonString] {.async.} =
## Remotely calls the specified RPC method.
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
let id = client.getNextId()
var value = requestTxEncode(name, params, id) & "\r\n"
# completed by processMessage.
var newFut = newFuture[JsonString]()
let
id = client.getNextId()
reqBody = requestTxEncode(name, params, id) & "\r\n"
newFut = newFuture[JsonString]() # completed by processMessage
# add to awaiting responses
client.awaiting[id] = newFut
await client.transport.send(value)
debug "Sending JSON-RPC request",
address = $client.uri, len = len(reqBody), name
await client.transport.send(reqBody)
return await newFut
method callBatch*(client: RpcWebSocketClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.gcsafe, async.} =
{.async.} =
if client.transport.isNil:
raise newException(JsonRpcError,
"Transport is not initialised (missing a call to connect?)")
@ -66,8 +68,10 @@ method callBatch*(client: RpcWebSocketClient,
if client.batchFut.isNil or client.batchFut.finished():
client.batchFut = newFuture[ResponseBatchRx]()
let jsonBytes = requestBatchEncode(calls) & "\r\n"
await client.transport.send(jsonBytes)
let reqBody = requestBatchEncode(calls) & "\r\n"
debug "Sending JSON-RPC batch",
address = $client.uri, len = len(reqBody)
await client.transport.send(reqBody)
return await client.batchFut
@ -92,7 +96,6 @@ proc processData(client: RpcWebSocketClient) {.async.} =
let res = client.processMessage(string.fromBytes(value))
if res.isErr:
error "Error when processing RPC message", msg=res.error
error = newException(JsonRpcError, res.error)
processError()

View File

@ -7,7 +7,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
{.push raises: [], gcsafe.}
import results, json_serialization

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
json_serialization
@ -19,6 +21,6 @@ createJsonFlavor JrpcConv,
omitOptionalFields = true, # Skip optional fields==none in Writer
allowUnknownFields = true,
skipNullFields = true # Skip optional fields==null in Reader
# JrpcConv is a namespace/flavor for encoding and decoding
# parameters and return value of a rpc method.

View File

@ -7,28 +7,23 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
macros,
./shared_wrapper,
./jrpc_sys
{.push gcsafe, raises: [].}
proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# parameters come as a tree
var paramList = newSeq[NimNode]()
for p in parameters: paramList.add(p)
let body = quote do:
{.gcsafe.}:
`callBody`
# build proc
result = newProc(procName, paramList, body)
result = newProc(procName, paramList, callBody)
# make proc async
result.addPragma ident"async"
result.addPragma ident"gcsafe"
# export this proc
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/hashes,
results,
@ -154,8 +156,6 @@ RequestRx.useDefaultReaderIn JrpcSys
const
JsonRPC2Literal = JsonString("\"2.0\"")
{.push gcsafe, raises: [].}
func hash*(x: RequestId): hashes.Hash =
var h = 0.Hash
case x.kind:

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[macros, typetraits],
stew/[byteutils, objects],
@ -27,8 +29,6 @@ type
numOptionals: int
minLength: int
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Optional resolvers
# ------------------------------------------------------------------------------
@ -311,7 +311,7 @@ func wrapServerHandler*(methName: string, params, procBody, procWrapper: NimNode
result = newStmtList()
result.add handler
result.add quote do:
proc `procWrapper`(`paramsIdent`: RequestParamsRx): Future[JsonString] {.async, gcsafe.} =
proc `procWrapper`(`paramsIdent`: RequestParamsRx): Future[JsonString] {.async.} =
# Avoid 'yield in expr not lowered' with an intermediate variable.
# See: https://github.com/nim-lang/Nim/issues/17849
`setup`

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[json, macros],
./jrpc_sys,

View File

@ -0,0 +1,62 @@
import chronos, ../errors
from std/net import IPv6_any, IPv4_any
template processResolvedAddresses(what: string) =
if ips.len == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to resolve " & what)
var dualStack = Opt.none(Port)
for ip in ips:
# Only yield the "any" address once because we try to use dual stack where
# available
if ip.toIpAddress() == IPv6_any():
dualStack = Opt.some(ip.port)
elif ip.toIpAddress() == IPv4_any() and dualStack == Opt.some(ip.port):
continue
yield ip
iterator resolveIP*(
addresses: openArray[string]
): TransportAddress {.raises: [JsonRpcError].} =
var ips: seq[TransportAddress]
# Resolve IPv6 first so that dual stack detection works as expected
for address in addresses:
try:
for resolved in resolveTAddress(address, AddressFamily.IPv6):
if resolved notin ips:
ips.add resolved
except TransportAddressError:
discard
for address in addresses:
try:
for resolved in resolveTAddress(address, AddressFamily.IPv4):
if resolved notin ips:
ips.add resolved
except TransportAddressError:
discard
processResolvedAddresses($addresses)
iterator resolveIP*(
address: string, port: Port
): TransportAddress {.raises: [JsonRpcError].} =
var ips: seq[TransportAddress]
# Resolve IPv6 first so that dual stack detection works as expected
try:
for resolved in resolveTAddress(address, port, AddressFamily.IPv6):
if resolved notin ips:
ips.add resolved
except TransportAddressError:
discard
try:
for resolved in resolveTAddress(address, port, AddressFamily.IPv4):
if resolved notin ips:
ips.add resolved
except TransportAddressError:
discard
processResolvedAddresses($address & ":" & $port)

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/[macros, tables, json],
chronicles,
@ -24,7 +26,7 @@ export
type
# Procedure signature accepted as an RPC call by server
RpcProc* = proc(params: RequestParamsRx): Future[JsonString]
{.gcsafe, raises: [CatchableError].}
{.async.}
RpcRouter* = object
procs*: Table[string, RpcProc]
@ -40,8 +42,6 @@ const
defaultMaxRequestLength* = 1024 * 128
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -127,12 +127,15 @@ proc hasMethod*(router: RpcRouter, methodName: string): bool =
router.procs.hasKey(methodName)
proc route*(router: RpcRouter, req: RequestRx):
Future[ResponseTx] {.gcsafe, async: (raises: []).} =
Future[ResponseTx] {.async: (raises: []).} =
let rpcProc = router.validateRequest(req).valueOr:
return wrapError(error, req.id)
try:
debug "Processing JSON-RPC request", id = req.id, name = req.`method`.get()
let res = await rpcProc(req.params)
debug "Returning JSON-RPC response",
id = req.id, name = req.`method`.get(), len = string(res).len
return wrapReply(res, req.id)
except ApplicationError as err:
return wrapError(applicationError(err.code, err.msg, err.data), req.id)
@ -150,92 +153,34 @@ proc route*(router: RpcRouter, req: RequestRx):
escapeJson(err.msg).JsonString).
wrapError(req.id)
proc wrapErrorAsync*(code: int, msg: string):
Future[JsonString] {.gcsafe, async: (raises: []).} =
return wrapError(code, msg).JsonString
proc route*(router: RpcRouter, data: string):
Future[string] {.gcsafe, async: (raises: []).} =
proc route*(router: RpcRouter, data: string|seq[byte]):
Future[string] {.async: (raises: []).} =
## Route to RPC from string data. Data is expected to be able to be
## converted to Json.
## Returns string of Json from RPC result/error node
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]:off.}
let request =
try:
JrpcSys.decode(data, RequestBatchRx)
except CatchableError as err:
return wrapError(JSON_PARSE_ERROR, err.msg)
except Exception as err:
# TODO https://github.com/status-im/nimbus-eth2/issues/2430
return wrapError(JSON_PARSE_ERROR, err.msg)
let reply = try:
if request.kind == rbkSingle:
let response = await router.route(request.single)
JrpcSys.encode(response)
elif request.many.len == 0:
wrapError(INVALID_REQUEST, "no request object in request array")
else:
var resFut: seq[Future[ResponseTx]]
for req in request.many:
resFut.add router.route(req)
await noCancel(allFutures(resFut))
var response = ResponseBatchTx(kind: rbkMany)
for fut in resFut:
response.many.add fut.read()
JrpcSys.encode(response)
except CatchableError as err:
wrapError(JSON_ENCODE_ERROR, err.msg)
except Exception as err:
wrapError(JSON_ENCODE_ERROR, err.msg)
when defined(nimHasWarnBareExcept):
{.pop warning[BareExcept]:on.}
return reply
proc tryRoute*(router: RpcRouter, req: RequestRx,
fut: var Future[JsonString]): Result[void, string] =
## Route to RPC, returns false if the method or params cannot be found.
## Expects RequestRx input and returns json output.
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]:off.}
{.push warning[UnreachableCode]:off.}
try:
if req.jsonrpc.isNone:
return err("`jsonrpc` missing or invalid")
if req.meth.isNone:
return err("`method` missing or invalid")
let rpc = router.procs.getOrDefault(req.meth.get)
if rpc.isNil:
return err("rpc method not found: " & req.meth.get)
fut = rpc(req.params)
return ok()
except CatchableError as ex:
return err(ex.msg)
except Exception as ex:
return err(ex.msg)
when defined(nimHasWarnBareExcept):
{.pop warning[BareExcept]:on.}
{.pop warning[UnreachableCode]:on.}
proc tryRoute*(router: RpcRouter, data: JsonString,
fut: var Future[JsonString]): Result[void, string] =
## Route to RPC, returns false if the method or params cannot be found.
## Expects json input and returns json output.
try:
let req = JrpcSys.decode(data.string, RequestRx)
return router.tryRoute(req, fut)
except CatchableError as ex:
return err(ex.msg)
if request.kind == rbkSingle:
let response = await router.route(request.single)
JrpcSys.encode(response)
elif request.many.len == 0:
wrapError(INVALID_REQUEST, "no request object in request array")
else:
var resFut: seq[Future[ResponseTx]]
for req in request.many:
resFut.add router.route(req)
await noCancel(allFutures(resFut))
var response = ResponseBatchTx(kind: rbkMany)
for fut in resFut:
response.many.add fut.read()
JrpcSys.encode(response)
except CatchableError as err:
wrapError(JSON_ENCODE_ERROR, err.msg)
macro rpc*(server: RpcRouter, path: static[string], body: untyped): untyped =
## Define a remote procedure call.

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
pkg/websock/websock,
./servers/[httpserver],
@ -39,8 +41,6 @@ type
compression*: bool
flags*: set[TLSFlags]
{.push gcsafe, raises: [].}
# TODO Add validations that provided uri-s are correct https/wss uri and retrun
# Result[string, ClientConfig]
proc getHttpClientConfig*(uri: string): ClientConfig =
@ -54,7 +54,7 @@ proc getWebSocketClientConfig*(
ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags)
proc proxyCall(client: RpcClient, name: string): RpcProc =
return proc (params: RequestParamsRx): Future[JsonString] {.gcsafe, async.} =
return proc (params: RequestParamsRx): Future[JsonString] {.async.} =
let res = await client.call(name, params.toTx)
return res
@ -129,5 +129,5 @@ proc closeWait*(proxy: RpcProxy) {.async.} =
func localAddress*(proxy: RpcProxy): seq[TransportAddress] =
proxy.rpcHttpServer.localAddress()
{.pop.}

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/json,
chronos,
@ -25,8 +27,6 @@ type
RpcServer* = ref object of RootRef
router*: RpcRouter
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Constructors
# ------------------------------------------------------------------------------
@ -46,28 +46,31 @@ template hasMethod*(server: RpcServer, methodName: string): bool =
proc executeMethod*(server: RpcServer,
methodName: string,
params: RequestParamsTx): Future[JsonString]
{.gcsafe, raises: [JsonRpcError].} =
params: RequestParamsTx): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} =
let
req = requestTx(methodName, params, RequestId(kind: riNumber, num: 0))
reqData = JrpcSys.encode(req).JsonString
reqData = JrpcSys.encode(req)
respData = await server.router.route(reqData)
resp = try:
JrpcSys.decode(respData, ResponseRx)
except CatchableError as exc:
raise (ref JsonRpcError)(msg: exc.msg)
server.router.tryRoute(reqData, result).isOkOr:
raise newException(JsonRpcError, error)
if resp.error.isSome:
raise (ref JsonRpcError)(msg: $resp.error.get)
resp.result
proc executeMethod*(server: RpcServer,
methodName: string,
args: JsonNode): Future[JsonString]
{.gcsafe, raises: [JsonRpcError].} =
args: JsonNode): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError], raw: true).} =
let params = paramsTx(args)
server.executeMethod(methodName, params)
proc executeMethod*(server: RpcServer,
methodName: string,
args: JsonString): Future[JsonString]
{.gcsafe, raises: [JsonRpcError].} =
args: JsonString): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} =
let params = try:
let x = JrpcSys.decode(args.string, RequestParamsRx)
@ -75,16 +78,18 @@ proc executeMethod*(server: RpcServer,
except SerializationError as exc:
raise newException(JsonRpcError, exc.msg)
server.executeMethod(methodName, params)
await server.executeMethod(methodName, params)
# Wrapper for message processing
proc route*(server: RpcServer, line: string): Future[string] {.gcsafe.} =
proc route*(server: RpcServer, line: string): Future[string] {.async: (raises: [], raw: true).} =
server.router.route(line)
proc route*(server: RpcServer, line: seq[byte]): Future[string] {.async: (raises: [], raw: true).} =
server.router.route(line)
# Server registration
proc register*(server: RpcServer, name: string, rpc: RpcProc) {.gcsafe, raises: [CatchableError].} =
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
## Add a name/code pair to the RPC server.
server.router.register(name, rpc)

View File

@ -7,10 +7,13 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
stew/byteutils,
std/sequtils,
chronicles, httputils, chronos,
chronos/apps/http/[httpserver, shttpserver],
../private/utils,
../errors,
../server
@ -24,15 +27,14 @@ const
JsonRpcIdent = "nim-json-rpc"
type
# HttpAuthHook: handle CORS, JWT auth, etc. in HTTP header
# before actual request processed
# return value:
# - nil: auth success, continue execution
# - HttpResponse: could not authenticate, stop execution
# and return the response
HttpAuthHook* = proc(request: HttpRequestRef): Future[HttpResponseRef]
{.gcsafe, async: (raises: [CatchableError]).}
HttpAuthHook* =
proc(request: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CatchableError]).}
# This inheritance arrangement is useful for
# e.g. combo HTTP server
@ -45,47 +47,48 @@ type
proc serveHTTP*(rpcServer: RpcHttpHandler, request: HttpRequestRef):
Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
let
headers = HttpTable.init([("Content-Type",
"application/json; charset=utf-8")])
chunkSize = rpcServer.maxChunkSize
try:
let req = await request.getBody()
debug "Received JSON-RPC request",
address = request.remote().valueOr(default(TransportAddress)),
len = req.len
let
body = await request.getBody()
data = await rpcServer.route(string.fromBytes(body))
data = await rpcServer.route(req)
chunkSize = rpcServer.maxChunkSize
streamType =
if data.len <= chunkSize:
HttpResponseStreamType.Plain
else:
HttpResponseStreamType.Chunked
response = request.getResponse()
if data.len <= chunkSize:
let res = await request.respond(Http200, data, headers)
trace "JSON-RPC result has been sent"
return res
response.addHeader("Content-Type", "application/json")
let response = request.getResponse()
response.status = Http200
response.addHeader("Content-Type", "application/json; charset=utf-8")
await response.prepare()
await response.prepare(streamType)
let maxLen = data.len
var len = data.len
while len > chunkSize:
await response.sendChunk(data[maxLen - len].unsafeAddr, chunkSize)
await response.send(data[maxLen - len].unsafeAddr, chunkSize)
len -= chunkSize
if len > 0:
await response.sendChunk(data[maxLen - len].unsafeAddr, len)
await response.send(data[maxLen - len].unsafeAddr, len)
await response.finish()
response
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Internal error while processing JSON-RPC call"
return defaultResponse(exc)
defaultResponse(exc)
proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 =
return proc (req: RequestFence): Future[HttpResponseRef]
{.async: (raises: [CancelledError]).} =
if not req.isOk():
debug "Got invalid request", err = req.error()
return defaultResponse()
let request = req.get()
@ -96,6 +99,8 @@ proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 =
let res = await hook(request)
if not res.isNil:
return res
except CancelledError as exc:
raise exc
except CatchableError as exc:
error "Internal error while processing JSON-RPC hook", msg=exc.msg
return defaultResponse(exc)
@ -113,7 +118,7 @@ proc addHttpServer*(
backlogSize: int = 100,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576) =
maxRequestBodySize: int = 1_048_576) {.raises: [JsonRpcError].} =
let server = HttpServerRef.new(
address,
processClientRpc(rpcServer),
@ -125,7 +130,6 @@ proc addHttpServer*(
error "Failed to create server", address = $address,
message = error
raise newException(RpcBindError, "Unable to create server: " & $error)
info "Starting JSON-RPC HTTP server", url = "http://" & $address
rpcServer.httpServers.add server
@ -143,7 +147,7 @@ proc addSecureHttpServer*(
bufferSize: int = 4096,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576) =
maxRequestBodySize: int = 1_048_576) {.raises: [JsonRpcError].} =
let server = SecureHttpServerRef.new(
address,
processClientRpc(rpcServer),
@ -158,106 +162,62 @@ proc addSecureHttpServer*(
message = error
raise newException(RpcBindError, "Unable to create server: " & $error)
info "Starting JSON-RPC HTTPS server", url = "https://" & $address
rpcServer.httpServers.add server
proc addHttpServers*(server: RpcHttpServer,
addresses: openArray[TransportAddress]) =
addresses: openArray[TransportAddress]) {.raises: [JsonRpcError].} =
## Start a server on at least one of the given addresses, or raise
if addresses.len == 0:
return
var lastExc: ref JsonRpcError
for item in addresses:
# TODO handle partial failures, ie when 1/N addresses fail
server.addHttpServer(item)
try:
server.addHttpServer(item)
except JsonRpcError as exc:
lastExc = exc
if server.httpServers.len == 0:
raise lastExc
proc addSecureHttpServers*(server: RpcHttpServer,
addresses: openArray[TransportAddress],
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate) =
tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} =
## Start a server on at least one of the given addresses, or raise
if addresses.len == 0:
return
var lastExc: ref JsonRpcError
for item in addresses:
# TODO handle partial failures, ie when 1/N addresses fail
server.addSecureHttpServer(item, tlsPrivateKey, tlsCertificate)
try:
server.addSecureHttpServer(item, tlsPrivateKey, tlsCertificate)
except JsonRpcError as exc:
lastExc = exc
if server.httpServers.len == 0:
raise lastExc
template processResolvedAddresses =
if tas4.len + tas6.len == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
for r in tas4:
yield r
if tas4.len == 0: # avoid ipv4 + ipv6 running together
for r in tas6:
yield r
iterator resolvedAddresses(address: string): TransportAddress =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, AddressFamily.IPv4)
except CatchableError:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, AddressFamily.IPv6)
except CatchableError:
discard
processResolvedAddresses()
iterator resolvedAddresses(address: string, port: Port): TransportAddress =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, AddressFamily.IPv4)
except CatchableError:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, AddressFamily.IPv6)
except CatchableError:
discard
processResolvedAddresses()
proc addHttpServer*(server: RpcHttpServer, address: string) =
proc addHttpServer*(server: RpcHttpServer, address: string) {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
for a in resolvedAddresses(address):
# TODO handle partial failures, ie when 1/N addresses fail
server.addHttpServer(a)
addHttpServers(server, toSeq(resolveIP([address])))
proc addSecureHttpServer*(server: RpcHttpServer,
address: string,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate) =
for a in resolvedAddresses(address):
# TODO handle partial failures, ie when 1/N addresses fail
server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate)
tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} =
addSecureHttpServers(server, toSeq(resolveIP([address])), tlsPrivateKey, tlsCertificate)
proc addHttpServers*(server: RpcHttpServer, addresses: openArray[string]) =
for address in addresses:
# TODO handle partial failures, ie when 1/N addresses fail
server.addHttpServer(address)
proc addHttpServers*(server: RpcHttpServer, addresses: openArray[string]) {.raises: [JsonRpcError].} =
addHttpServers(server, toSeq(resolveIP(addresses)))
proc addHttpServer*(server: RpcHttpServer, address: string, port: Port) =
for a in resolvedAddresses(address, port):
# TODO handle partial failures, ie when 1/N addresses fail
server.addHttpServer(a)
proc addHttpServer*(server: RpcHttpServer, address: string, port: Port) {.raises: [JsonRpcError].} =
addHttpServers(server, toSeq(resolveIP(address, port)))
proc addSecureHttpServer*(server: RpcHttpServer,
address: string,
port: Port,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate) =
for a in resolvedAddresses(address, port):
# TODO handle partial failures, ie when 1/N addresses fail
server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate)
tlsCertificate: TLSCertificate) {.raises: [JsonRpcError].} =
addSecureHttpServers(server, toSeq(resolveIP(address, port)), tlsPrivateKey, tlsCertificate)
proc new*(T: type RpcHttpServer, authHooks: seq[HttpAuthHook] = @[]): T =
T(router: RpcRouter.init(), httpServers: @[], authHooks: authHooks, maxChunkSize: 8192)
@ -271,22 +231,22 @@ proc newRpcHttpServer*(authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
proc newRpcHttpServer*(router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
RpcHttpServer.new(router, authHooks)
proc newRpcHttpServer*(addresses: openArray[TransportAddress], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
proc newRpcHttpServer*(addresses: openArray[TransportAddress], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer(authHooks)
result.addHttpServers(addresses)
proc newRpcHttpServer*(addresses: openArray[string], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
proc newRpcHttpServer*(addresses: openArray[string], authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer(authHooks)
result.addHttpServers(addresses)
proc newRpcHttpServer*(addresses: openArray[string], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
proc newRpcHttpServer*(addresses: openArray[string], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer(router, authHooks)
result.addHttpServers(addresses)
proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer(router, authHooks)
result.addHttpServers(addresses)
@ -294,15 +254,14 @@ proc newRpcHttpServer*(addresses: openArray[TransportAddress], router: RpcRouter
proc start*(server: RpcHttpServer) =
## Start the RPC server.
for item in server.httpServers:
# TODO handle partial failures, ie when 1/N addresses fail
debug "HTTP RPC server started" # (todo: fix this), address = item
info "Starting JSON-RPC HTTP server", url = item.baseUri
item.start()
proc stop*(server: RpcHttpServer) {.async.} =
## Stop the RPC server.
for item in server.httpServers:
debug "HTTP RPC server stopped" # (todo: fix this), address = item.local
await item.stop()
info "Stopped JSON-RPC HTTP server", url = item.baseUri
proc closeWait*(server: RpcHttpServer) {.async.} =
## Cleanup resources of RPC server.

View File

@ -7,9 +7,13 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
std/sequtils,
chronicles,
json_serialization/std/net,
../private/utils,
../errors,
../server
@ -20,20 +24,21 @@ type
servers: seq[StreamServer]
processClientHook: StreamCallback2
proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []).} =
## Process transport data to the RPC server
try:
var rpc = getUserData[RpcSocketServer](server)
while true:
var
value = await transport.readLine(defaultMaxRequestLength)
if value == "":
let req = await transport.readLine(defaultMaxRequestLength)
if req == "":
await transport.closeWait()
break
debug "Processing message", address = transport.remoteAddress(), line = value
debug "Received JSON-RPC request",
address = transport.remoteAddress(),
len = req.len
let res = await rpc.route(value)
let res = await rpc.route(req)
discard await transport.write(res & "\r\n")
except TransportError as ex:
error "Transport closed during processing client", msg=ex.msg
@ -42,98 +47,33 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) =
proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) {.raises: [JsonRpcError].} =
try:
info "Starting JSON-RPC socket server", address = $address
var transportServer = createStreamServer(address, server.processClientHook, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except CatchableError as exc:
error "Failed to create server", address = $address, message = exc.msg
raise newException(RpcBindError, "Unable to create stream server: " & exc.msg)
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[TransportAddress]) =
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[TransportAddress]) {.raises: [JsonRpcError].} =
var lastExc: ref JsonRpcError
for item in addresses:
server.addStreamServer(item)
try:
server.addStreamServer(item)
except JsonRpcError as exc:
lastExc = exc
if server.servers.len == 0:
raise lastExc
proc addStreamServer*(server: RpcSocketServer, address: string) =
proc addStreamServer*(server: RpcSocketServer, address: string) {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
addStreamServers(server, toSeq(resolveIP([address])))
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, AddressFamily.IPv4)
except CatchableError:
discard
except Defect:
discard
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[string]) {.raises: [JsonRpcError].} =
addStreamServers(server, toSeq(resolveIP(addresses)))
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, AddressFamily.IPv6)
except CatchableError:
discard
except Defect:
discard
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[string]) =
for address in addresses:
server.addStreamServer(address)
proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, AddressFamily.IPv4)
except CatchableError:
discard
except Defect:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, AddressFamily.IPv6)
except CatchableError:
discard
except Defect:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) {.raises: [JsonRpcError].} =
addStreamServers(server, toSeq(resolveIP(address, port)))
proc new(T: type RpcSocketServer): T =
T(router: RpcRouter.init(), servers: @[], processClientHook: processClient)
@ -141,17 +81,17 @@ proc new(T: type RpcSocketServer): T =
proc newRpcSocketServer*(): RpcSocketServer =
RpcSocketServer.new()
proc newRpcSocketServer*(addresses: openArray[TransportAddress]): RpcSocketServer =
proc newRpcSocketServer*(addresses: openArray[TransportAddress]): RpcSocketServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = RpcSocketServer.new()
result.addStreamServers(addresses)
proc newRpcSocketServer*(addresses: openArray[string]): RpcSocketServer =
proc newRpcSocketServer*(addresses: openArray[string]): RpcSocketServer {.raises: [JsonRpcError].} =
## Create new server and assign it to addresses ``addresses``.
result = RpcSocketServer.new()
result.addStreamServers(addresses)
proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer =
proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer {.raises: [JsonRpcError].} =
# Create server on specified port
result = RpcSocketServer.new()
result.addStreamServer(address, port)
@ -161,15 +101,23 @@ proc newRpcSocketServer*(processClientHook: StreamCallback2): RpcSocketServer =
result = RpcSocketServer.new()
result.processClientHook = processClientHook
proc start*(server: RpcSocketServer) =
proc start*(server: RpcSocketServer) {.raises: [JsonRpcError].} =
## Start the RPC server.
for item in server.servers:
item.start()
try:
info "Starting JSON-RPC socket server", address = item.localAddress
item.start()
except TransportOsError as exc:
# TODO stop already-started servers
raise (ref RpcBindError)(msg: exc.msg, parent: exc)
proc stop*(server: RpcSocketServer) =
## Stop the RPC server.
for item in server.servers:
item.stop()
try:
item.stop()
except TransportOsError as exc:
warn "Could not stop transport", err = exc.msg
proc close*(server: RpcSocketServer) =
## Cleanup resources of RPC server.

View File

@ -7,13 +7,15 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [], gcsafe.}
import
chronicles, chronos, websock/[websock, types],
websock/extensions/compression/deflate,
stew/byteutils, json_serialization/std/net,
".."/[server]
".."/[errors, server]
export server, net
export errors, server, net
logScope:
topics = "JSONRPC-WS-SERVER"
@ -25,8 +27,8 @@ type
# - true: auth success, continue execution
# - false: could not authenticate, stop execution
# and return the response
WsAuthHook* = proc(request: HttpRequest): Future[bool]
{.gcsafe, raises: [Defect, CatchableError].}
WsAuthHook* =
proc(request: HttpRequest): Future[bool] {.async: (raises: [CatchableError]).}
# This inheritance arrangement is useful for
# e.g. combo HTTP server
@ -48,22 +50,22 @@ proc serveHTTP*(rpc: RpcWebSocketHandler, request: HttpRequest)
trace "Websocket handshake completed"
while ws.readyState != ReadyState.Closed:
let recvData = await ws.recvMsg()
trace "Client message: ", size = recvData.len, binary = ws.binary
let req = await ws.recvMsg()
debug "Received JSON-RPC request", len = req.len
if ws.readyState == ReadyState.Closed:
# if session already terminated by peer,
# no need to send response
break
if recvData.len == 0:
if req.len == 0:
await ws.close(
reason = "cannot process zero length message"
)
break
let data = try:
await rpc.route(string.fromBytes(recvData))
await rpc.route(req)
except CatchableError as exc:
debug "Internal error, while processing RPC call",
address = $request.uri
@ -82,7 +84,7 @@ proc serveHTTP*(rpc: RpcWebSocketHandler, request: HttpRequest)
raise exc
except CatchableError as exc:
error "Something error", msg=exc.msg
debug "Something error", msg=exc.msg
proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest)
{.async: (raises: [CancelledError]).} =
@ -96,15 +98,17 @@ proc handleRequest(rpc: RpcWebSocketServer, request: HttpRequest)
let res = await hook(request)
if not res:
return
except CancelledError as exc:
raise exc
except CatchableError as exc:
error "Internal error while processing JSON-RPC hook", msg=exc.msg
debug "Internal error while processing JSON-RPC hook", msg=exc.msg
try:
await request.sendResponse(Http503,
data = "",
content = "Internal error, processing JSON-RPC hook: " & exc.msg)
return
except CatchableError as exc:
error "Something error", msg=exc.msg
debug "Something error", msg=exc.msg
return
await rpc.serveHTTP(request)
@ -124,18 +128,21 @@ proc newRpcWebSocketServer*(
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr},
authHooks: seq[WsAuthHook] = @[],
rng = HmacDrbgContext.new()): RpcWebSocketServer =
rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} =
var server = new(RpcWebSocketServer)
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.initWebsocket(compression, authHooks, rng)
server.server = HttpServer.create(
address,
processCallback,
flags
)
try:
server.server = HttpServer.create(
address,
processCallback,
flags
)
except CatchableError as exc:
raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc)
server
@ -145,15 +152,18 @@ proc newRpcWebSocketServer*(
compression: bool = false,
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr},
authHooks: seq[WsAuthHook] = @[],
rng = HmacDrbgContext.new()): RpcWebSocketServer =
rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} =
newRpcWebSocketServer(
initTAddress(host, port),
compression,
flags,
authHooks,
rng
)
try:
newRpcWebSocketServer(
initTAddress(host, port),
compression,
flags,
authHooks,
rng
)
except TransportError as exc:
raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc)
proc newRpcWebSocketServer*(
address: TransportAddress,
@ -166,23 +176,26 @@ proc newRpcWebSocketServer*(
tlsMinVersion = TLSVersion.TLS12,
tlsMaxVersion = TLSVersion.TLS12,
authHooks: seq[WsAuthHook] = @[],
rng = HmacDrbgContext.new()): RpcWebSocketServer =
rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} =
var server = new(RpcWebSocketServer)
proc processCallback(request: HttpRequest): Future[void] =
handleRequest(server, request)
server.initWebsocket(compression, authHooks, rng)
server.server = TlsHttpServer.create(
address,
tlsPrivateKey,
tlsCertificate,
processCallback,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion
)
try:
server.server = TlsHttpServer.create(
address,
tlsPrivateKey,
tlsCertificate,
processCallback,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion
)
except CatchableError as exc:
raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc)
server
@ -198,30 +211,39 @@ proc newRpcWebSocketServer*(
tlsMinVersion = TLSVersion.TLS12,
tlsMaxVersion = TLSVersion.TLS12,
authHooks: seq[WsAuthHook] = @[],
rng = HmacDrbgContext.new()): RpcWebSocketServer =
rng = HmacDrbgContext.new()): RpcWebSocketServer {.raises: [JsonRpcError].} =
newRpcWebSocketServer(
initTAddress(host, port),
tlsPrivateKey,
tlsCertificate,
compression,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion,
authHooks,
rng
)
try:
newRpcWebSocketServer(
initTAddress(host, port),
tlsPrivateKey,
tlsCertificate,
compression,
flags,
tlsFlags,
tlsMinVersion,
tlsMaxVersion,
authHooks,
rng
)
except TransportError as exc:
raise (ref RpcBindError)(msg: "Unable to create server: " & exc.msg, parent: exc)
proc start*(server: RpcWebSocketServer) =
proc start*(server: RpcWebSocketServer) {.raises: [JsonRpcError].} =
## Start the RPC server.
notice "WS RPC server started", address = server.server.local
server.server.start()
try:
info "Starting JSON-RPC WebSocket server", address = server.server.local
server.server.start()
except TransportOsError as exc:
raise (ref RpcBindError)(msg: "Unable to start server: " & exc.msg, parent: exc)
proc stop*(server: RpcWebSocketServer) =
## Stop the RPC server.
notice "WS RPC server stopped", address = server.server.local
server.server.stop()
try:
server.server.stop()
notice "Stopped JSON-RPC WebSocket server", address = server.server.local
except TransportOsError as exc:
warn "Could not stop JSON-RPC WebSocket server", err = exc.msg
proc close*(server: RpcWebSocketServer) =
## Cleanup resources of RPC server.

View File

@ -25,8 +25,7 @@ proc authHeaders(): seq[(string, string)] =
@[("Auth-Token", "Good Token")]
suite "HTTP server hook test":
proc mockAuth(req: HttpRequestRef): Future[HttpResponseRef] {.
gcsafe, async: (raises: [CatchableError]).} =
proc mockAuth(req: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CatchableError]).} =
if req.headers.getString("Auth-Token") == "Good Token":
return HttpResponseRef(nil)
@ -60,7 +59,7 @@ proc wsAuthHeaders(ctx: Hook,
suite "Websocket server hook test":
let hook = Hook(append: wsAuthHeaders)
proc mockAuth(req: websock.HttpRequest): Future[bool] {.async.} =
proc mockAuth(req: websock.HttpRequest): Future[bool] {.async: (raises: [CatchableError]).} =
if not req.headers.contains("Auth-Token"):
await req.sendResponse(code = Http403, data = "Missing Auth-Token")
return false

View File

@ -8,80 +8,67 @@
# those terms.
import
unittest2,
unittest2, chronos/unittest2/asynctests,
../json_rpc/[rpcserver, rpcclient, jsonmarshal]
const TestsCount = 100
proc simpleTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient()
await client.connect("http://" & address)
var r = await client.call("noParamsProc", %[])
if r.string == "\"Hello world\"":
result = true
proc continuousTest(address: string): Future[int] {.async.} =
var client = newRpcHttpClient()
result = 0
for i in 0..<TestsCount:
await client.connect("http://" & address)
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\"":
result += 1
await client.close()
proc invalidTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient()
await client.connect("http://" & address)
var invalidA, invalidB: bool
try:
var r = await client.call("invalidProcA", %[])
discard r
except JsonRpcError:
invalidA = true
try:
var r = await client.call("invalidProcB", %[1, 2, 3])
discard r
except JsonRpcError:
invalidB = true
if invalidA and invalidB:
result = true
const bigChunkSize = 4 * 8192
proc chunkedTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient()
await client.connect("http://" & address)
let r = await client.call("bigchunkMethod", %[])
let data = JrpcConv.decode(r.string, seq[byte])
return data.len == bigChunkSize
suite "JSON-RPC/http":
setup:
var httpsrv = newRpcHttpServer(["127.0.0.1:0"])
# Create RPC on server
httpsrv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
httpsrv.rpc("noParamsProc") do():
result = %("Hello world")
var httpsrv = newRpcHttpServer(["127.0.0.1:0"])
httpsrv.rpc("bigchunkMethod") do() -> seq[byte]:
result = newSeq[byte](bigChunkSize)
for i in 0..<result.len:
result[i] = byte(i mod 255)
# Create RPC on server
httpsrv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
httpsrv.rpc("noParamsProc") do():
result = %("Hello world")
httpsrv.setMaxChunkSize(8192)
httpsrv.start()
let serverAddress = $httpsrv.localAddress()[0]
httpsrv.rpc("bigchunkMethod") do() -> seq[byte]:
result = newSeq[byte](bigChunkSize)
for i in 0..<result.len:
result[i] = byte(i mod 255)
teardown:
waitFor httpsrv.stop()
waitFor httpsrv.closeWait()
httpsrv.setMaxChunkSize(8192)
httpsrv.start()
let serverAddress = $httpsrv.localAddress()[0]
asyncTest "Simple RPC call":
var client = newRpcHttpClient()
await client.connect("http://" & serverAddress)
suite "JSON-RPC test suite":
test "Simple RPC call":
check waitFor(simpleTest(serverAddress)) == true
test "Continuous RPC calls (" & $TestsCount & " messages)":
check waitFor(continuousTest(serverAddress)) == TestsCount
test "Invalid RPC calls":
check waitFor(invalidTest(serverAddress)) == true
test "Http client can handle chunked transfer encoding":
check waitFor(chunkedTest(serverAddress)) == true
var r = await client.call("noParamsProc", %[])
check r.string == "\"Hello world\""
await client.close()
waitFor httpsrv.stop()
waitFor httpsrv.closeWait()
asyncTest "Continuous RPC calls (" & $TestsCount & " messages)":
var client = newRpcHttpClient()
for i in 0..<TestsCount:
await client.connect("http://" & serverAddress)
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
check: r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\""
await client.close()
asyncTest "Invalid RPC calls":
var client = newRpcHttpClient()
await client.connect("http://" & serverAddress)
expect JsonRpcError:
discard await client.call("invalidProcA", %[])
expect JsonRpcError:
discard await client.call("invalidProcB", %[1, 2, 3])
await client.close()
asyncTest "Http client can handle chunked transfer encoding":
var client = newRpcHttpClient()
await client.connect("http://" & serverAddress)
let r = await client.call("bigchunkMethod", %[])
let data = JrpcConv.decode(r.string, seq[byte])
check:
data.len == bigChunkSize
await client.close()

View File

@ -7,7 +7,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import unittest2, chronos/unittest2/asynctests
import ../json_rpc/[rpcserver, rpcclient]
import chronos/[streams/tlsstream, apps/http/httpcommon]
@ -73,61 +73,48 @@ N8r5CwGcIX/XPC3lKazzbZ8baA==
-----END CERTIFICATE-----
"""
proc simpleTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient(secure=true)
await client.connect("https://" & address)
var r = await client.call("noParamsProc", %[])
if r.string == "\"Hello world\"":
result = true
proc continuousTest(address: string): Future[int] {.async.} =
var client = newRpcHttpClient(secure=true)
result = 0
for i in 0..<TestsCount:
await client.connect("https://" & address)
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\"":
result += 1
await client.close()
proc invalidTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient(secure=true)
await client.connect("https://" & address)
var invalidA, invalidB: bool
try:
var r = await client.call("invalidProcA", %[])
discard r
except JsonRpcError:
invalidA = true
try:
var r = await client.call("invalidProcB", %[1, 2, 3])
discard r
except JsonRpcError:
invalidB = true
if invalidA and invalidB:
result = true
let secureKey = TLSPrivateKey.init(HttpsSelfSignedRsaKey)
let secureCert = TLSCertificate.init(HttpsSelfSignedRsaCert)
var secureHttpSrv = RpcHttpServer.new()
secureHttpSrv.addSecureHttpServer("127.0.0.1:0", secureKey, secureCert)
suite "JSON-RPC/https":
setup:
var secureHttpSrv = RpcHttpServer.new()
# Create RPC on server
secureHttpSrv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
secureHttpSrv.rpc("noParamsProc") do():
result = %("Hello world")
secureHttpSrv.addSecureHttpServer("127.0.0.1:0", secureKey, secureCert)
secureHttpSrv.start()
# Create RPC on server
secureHttpSrv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
secureHttpSrv.rpc("noParamsProc") do():
result = %("Hello world")
suite "JSON-RPC test suite":
test "Simple RPC call":
check waitFor(simpleTest($secureHttpSrv.localAddress()[0])) == true
test "Continuous RPC calls (" & $TestsCount & " messages)":
check waitFor(continuousTest($secureHttpSrv.localAddress()[0])) == TestsCount
test "Invalid RPC calls":
check waitFor(invalidTest($secureHttpSrv.localAddress()[0])) == true
secureHttpSrv.start()
let serverAddress = $secureHttpSrv.localAddress()[0]
teardown:
waitFor secureHttpSrv.stop()
waitFor secureHttpSrv.closeWait()
asyncTest "Simple RPC call":
var client = newRpcHttpClient(secure=true)
await client.connect("https://" & serverAddress)
var r = await client.call("noParamsProc", %[])
check r.string == "\"Hello world\""
asyncTest "Continuous RPC calls (" & $TestsCount & " messages)":
var client = newRpcHttpClient(secure=true)
for i in 0..<TestsCount:
await client.connect("https://" & serverAddress)
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
check r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\""
await client.close()
asyncTest "Invalid RPC calls":
var client = newRpcHttpClient(secure=true)
await client.connect("https://" & serverAddress)
expect JsonRpcError:
discard await client.call("invalidProcA", %[])
expect JsonRpcError:
discard await client.call("invalidProcB", %[1, 2, 3])
waitFor secureHttpSrv.stop()
waitFor secureHttpSrv.closeWait()

View File

@ -117,10 +117,10 @@ suite "Websocket Server/Client RPC with Compression":
suite "Custom processClient":
test "Should be able to use custom processClient":
var wasCalled: bool = false
proc processClientHook(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
proc processClientHook(server: StreamServer, transport: StreamTransport) {.async: (raises: []).} =
wasCalled = true
var srv = newRpcSocketServer(processClientHook)
srv.addStreamServer("localhost", Port(8888))
var client = newRpcSocketClient()