* fixes

* fix https://github.com/status-im/nimbus-eth2/issues/1650
* only one of `result` and `error` allowed in response
* fix invalid `string` instances being created from byte sequences
* fix large int64 parsing on 32-bit
* fix exception inheritance
* fix some dangling results
* some cleanups

* annotate exception issues, fix cancellation

* more error handling cleanup

* add rudimentary error tests

* cleanups

* simplify init
* use nextId -> lastId to avoid =1 init
* remove obsolete tests
This commit is contained in:
Jacek Sieka 2021-02-15 13:45:51 +01:00 committed by GitHub
parent 831471f6d4
commit 4eb39203eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 338 additions and 507 deletions

View File

@ -1,29 +1,26 @@
import
tables, json, macros,
std/[tables, json, macros],
chronos,
jsonmarshal, errors
./jsonmarshal
from strutils import toLowerAscii, replace
export
chronos
chronos, json, tables
type
ClientId* = int64
RpcClient* = ref object of RootRef
awaiting*: Table[ClientId, Future[Response]]
nextId: ClientId
lastId: ClientId
methodHandlers: Table[string, proc(j: JsonNode) {.gcsafe.}]
onDisconnect*: proc() {.gcsafe.}
Response* = tuple[error: bool, result: JsonNode]
proc initRpcClient*[T: RpcClient](client: var T) =
client.nextId = 1
Response* = JsonNode
proc getNextId*(client: RpcClient): ClientId =
result = client.nextId
client.nextId.inc
client.lastId += 1
client.lastId
proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id}
@ -34,62 +31,38 @@ method call*(client: RpcClient, name: string,
method close*(client: RpcClient) {.base, gcsafe, async.} = discard
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
template `or`(a: JsonNode, b: typed): JsonNode =
if a == nil: b else: a
macro checkGet(node: JsonNode, fieldName: string,
jKind: static[JsonNodeKind]): untyped =
let n = genSym(ident = "n") #`node`{`fieldName`}
result = quote:
let `n` = `node`{`fieldname`}
if `n`.isNil or `n`.kind == JNull:
raise newException(ValueError,
"Message is missing required field \"" & `fieldName` & "\"")
if `n`.kind != `jKind`.JsonNodeKind:
raise newException(ValueError,
"Expected " & $(`jKind`.JsonNodeKind) & ", got " & $`n`.kind)
case jKind
of JBool: result.add(quote do: `n`.getBool)
of JInt: result.add(quote do: `n`.getInt)
of JString: result.add(quote do: `n`.getStr)
of JFloat: result.add(quote do: `n`.getFloat)
of JObject: result.add(quote do: `n`.getObject)
else: discard
proc processMessage*(self: RpcClient, line: string) {.gcsafe.} =
proc processMessage*(self: RpcClient, line: string) =
# Note: this doesn't use any transport code so doesn't need to be
# differentiated.
let node = parseJson(line)
if "id" in node:
let id = checkGet(node, "id", JInt)
let id = node{"id"} or newJNull()
var requestFut: Future[Response]
if not self.awaiting.pop(id, requestFut):
raise newException(ValueError,
"Cannot find message id \"" & $node["id"].getInt & "\"")
if not self.awaiting.pop(id.getInt(-1), requestFut):
raise newException(ValueError, "Cannot find message id \"" & $id & "\"")
let version = checkGet(node, "jsonrpc", JString)
let version = node{"jsonrpc"}.getStr()
if version != "2.0":
self.awaiting[id].asyncRaise(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\"")
let errorNode = node{"error"}
if errorNode.isNil or errorNode.kind == JNull:
var res = node{"result"}
if not res.isNil:
requestFut.complete((false, res))
else:
requestFut.fail(newException(InvalidResponse, "Missing `result` field"))
requestFut.fail(newException(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\""))
else:
requestFut.fail(newException(ValueError, $errorNode))
let result = node{"result"}
if result.isNil:
let error = node{"error"} or newJNull()
requestFut.fail(newException(ValueError, $error))
else:
requestFut.complete(result)
elif "method" in node:
# This could be subscription notification
let name = node["method"].getStr()
let handler = self.methodHandlers.getOrDefault(name)
if not handler.isNil:
handler(node{"params"})
handler(node{"params"} or newJArray())
else:
raise newException(ValueError, "Invalid jsonrpc message: " & $node)
@ -165,23 +138,20 @@ proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode =
clientIdent = newIdentNode("client")
# proc return variable
procRes = ident"result"
# actual return value, `rpcResult`.result
jsonRpcResult = nnkDotExpr.newTree(rpcResult, newIdentNode("result"))
# perform rpc call
callBody.add(quote do:
# `rpcResult` is of type `Response`
let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`)
if `rpcResult`.error: raise newException(ValueError, $`rpcResult`.result)
)
if customReturnType:
# marshal json to native Nim type
callBody.add(jsonToNim(procRes, returnType, jsonRpcResult, "result"))
callBody.add(jsonToNim(procRes, returnType, rpcResult, "result"))
else:
# native json expected so no work
callBody.add quote do:
`procRes` = `rpcResult`.result
`procRes` = `rpcResult`
when defined(nimDumpRpcs):
echo pathStr, ":\n", result.repr

View File

@ -1,6 +1,8 @@
import json, strutils, tables, uri
import chronicles, httputils, chronos, json_serialization/std/net
import ../client
import
std/[json, strutils, tables, uri],
stew/byteutils,
chronicles, httputils, chronos, json_serialization/std/net,
../client
logScope:
topics = "JSONRPC-HTTP-CLIENT"
@ -13,10 +15,11 @@ type
loop: Future[void]
addresses: seq[TransportAddress]
options: HttpClientOptions
maxBodySize: int
const
MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets
MaxHttpRequestSize = 128 * 1024 # maximum size of HTTP body in octets
MaxHttpRequestSize = 128 * 1024 * 1024 # maximum size of HTTP body in octets
HttpHeadersTimeout = 120.seconds # timeout for receiving headers (120 sec)
HttpBodyTimeout = 12.seconds # timeout for receiving body (12 sec)
HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
@ -34,12 +37,11 @@ proc sendRequest(transp: StreamTransport,
if len(data) > 0:
request.add(data)
try:
let res = await transp.write(cast[seq[char]](request))
if res != len(request):
result = false
result = true
except:
result = false
let res = await transp.write(request.toBytes())
return res == len(request):
except CancelledError as exc: raise exc
except CatchableError:
return false
proc validateResponse*(transp: StreamTransport,
header: HttpResponseHeader): bool =
@ -48,8 +50,7 @@ proc validateResponse*(transp: StreamTransport,
httpcode = header.code,
httpreason = header.reason(),
address = transp.remoteAddress()
result = false
return
return false
var ctype = header["Content-Type"]
# might be "application/json; charset=utf-8"
@ -57,8 +58,7 @@ proc validateResponse*(transp: StreamTransport,
# Content-Type header is not "application/json"
debug "Content type must be application/json",
address = transp.remoteAddress()
result = false
return
return false
let length = header.contentLength()
if length <= 0:
@ -71,15 +71,13 @@ proc validateResponse*(transp: StreamTransport,
else:
debug "Content body size could not be calculated",
address = transp.remoteAddress()
result = false
return
return false
result = true
return true
proc recvData(transp: StreamTransport): Future[string] {.async.} =
proc recvData(transp: StreamTransport, maxBodySize: int): Future[string] {.async.} =
var buffer = newSeq[byte](MaxHttpHeadersSize)
var header: HttpResponseHeader
var error = false
try:
let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize,
HeadersMark)
@ -88,35 +86,38 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
# Timeout
debug "Timeout expired while receiving headers",
address = transp.remoteAddress()
error = true
else:
let hlen = hlenfut.read()
buffer.setLen(hlen)
header = buffer.parseResponse()
if header.failed():
# Header could not be parsed
debug "Malformed header received",
address = transp.remoteAddress()
error = true
return ""
let hlen = hlenfut.read()
buffer.setLen(hlen)
header = buffer.parseResponse()
if header.failed():
# Header could not be parsed
debug "Malformed header received",
address = transp.remoteAddress()
return ""
except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize`
debug "Maximum size of headers limit reached",
address = transp.remoteAddress()
error = true
return ""
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
error = true
return ""
except TransportOsError as exc:
debug "Problems with networking", address = transp.remoteAddress(),
error = exc.msg
error = true
return ""
if error or not transp.validateResponse(header):
result = ""
return
if not transp.validateResponse(header):
return ""
let length = header.contentLength()
if length > maxBodySize:
debug "Request body too large", length, maxBodySize
return ""
try:
if length > 0:
# `Content-Length` is present in response header.
@ -127,43 +128,40 @@ proc recvData(transp: StreamTransport): Future[string] {.async.} =
# Timeout
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
error = true
else:
blenfut.read()
return ""
blenfut.read() # exceptions
else:
# `Content-Length` is not present in response header, so we are reading
# everything until connection will be closed.
var blenfut = transp.read()
var blenfut = transp.read(maxBodySize)
let ores = await withTimeout(blenfut, HttpBodyTimeout)
if not ores:
# Timeout
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
error = true
else:
buffer = blenfut.read()
return ""
buffer = blenfut.read()
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
error = true
return ""
except TransportOsError as exc:
debug "Problems with networking", address = transp.remoteAddress(),
error = exc.msg
error = true
return ""
if error:
result = ""
else:
result = cast[string](buffer)
return string.fromBytes(buffer)
proc init(opts: var HttpClientOptions) =
opts.httpMethod = MethodPost
proc new(T: type RpcHttpClient, maxBodySize = MaxHttpRequestSize): T =
T(
maxBodySize: maxBodySize,
options: HttpClientOptions(httpMethod: MethodPost),
)
proc newRpcHttpClient*(): RpcHttpClient =
## Creates a new HTTP client instance.
new result
result.initRpcClient()
result.options.init()
proc newRpcHttpClient*(maxBodySize = MaxHttpRequestSize): RpcHttpClient =
RpcHttpClient.new(maxBodySize)
proc httpMethod*(client: RpcHttpClient): HttpMethod =
client.options.httpMethod
@ -176,30 +174,42 @@ method call*(client: RpcHttpClient, name: string,
## Remotely calls the specified RPC method.
let id = client.getNextId()
let transp = await connect(client.addresses[0])
var reqBody = $rpcCallNode(name, params, id)
let res = await transp.sendRequest(reqBody, client.httpMethod)
let
transp = await connect(client.addresses[0])
reqBody = $rpcCallNode(name, params, id)
res = await transp.sendRequest(reqBody, client.httpMethod)
if not res:
debug "Failed to send message to RPC server",
address = transp.remoteAddress(), msg_len = len(reqBody)
await transp.closeWait()
raise newException(ValueError, "Transport error")
else:
debug "Message sent to RPC server", address = transp.remoteAddress(),
msg_len = len(reqBody)
trace "Message", msg = reqBody
var value = await transp.recvData()
debug "Message sent to RPC server", address = transp.remoteAddress(),
msg_len = len(reqBody)
trace "Message", msg = reqBody
let value = await transp.recvData(client.maxBodySize)
await transp.closeWait()
if value.len == 0:
raise newException(ValueError, "Empty response from server")
# completed by processMessage.
# completed by processMessage - the flow is quite weird here to accomodate
# socket and ws clients, but could use a more thorough refactoring
var newFut = newFuture[Response]()
# add to awaiting responses
client.awaiting[id] = newFut
client.processMessage(value)
result = await newFut
try:
# Might raise for all kinds of reasons
client.processMessage(value)
finally:
# Need to clean up in case the answer was invalid
client.awaiting.del(id)
# processMessage should have completed this future - if it didn't, `read` will
# raise, which is reasonable
return newFut.read()
proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} =
client.addresses = resolveTAddress(address, port)

View File

@ -1,4 +1,6 @@
import ../client, chronos, tables, json
import
std/[json, tables],
../client, chronos
type
RpcSocketClient* = ref object of RpcClient
@ -8,16 +10,18 @@ type
const defaultMaxRequestLength* = 1024 * 128
proc new*(T: type RpcSocketClient): T =
T()
proc newRpcSocketClient*: RpcSocketClient =
## Creates a new client instance.
new result
result.initRpcClient()
RpcSocketClient.new()
method call*(self: RpcSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
var value = $rpcCallNode(name, params, id) & "\r\n"
if self.transport.isNil:
raise newException(ValueError,
"Transport is not initialised (missing a call to connect?)")
@ -31,7 +35,7 @@ method call*(self: RpcSocketClient, name: string,
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
doAssert(res == len(value))
result = await newFut
return await newFut
proc processData(client: RpcSocketClient) {.async.} =
while true:
@ -42,7 +46,9 @@ proc processData(client: RpcSocketClient) {.async.} =
await client.transport.closeWait()
break
# TODO handle exceptions
client.processMessage(value)
# async loop reconnection and waiting
client.transport = await connect(client.address)

View File

@ -1,4 +1,6 @@
import ../client, chronos, tables, json, strtabs
import
std/[json, strtabs, tables],
../client, chronos
const newsUseChronos = true
include news
@ -9,16 +11,18 @@ type
uri*: string
loop*: Future[void]
proc new*(T: type RpcWebSocketClient): T =
T()
proc newRpcWebSocketClient*: RpcWebSocketClient =
## Creates a new client instance.
new result
result.initRpcClient()
RpcWebSocketClient.new()
method call*(self: RpcWebSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
var value = $rpcCallNode(name, params, id) & "\r\n"
if self.transport.isNil:
raise newException(ValueError,
"Transport is not initialised (missing a call to connect?)")
@ -30,10 +34,10 @@ method call*(self: RpcWebSocketClient, name: string,
self.awaiting[id] = newFut
await self.transport.send(value)
result = await newFut
return await newFut
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref Exception
var error: ref CatchableError
try:
while true:
var value = await client.transport.receiveString()

View File

@ -8,3 +8,5 @@ type
InvalidResponse* = object of JsonRpcError
## raised when the server response violates the JSON-RPC protocol
RpcBindError* = object of JsonRpcError
RpcAddressUnresolvableError* = object of JsonRpcError

View File

@ -1,28 +1,30 @@
import
macros, json, options, typetraits,
std/[macros, json, options, typetraits],
stew/byteutils
export json
proc expect*(actual, expected: JsonNodeKind, argName: string) =
if actual != expected: raise newException(ValueError, "Parameter [" & argName & "] expected " & $expected & " but got " & $actual)
proc `%`*(n: byte{not lit}): JsonNode =
result = newJInt(int(n))
newJInt(int(n))
proc `%`*(n: uint64{not lit}): JsonNode =
result = newJInt(int(n))
newJInt(int(n))
proc `%`*(n: ref SomeInteger): JsonNode =
if n.isNil:
result = newJNull()
newJNull()
else:
result = newJInt(n[])
newJInt(n[])
when (NimMajor, NimMinor, NimPatch) < (0, 19, 9):
proc `%`*[T](option: Option[T]): JsonNode =
if option.isSome:
result = `%`(option.get)
`%`(option.get)
else:
result = newJNull()
newJNull()
# Compiler requires forward decl when processing out of module
proc fromJson*(n: JsonNode, argName: string, result: var bool)
@ -42,7 +44,7 @@ proc fromJson*[T](n: JsonNode, argName: string, result: var Option[T])
# This can't be forward declared: https://github.com/nim-lang/Nim/issues/7868
proc fromJson*[T: enum](n: JsonNode, argName: string, result: var T) =
n.kind.expect(JInt, argName)
result = n.getInt().T
result = n.getBiggestInt().T
# This can't be forward declared: https://github.com/nim-lang/Nim/issues/7868
proc fromJson*[T: object|tuple](n: JsonNode, argName: string, result: var T) =
@ -83,11 +85,11 @@ proc fromJson*[T: ref object](n: JsonNode, argName: string, result: var T) =
proc fromJson*(n: JsonNode, argName: string, result: var int64) =
n.kind.expect(JInt, argName)
result = n.getInt()
result = n.getBiggestInt().int64
proc fromJson*(n: JsonNode, argName: string, result: var uint64) =
n.kind.expect(JInt, argName)
let asInt = n.getInt()
let asInt = n.getBiggestInt()
# signed -> unsigned conversions are unchecked
# https://github.com/nim-lang/RFCs/issues/175
if asInt < 0:
@ -103,6 +105,10 @@ proc fromJson*(n: JsonNode, argName: string, result: var uint32) =
if asInt < 0:
raise newException(
ValueError, "JSON-RPC input is an unexpected negative value")
if asInt > BiggestInt(uint32.high()):
raise newException(
ValueError, "JSON-RPC input is too large for uint32")
result = uint32(asInt)
proc fromJson*(n: JsonNode, argName: string, result: var ref int64) =
@ -118,7 +124,8 @@ proc fromJson*(n: JsonNode, argName: string, result: var ref int) =
proc fromJson*(n: JsonNode, argName: string, result: var byte) =
n.kind.expect(JInt, argName)
let v = n.getInt()
if v > 255 or v < 0: raise newException(ValueError, "Parameter \"" & argName & "\" value out of range for byte: " & $v)
if v > 255 or v < 0:
raise newException(ValueError, "Parameter \"" & argName & "\" value out of range for byte: " & $v)
result = byte(v)
proc fromJson*(n: JsonNode, argName: string, result: var float) =
@ -142,7 +149,8 @@ proc fromJson*[T](n: JsonNode, argName: string, result: var seq[T]) =
proc fromJson*[N, T](n: JsonNode, argName: string, result: var array[N, T]) =
n.kind.expect(JArray, argName)
if n.len > result.len: raise newException(ValueError, "Parameter \"" & argName & "\" item count is too big for array")
if n.len > result.len:
raise newException(ValueError, "Parameter \"" & argName & "\" item count is too big for array")
for i in 0 ..< n.len:
fromJson(n[i], argName, result[i])
@ -175,12 +183,9 @@ iterator paramsRevIter(params: NimNode): tuple[name, ntype: NimNode] =
yield (arg[j], argType)
proc isOptionalArg(typeNode: NimNode): bool =
if typeNode.kind != nnkBracketExpr:
result = false
return
result = typeNode[0].kind == nnkIdent and
typeNode[0].strVal == "Option"
typeNode.kind == nnkBracketExpr and
typeNode[0].kind == nnkIdent and
typeNode[0].strVal == "Option"
proc expectOptionalArrayLen(node, parameters, jsonIdent: NimNode, maxLength: int): int =
var minLength = maxLength
@ -199,13 +204,12 @@ proc expectOptionalArrayLen(node, parameters, jsonIdent: NimNode, maxLength: int
raise newException(ValueError, `expectedStr` & $`jsonIdent`.len)
)
result = minLength
minLength
proc containsOptionalArg(params: NimNode): bool =
for n, t in paramsIter(params):
if t.isOptionalArg:
result = true
break
return true
proc jsonToNim*(assignIdent, paramType, jsonIdent: NimNode, paramNameStr: string, optional = false): NimNode =
# verify input and load a Nim type from json data

View File

@ -1,37 +1,23 @@
import
json, tables, strutils, macros, options,
std/[json, macros, options, strutils, tables],
chronicles, chronos, json_serialization/writer,
jsonmarshal
./jsonmarshal
export
chronos, json, jsonmarshal
type
RpcJsonError* = enum
rjeInvalidJson, rjeVersionError, rjeNoMethod, rjeNoId, rjeNoParams, rjeNoJObject
RpcJsonErrorContainer* = tuple[err: RpcJsonError, msg: string]
StringOfJson* = JsonString
# Procedure signature accepted as an RPC call by server
RpcProc* = proc(input: JsonNode): Future[StringOfJson] {.gcsafe.}
RpcProcError* = ref object of Exception
code*: int
data*: JsonNode
RpcBindError* = object of Exception
RpcAddressUnresolvableError* = object of Exception
RpcRouter* = object
procs*: TableRef[string, RpcProc]
procs*: Table[string, RpcProc]
const
methodField = "method"
paramsField = "params"
jsonRpcField = "jsonrpc"
idField = "id"
messageTerminator = "\c\l"
JSON_PARSE_ERROR* = -32700
INVALID_REQUEST* = -32600
@ -41,18 +27,11 @@ const
SERVER_ERROR* = -32000
defaultMaxRequestLength* = 1024 * 128
jsonErrorMessages*: array[RpcJsonError, (int, string)] =
[
(JSON_PARSE_ERROR, "Invalid JSON"),
(INVALID_REQUEST, "JSON 2.0 required"),
(INVALID_REQUEST, "No method requested"),
(INVALID_REQUEST, "No id specified"),
(INVALID_PARAMS, "No parameters specified"),
(INVALID_PARAMS, "Invalid request object")
]
proc newRpcRouter*: RpcRouter =
result.procs = newTable[string, RpcProc]()
proc init*(T: type RpcRouter): T = discard
proc newRpcRouter*: RpcRouter {.deprecated.} =
RpcRouter.init()
proc register*(router: var RpcRouter, path: string, call: RpcProc) =
router.procs.add(path, call)
@ -63,111 +42,56 @@ proc hasMethod*(router: RpcRouter, methodName: string): bool = router.procs.hasK
func isEmpty(node: JsonNode): bool = node.isNil or node.kind == JNull
# Json state checking
template jsonValid*(jsonString: string, node: var JsonNode): (bool, string) =
var
valid = true
msg = ""
try:
node = parseJson(line)
# Handle cases where params is omitted
if not node.hasKey(paramsField):
node.add(paramsField, newJArray())
except CatchableError as exc:
valid = false
msg = exc.msg
debug "Cannot process json", json = jsonString, msg = msg
(valid, msg)
proc checkJsonState*(line: string,
node: var JsonNode): Option[RpcJsonErrorContainer] =
## Tries parsing line into node, if successful checks required fields
## Returns: error state or none
let res = jsonValid(line, node)
if not res[0]:
return some((rjeInvalidJson, res[1]))
if node.kind != JObject:
return some((rjeNoJObject, ""))
if not node.hasKey(idField):
return some((rjeNoId, ""))
let jVer = node{jsonRpcField}
if jVer != nil and jVer.kind != JNull and jVer != %"2.0":
return some((rjeVersionError, ""))
if not node.hasKey(methodField) or node[methodField].kind != JString:
return some((rjeNoMethod, ""))
if not node.hasKey(paramsField):
return some((rjeNoParams, ""))
return none(RpcJsonErrorContainer)
# Json reply wrappers
proc wrapReply*(id: JsonNode, value, error: StringOfJson): StringOfJson =
return StringOfJson(
"""{"jsonrpc":"2.0","id":$1,"result":$2,"error":$3}""" % [
$id, string(value), string(error)
])
# https://www.jsonrpc.org/specification#response_object
proc wrapReply*(id: JsonNode, value: StringOfJson): StringOfJson =
# Success response carries version, id and result fields only
StringOfJson(
"""{"jsonrpc":"2.0","id":$1,"result":$2}""" % [$id, string(value)] & "\r\n")
proc wrapError*(code: int, msg: string, id: JsonNode,
data: JsonNode = newJNull()): StringOfJson {.gcsafe.} =
# Create standardised error json
result = StringOfJson(
"""{"code":$1,"id":$2,"message":$3,"data":$4}""" % [
$code, $id, escapeJson(msg), $data
])
debug "Error generated", error = result, id = id
proc wrapError*(code: int, msg: string, id: JsonNode = newJNull(),
data: JsonNode = newJNull()): StringOfJson =
# Error reply that carries version, id and error object only
StringOfJson(
"""{"jsonrpc":"2.0","id":$1,"error":{"code":$2,"message":$3,"data":$4}}""" % [
$id, $code, escapeJson(msg), $data
] & "\r\n")
proc route*(router: RpcRouter, node: JsonNode): Future[StringOfJson] {.async, gcsafe.} =
## Assumes correct setup of node
let
methodName = node[methodField].str
id = node[idField]
rpcProc = router.procs.getOrDefault(methodName)
if node{"jsonrpc"}.getStr() != "2.0":
return wrapError(INVALID_REQUEST, "'jsonrpc' missing or invalid")
if rpcProc.isNil:
let
methodNotFound = %(methodName & " is not a registered RPC method.")
error = wrapError(METHOD_NOT_FOUND, "Method not found", id, methodNotFound)
result = wrapReply(id, StringOfJson("null"), error)
else:
try:
let jParams = node[paramsField]
let res = await rpcProc(jParams)
result = wrapReply(id, res, StringOfJson("null"))
except CatchableError as err:
debug "Error occurred within RPC", methodName, errorMessage = err.msg
let error = wrapError(SERVER_ERROR, methodName & " raised an exception",
id, newJString(err.msg))
result = wrapReply(id, StringOfJson("null"), error)
let id = node{"id"}
if id == nil:
return wrapError(INVALID_REQUEST, "'id' missing or invalid")
let methodName = node{"method"}.getStr()
if methodName.len == 0:
return wrapError(INVALID_REQUEST, "'method' missing or invalid")
let rpcProc = router.procs.getOrDefault(methodName)
if rpcProc == nil:
return wrapError(METHOD_NOT_FOUND, "'" & methodName & "' is not a registered RPC method", id)
let params = node.getOrDefault("params")
try:
let res = await rpcProc(if params == nil: newJArray() else: params)
return wrapReply(id, res)
except CatchableError as err:
debug "Error occurred within RPC", methodName = methodName, err = err.msg
return wrapError(
SERVER_ERROR, methodName & " raised an exception", id, newJString(err.msg))
proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} =
## Route to RPC from string data. Data is expected to be able to be converted to Json.
## Returns string of Json from RPC result/error node
var
node: JsonNode
# parse json node and/or flag missing fields and errors
jsonErrorState = checkJsonState(data, node)
let node =
try: parseJson(data)
except CatchableError as err:
return string(wrapError(JSON_PARSE_ERROR, err.msg))
if jsonErrorState.isSome:
let errState = jsonErrorState.get
var id =
if errState.err == rjeInvalidJson or
errState.err == rjeNoId or
errState.err == rjeNoJObject:
newJNull()
else:
node["id"]
let
# const error code and message
errKind = jsonErrorMessages[errState.err]
# pass on the actual error message
fullMsg = errKind[1] & " " & errState[1]
res = wrapError(code = errKind[0], msg = fullMsg, id = id)
# return error state as json
result = string(res) & messageTerminator
else:
let res = await router.route(node)
result = string(res) & messageTerminator
return string(await router.route(node))
proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[StringOfJson]): bool =
## Route to RPC, returns false if the method or params cannot be found.
@ -218,9 +142,6 @@ macro rpc*(server: RpcRouter, path: string, body: untyped): untyped =
procName = newIdentNode(procNameStr)
# when parameters present: proc that contains our rpc body
doMain = newIdentNode(procNameStr & "DoMain")
# async result
res = newIdentNode("result")
errJson = newIdentNode("errJson")
var
setup = jsonToNim(parameters, paramsIdent)
procBody = if body.kind == nnkStmtList: body else: body.body

View File

@ -1,6 +1,8 @@
import json, macros
import chronos, router, chronicles
import jsonmarshal
import
std/[json, macros],
chronos, chronicles,
./router,
./jsonmarshal
export chronos, json, jsonmarshal, router, chronicles
@ -8,9 +10,10 @@ type
RpcServer* = ref object of RootRef
router*: RpcRouter
proc newRpcServer*(): RpcServer =
new result
result.router = newRpcRouter()
proc new(T: type RpcServer): T =
T(router: RpcRouter.init())
proc newRpcServer*(): RpcServer {.deprecated.} = RpcServer.new()
template rpc*(server: RpcServer, path: string, body: untyped): untyped =
server.router.rpc(path, body)
@ -20,8 +23,8 @@ template hasMethod*(server: RpcServer, methodName: string): bool =
# Wrapper for message processing
proc route*(server: RpcServer, line: string): Future[string] {.async, gcsafe.} =
result = await server.router.route(line)
proc route*(server: RpcServer, line: string): Future[string] {.gcsafe.} =
server.router.route(line)
# Server registration
@ -32,5 +35,3 @@ proc register*(server: RpcServer, name: string, rpc: RpcProc) =
proc unRegisterAll*(server: RpcServer) =
# Remove all remote procedure calls from this server.
server.router.clear

View File

@ -1,6 +1,7 @@
import std/[json, strutils]
import chronicles, httputils, chronos
import ../server
import
std/[json, strutils],
chronicles, httputils, chronos,
../server, ../errors
logScope:
topics = "JSONRPC-HTTP-SERVER"
@ -34,42 +35,38 @@ proc sendAnswer(transp: StreamTransport, version: HttpVersion, code: HttpCode,
answer.add(data)
try:
let res = await transp.write(answer)
if res != len(answer):
result = false
result = true
except:
result = false
return res == len(answer)
except CancelledError as exc: raise exc
except CatchableError:
return false
proc validateRequest(transp: StreamTransport,
header: HttpRequestHeader): Future[ReqStatus] {.async.} =
if header.meth in {MethodPut, MethodDelete}:
# Request method is either PUT or DELETE.
debug "PUT/DELETE methods are not allowed", address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http405):
result = Error
return if await transp.sendAnswer(header.version, Http405):
Error
else:
result = ErrorFailure
return
ErrorFailure
let length = header.contentLength()
if length <= 0:
# request length could not be calculated.
debug "Content-Length is missing or 0", address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http411):
result = Error
return if await transp.sendAnswer(header.version, Http411):
Error
else:
result = ErrorFailure
return
ErrorFailure
if length > MaxHttpRequestSize:
# request length is more then `MaxHttpRequestSize`.
debug "Maximum size of request body reached",
address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http413):
result = Error
return if await transp.sendAnswer(header.version, Http413):
Error
else:
result = ErrorFailure
return
ErrorFailure
var ctype = header["Content-Type"]
# might be "application/json; charset=utf-8"
@ -77,13 +74,12 @@ proc validateRequest(transp: StreamTransport,
# Content-Type header is not "application/json"
debug "Content type must be application/json",
address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http415):
result = Error
return if await transp.sendAnswer(header.version, Http415):
Error
else:
result = ErrorFailure
return
ErrorFailure
result = Success
return Success
proc processClient(server: StreamServer,
transp: StreamTransport) {.async, gcsafe.} =
@ -103,7 +99,7 @@ proc processClient(server: StreamServer,
# Timeout
debug "Timeout expired while receiving headers",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http408)
discard await transp.sendAnswer(HttpVersion11, Http408)
await transp.closeWait()
break
else:
@ -114,14 +110,14 @@ proc processClient(server: StreamServer,
# Header could not be parsed
debug "Malformed header received",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http400)
discard await transp.sendAnswer(HttpVersion11, Http400)
await transp.closeWait()
break
except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize`
debug "Maximum size of headers limit reached",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http413)
discard await transp.sendAnswer(HttpVersion11, Http413)
await transp.closeWait()
break
except TransportIncompleteError:
@ -158,7 +154,7 @@ proc processClient(server: StreamServer,
# Timeout
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
let res = await transp.sendAnswer(header.version, Http413)
discard await transp.sendAnswer(header.version, Http413)
await transp.closeWait()
break
else:
@ -225,7 +221,7 @@ proc addStreamServer*(server: RpcHttpServer, address: TransportAddress) =
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcHttpServer,
addresses: openarray[TransportAddress]) =
addresses: openArray[TransportAddress]) =
for item in addresses:
server.addStreamServer(item)
@ -239,13 +235,13 @@ proc addStreamServer*(server: RpcHttpServer, address: string) =
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, AddressFamily.IPv4)
except:
except CatchableError:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, AddressFamily.IPv6)
except:
except CatchableError:
discard
for r in tas4:
@ -259,7 +255,7 @@ proc addStreamServer*(server: RpcHttpServer, address: string) =
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcHttpServer, addresses: openarray[string]) =
proc addStreamServers*(server: RpcHttpServer, addresses: openArray[string]) =
for address in addresses:
server.addStreamServer(address)
@ -272,13 +268,13 @@ proc addStreamServer*(server: RpcHttpServer, address: string, port: Port) =
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, AddressFamily.IPv4)
except:
except CatchableError:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, AddressFamily.IPv6)
except:
except CatchableError:
discard
if len(tas4) == 0 and len(tas6) == 0:
@ -298,15 +294,18 @@ proc addStreamServer*(server: RpcHttpServer, address: string, port: Port) =
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
proc newRpcHttpServer*(): RpcHttpServer =
RpcHttpServer(router: newRpcRouter(), servers: @[])
proc new*(T: type RpcHttpServer): T =
T(router: RpcRouter.init(), servers: @[])
proc newRpcHttpServer*(addresses: openarray[TransportAddress]): RpcHttpServer =
proc newRpcHttpServer*(): RpcHttpServer =
RpcHttpServer.new()
proc newRpcHttpServer*(addresses: openArray[TransportAddress]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer()
result.addStreamServers(addresses)
proc newRpcHttpServer*(addresses: openarray[string]): RpcHttpServer =
proc newRpcHttpServer*(addresses: openArray[string]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer()
result.addStreamServers(addresses)

View File

@ -2,7 +2,7 @@ import
std/json,
chronicles,
json_serialization/std/net,
../server
../server, ../errors
export server
@ -45,7 +45,7 @@ proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) =
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcSocketServer, addresses: openarray[TransportAddress]) =
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[TransportAddress]) =
for item in addresses:
server.addStreamServer(item)
@ -79,7 +79,7 @@ proc addStreamServer*(server: RpcSocketServer, address: string) =
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcSocketServer, addresses: openarray[string]) =
proc addStreamServers*(server: RpcSocketServer, addresses: openArray[string]) =
for address in addresses:
server.addStreamServer(address)
@ -118,22 +118,25 @@ proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) =
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
proc newRpcSocketServer*: RpcSocketServer =
RpcSocketServer(router: newRpcRouter(), servers: @[])
proc new(T: type RpcSocketServer): T =
T(router: RpcRouter.init(), servers: @[])
proc newRpcSocketServer*(addresses: openarray[TransportAddress]): RpcSocketServer =
proc newRpcSocketServer*(): RpcSocketServer =
RpcSocketServer.new()
proc newRpcSocketServer*(addresses: openArray[TransportAddress]): RpcSocketServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcSocketServer()
result = RpcSocketServer.new()
result.addStreamServers(addresses)
proc newRpcSocketServer*(addresses: openarray[string]): RpcSocketServer =
proc newRpcSocketServer*(addresses: openArray[string]): RpcSocketServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcSocketServer()
result = RpcSocketServer.new()
result.addStreamServers(addresses)
proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer =
# Create server on specified port
result = newRpcSocketServer()
result = RpcSocketServer.new()
result.addStreamServer(address, port)
proc start*(server: RpcSocketServer) =

View File

@ -1,4 +1,4 @@
{. warning[UnusedImport]:off .}
import
testrpcmacro, testserverclient, testethcalls, testhttp #, testerrors
testrpcmacro, testserverclient, testethcalls, testhttp

View File

@ -1,20 +0,0 @@
include ../ json_rpc / client
proc nextId*(self: RpcClient): int64 = self.nextId
proc rawCall*(self: RpcClient, name: string,
msg: string): Future[Response] {.async.} =
# For debug purposes only
let id = $self.nextId
self.nextId.inc
var s = msg & "\c\l"
let res = await self.transport.write(s)
doAssert res == len(s)
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
self.awaiting[id] = newFut
result = await newFut

View File

@ -1,5 +1,5 @@
import
strutils, json,
json,
nimcrypto, stint,
ethtypes, ethhexstrings, stintjson, ../json_rpc/rpcserver
@ -208,7 +208,7 @@ proc addEthRpcs*(server: RpcServer) =
## Generates and returns an estimate of how much gas is necessary to allow the transaction to complete.
## The transaction will not be added to the blockchain. Note that the estimate may be significantly more than
## the amount of gas actually used by the transaction, for a variety of reasons including EVM mechanics and node performance.
##
##
## call: the transaction call object.
## quantityTag: integer block number, or the string "latest", "earliest" or "pending", see the default block parameter.
## Returns the amount of gas used.
@ -260,7 +260,7 @@ proc addEthRpcs*(server: RpcServer) =
discard
server.rpc("eth_getUncleByBlockHashAndIndex") do(data: UInt256, quantity: int64) -> BlockObject:
## Returns information about a uncle of a block by hash and uncle index position.
## Returns information about a uncle of a block by hash and uncle index position.
##
## data: hash a block.
## quantity: the uncle's index position.
@ -310,7 +310,7 @@ proc addEthRpcs*(server: RpcServer) =
## [A] "A in first position (and anything after)"
## [null, B] "anything in first position AND B in second position (and anything after)"
## [A, B] "A in first position AND B in second position (and anything after)"
## [[A, B], [A, B]] "(A OR B) in first position AND (A OR B) in second position (and anything after)"
## [[A, B], [A, B]] "(A OR B) in first position AND (A OR B) in second position (and anything after)"
##
## filterOptions: settings for this filter.
## Returns integer filter id.
@ -331,7 +331,7 @@ proc addEthRpcs*(server: RpcServer) =
discard
server.rpc("eth_uninstallFilter") do(filterId: int) -> bool:
## Uninstalls a filter with given id. Should always be called when watch is no longer needed.
## Uninstalls a filter with given id. Should always be called when watch is no longer needed.
## Additonally Filters timeout when they aren't requested with eth_getFilterChanges for a period of time.
##
## filterId: The filter id.

View File

@ -1,80 +0,0 @@
#[
This module uses debug versions of the rpc components that
allow unchecked and unformatted calls.
]#
import unittest, debugclient, ../json_rpc/rpcserver
import strformat, chronicles
var server = newRpcSocketServer("localhost", Port(8545))
var client = newRpcSocketClient()
server.start()
waitFor client.connect("localhost", Port(8545))
server.rpc("rpc") do(a: int, b: int):
result = %(&"a: {a}, b: {b}")
server.rpc("makeError"):
if true:
raise newException(ValueError, "Test")
proc testMissingRpc: Future[Response] {.async.} =
var fut = client.call("phantomRpc", %[])
result = await fut
proc testInvalidJsonVer: Future[Response] {.async.} =
let json =
$ %{"jsonrpc": %"3.99", "method": %"rpc", "params": %[],
"id": % $client.nextId} & "\c\l"
var fut = client.rawCall("rpc", json)
result = await fut
proc testMalformed: Future[Response] {.async.} =
let malformedJson = "{field: 2, \"field: 3}"
var fut = client.rawCall("rpc", malformedJson)
await fut or sleepAsync(1000)
if fut.finished: result = fut.read()
else: result = (true, %"Timeout")
proc testRaise: Future[Response] {.async.} =
var fut = client.call("makeError", %[])
result = await fut
suite "RPC Errors":
# Note: We don't expect a exceptions for most of the tests,
# because the server should respond with the error in json
test "Missing RPC":
#expect ValueError:
try:
let res = waitFor testMissingRpc()
check res.error == true and
res.result["message"] == %"Method not found" and
res.result["data"] == %"phantomRpc is not a registered method."
except CatchableError as exc:
echo "Error ", exc.msg
#[test "Incorrect json version":
#expect ValueError:
try:
let res = waitFor testInvalidJsonVer()
check res.error == true and res.result["message"] == %"JSON 2.0 required"
except CatchableError as exc:
echo "Error ", exc.msg
]#
test "Raising exceptions":
#expect ValueError:
try:
let res = waitFor testRaise()
except CatchableError as exc:
echo "Error ", exc.msg
test "Malformed json":
# TODO: We time out here because the server won't be able to
# find an id to return to us, so we cannot complete the future.
try:
let res = waitFor testMalformed()
check res.error == true and res.result == %"Timeout"
except CatchableError as exc:
echo "Error ", exc.msg

View File

@ -20,7 +20,7 @@ createRpcSigs(RpcSocketClient, sourceDir & DirSep & "ethcallsigs.nim")
## Create custom RPC with StUint input parameter
server.rpc("rpc.uint256param") do(i: UInt256):
let r = i + 1.stUint(256)
result = %r
return %r
## Create custom RPC with StUInt return parameter
server.rpc("rpc.testreturnuint256") do() -> UInt256:
@ -30,24 +30,24 @@ server.rpc("rpc.testreturnuint256") do() -> UInt256:
proc testLocalCalls: Future[seq[StringOfJson]] =
## Call RPCs created with `rpc` locally.
## This simply demonstrates async calls of the procs generated by the `rpc` macro.
var
let
uint256Param = rpcUInt256Param(%[%"0x1234567890"])
returnUint256 = rpcTestReturnUInt256(%[])
result = all(uint256Param, returnUint256)
return all(uint256Param, returnUint256)
proc testRemoteUInt256: Future[seq[Response]] =
## Call function remotely on server, testing `stint` types
var
let
uint256Param = client.call("rpc.uint256param", %[%"0x1234567890"])
returnUint256 = client.call("rpc.testreturnuint256", %[])
result = all(uint256Param, returnUint256)
return all(uint256Param, returnUint256)
proc testSigCalls: Future[seq[string]] =
## Remote call using proc generated from signatures in `ethcallsigs.nim`
var
let
version = client.web3_clientVersion()
sha3 = client.web3_sha3("0x68656c6c6f20776f726c64")
result = all(version, sha3)
return all(version, sha3)
server.start()
waitFor client.connect("localhost", Port(8545))
@ -63,9 +63,9 @@ suite "Local calls":
suite "Remote calls":
let remoteResults = testRemoteUInt256().waitFor
test "UInt256 param":
check remoteResults[0].result == %"0x1234567891"
check remoteResults[0] == %"0x1234567891"
test "Return UInt256":
check remoteResults[1].result == %"0x1234567890abcdef"
check remoteResults[1] == %"0x1234567890abcdef"
suite "Generated from signatures":
let sigResults = testSigCalls().waitFor

View File

@ -11,7 +11,7 @@ const
Requests = [
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
@ -19,50 +19,57 @@ const
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"BADHEADER\r\n\r\n",
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n",
"PUT / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"DELETE / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/0.9\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/1.0\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Host: status.im\r\n" &
"Content-Length: 49\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"noParamsProc\",\"id\":67}",
"GET / HTTP/1.1\r\n" &
"Host: status.im\r\n" &
"Content-Length: 137438953472\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{128 gb Content-Length}",
]
proc continuousTest(address: string, port: Port): Future[int] {.async.} =
@ -71,7 +78,7 @@ proc continuousTest(address: string, port: Port): Future[int] {.async.} =
for i in 0..<TestsCount:
await client.connect(address, port)
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]":
if r.getStr == "Hello abc data: [1, 2, 3, " & $i & "]":
result += 1
await client.close()
@ -81,6 +88,8 @@ proc customMessage(address: TransportAddress,
var buffer = newSeq[byte](BufferSize)
var header: HttpResponseHeader
var transp = await connect(address)
defer: transp.close()
let wres = await transp.write(data)
doAssert(wres == len(data))
let rres = await transp.readUntil(addr buffer[0], BufferSize, HeadersMark)
@ -88,11 +97,7 @@ proc customMessage(address: TransportAddress,
buffer.setLen(rres)
header = parseResponse(buffer)
doAssert(header.success())
if header.code == expect:
result = true
else:
result = false
transp.close()
return header.code == expect
proc headerTest(address: string, port: Port): Future[bool] {.async.} =
var a = resolveTAddress(address, port)
@ -105,7 +110,7 @@ proc headerTest(address: string, port: Port): Future[bool] {.async.} =
header.add("Content-Type: application/json\r\n")
header.add("Connection: close\r\n\r\n")
header.add("{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}")
result = await customMessage(a[0], header, 413)
return await customMessage(a[0], header, 413)
proc bodyTest(address: string, port: Port): Future[bool] {.async.} =
var body = repeat('B', BigBodySize)
@ -115,7 +120,7 @@ proc bodyTest(address: string, port: Port): Future[bool] {.async.} =
header.add("Content-Type: application/json\r\n")
header.add("Connection: close\r\n\r\n")
header.add(body)
result = await customMessage(a[0], header, 413)
return await customMessage(a[0], header, 413)
proc disconTest(address: string, port: Port,
number: int, expect: int): Future[bool] {.async.} =
@ -123,7 +128,9 @@ proc disconTest(address: string, port: Port,
var buffer = newSeq[byte](BufferSize)
var header: HttpResponseHeader
var transp = await connect(a[0])
var data = Requests[number]
defer: transp.close()
let data = Requests[number]
let wres = await transp.write(data)
doAssert(wres == len(data))
let rres = await transp.readUntil(addr buffer[0], BufferSize, HeadersMark)
@ -131,21 +138,15 @@ proc disconTest(address: string, port: Port,
buffer.setLen(rres)
header = parseResponse(buffer)
doAssert(header.success())
if header.code == expect:
result = true
else:
result = false
if header.code != expect:
return false
var length = header.contentLength()
let length = header.contentLength()
doAssert(length > 0)
buffer.setLen(length)
await transp.readExactly(addr buffer[0], len(buffer))
var left = await transp.read()
if len(left) == 0 and transp.atEof():
result = true
else:
result = false
transp.close()
let left = await transp.read()
return len(left) == 0 and transp.atEof()
proc simpleTest(address: string, port: Port,
number: int, expect: int): Future[bool] {.async.} =
@ -187,6 +188,8 @@ suite "HTTP Server/HTTP Client RPC test suite":
check waitFor(disconTest("localhost", Port(8545), 7, 200)) == true
test "Omitted params test":
check waitFor(simpleTest("localhost", Port(8545), 8, 200)) == true
test "Big Content-Length":
check waitFor(simpleTest("localhost", Port(8545), 9, 413)) == true
httpsrv.stop()
waitFor httpsrv.closeWait()

View File

@ -38,47 +38,46 @@ var s = newRpcSocketServer(["localhost:8545"])
# RPC definitions
s.rpc("rpc.simplepath"):
result = %1
return %1
s.rpc("rpc.differentparams") do(a: int, b: string):
result = %[%a, %b]
return %[%a, %b]
s.rpc("rpc.arrayparam") do(arr: array[0..5, byte], b: string):
var res = %arr
res.add %b
result = %res
return %res
s.rpc("rpc.seqparam") do(a: string, s: seq[int]):
var res = newJArray()
res.add %a
for item in s:
res.add %int(item)
result = res
return res
s.rpc("rpc.objparam") do(a: string, obj: MyObject):
result = %obj
return %obj
s.rpc("rpc.returntypesimple") do(i: int) -> int:
result = i
return i
s.rpc("rpc.returntypecomplex") do(i: int) -> Test2:
result.x = [1, i, 3]
result.y = "test"
return Test2(x: [1, i, 3], y: "test")
s.rpc("rpc.testreturns") do() -> int:
return 1234
s.rpc("rpc.multivarsofonetype") do(a, b: string) -> string:
result = a & " " & b
return a & " " & b
s.rpc("rpc.optional") do(obj: MyOptional) -> MyOptional:
result = obj
return obj
s.rpc("rpc.optionalArg") do(val: int, obj: Option[MyOptional]) -> MyOptional:
if obj.isSome():
result = obj.get()
return if obj.isSome():
obj.get()
else:
result = MyOptional(maybeInt: some(val))
MyOptional(maybeInt: some(val))
type
OptionalFields = object
@ -98,12 +97,14 @@ s.rpc("rpc.mixedOptionalArg") do(a: int, b: Option[int], c: string,
result.e = e
s.rpc("rpc.optionalArgNotBuiltin") do(obj: Option[MyOptionalNotBuiltin]) -> string:
result = "Empty1"
if obj.isSome:
return if obj.isSome:
let val = obj.get.val
result = "Empty2"
if val.isSome:
result = obj.get.val.get.y
obj.get.val.get.y
else:
"Empty2"
else:
"Empty1"
type
MaybeOptions = object
@ -198,8 +199,7 @@ suite "Server types":
discard waitfor rpcArrayParam(%[%"test", %"hello"])
expect ValueError:
# wrong param type
let res = waitFor rpcDifferentParams(%[%"abc", %1])
# TODO: When errors are proper return values, check error for param name
discard waitFor rpcDifferentParams(%[%"abc", %1])
test "Multiple variables of one type":
let r = waitfor rpcMultiVarsOfOneType(%[%"hello", %"world"])
@ -256,4 +256,3 @@ suite "Server types":
s.stop()
waitFor s.closeWait()

View File

@ -1,23 +1,32 @@
import
unittest, json, chronicles,
../json_rpc/[rpcclient, rpcserver], ./helpers
../json_rpc/[rpcclient, rpcserver]
var srv = newRpcSocketServer(["localhost:8545"])
var client = newRpcSocketClient()
# Create RPC on server
srv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
return %("Hello " & input & " data: " & $data)
srv.rpc("myError") do(input: string, data: array[0..3, int]):
raise (ref ValueError)(msg: "someMessage")
srv.start()
waitFor client.connect("localhost", Port(8545))
# TODO: When an error occurs during a test, stop the server
suite "Server/Client RPC":
test "Custom RPC":
var r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
check r.result.getStr == "Hello abc data: [1, 2, 3, 4]"
test "Successful RPC call":
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
check r.getStr == "Hello abc data: [1, 2, 3, 4]"
test "Missing params":
expect(CatchableError):
discard waitFor client.call("myProc", %[%"abc"])
test "Error RPC call":
expect(CatchableError): # The error type wont be translated
discard waitFor client.call("myError", %[%"abc", %[1, 2, 3, 4]])
srv.stop()
waitFor srv.closeWait()