mirror of
https://github.com/logos-storage/nim-json-rpc.git
synced 2026-01-04 06:33:10 +00:00
Implement RPC batch call both in servers and clients
This commit is contained in:
parent
85d6a67fbc
commit
0b8cec3aa8
@ -24,15 +24,30 @@ export
|
|||||||
tables,
|
tables,
|
||||||
jsonmarshal,
|
jsonmarshal,
|
||||||
RequestParamsTx,
|
RequestParamsTx,
|
||||||
|
RequestBatchTx,
|
||||||
|
ResponseBatchRx,
|
||||||
results
|
results
|
||||||
|
|
||||||
type
|
type
|
||||||
|
RpcBatchItem* = object
|
||||||
|
meth*: string
|
||||||
|
params*: RequestParamsTx
|
||||||
|
|
||||||
|
RpcBatchCallRef* = ref object of RootRef
|
||||||
|
client*: RpcClient
|
||||||
|
batch*: seq[RpcBatchItem]
|
||||||
|
|
||||||
|
RpcBatchResponse* = object
|
||||||
|
error*: Opt[string]
|
||||||
|
result*: JsonString
|
||||||
|
|
||||||
RpcClient* = ref object of RootRef
|
RpcClient* = ref object of RootRef
|
||||||
awaiting*: Table[RequestId, Future[JsonString]]
|
awaiting*: Table[RequestId, Future[JsonString]]
|
||||||
lastId: int
|
lastId: int
|
||||||
onDisconnect*: proc() {.gcsafe, raises: [].}
|
onDisconnect*: proc() {.gcsafe, raises: [].}
|
||||||
onProcessMessage*: proc(client: RpcClient, line: string):
|
onProcessMessage*: proc(client: RpcClient, line: string):
|
||||||
Result[bool, string] {.gcsafe, raises: [].}
|
Result[bool, string] {.gcsafe, raises: [].}
|
||||||
|
batchFut*: Future[ResponseBatchRx]
|
||||||
|
|
||||||
GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].}
|
GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].}
|
||||||
|
|
||||||
@ -42,10 +57,65 @@ type
|
|||||||
# Public helpers
|
# Public helpers
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
func validateResponse(resIndex: int, res: ResponseRx): Result[void, string] =
|
||||||
|
if res.jsonrpc.isNone:
|
||||||
|
return err("missing or invalid `jsonrpc` in response " & $resIndex)
|
||||||
|
|
||||||
|
if res.id.isNone:
|
||||||
|
if res.error.isSome:
|
||||||
|
let error = JrpcSys.encode(res.error.get)
|
||||||
|
return err(error)
|
||||||
|
else:
|
||||||
|
return err("missing or invalid response id in response " & $resIndex)
|
||||||
|
|
||||||
|
if res.error.isSome:
|
||||||
|
let error = JrpcSys.encode(res.error.get)
|
||||||
|
return err(error)
|
||||||
|
|
||||||
|
# Up to this point, the result should contains something
|
||||||
|
if res.result.string.len == 0:
|
||||||
|
return err("missing or invalid response result in response " & $resIndex)
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc processResponse(resIndex: int,
|
||||||
|
map: var Table[RequestId, int],
|
||||||
|
responses: var seq[RpcBatchResponse],
|
||||||
|
response: ResponseRx): Result[void, string] =
|
||||||
|
let r = validateResponse(resIndex, response)
|
||||||
|
if r.isErr:
|
||||||
|
if response.id.isSome:
|
||||||
|
let id = response.id.get
|
||||||
|
var index: int
|
||||||
|
if not map.pop(id, index):
|
||||||
|
return err("cannot find message id: " & $id & " in response " & $resIndex)
|
||||||
|
responses[index] = RpcBatchResponse(
|
||||||
|
error: Opt.some(r.error)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return err(r.error)
|
||||||
|
else:
|
||||||
|
let id = response.id.get
|
||||||
|
var index: int
|
||||||
|
if not map.pop(id, index):
|
||||||
|
return err("cannot find message id: " & $id & " in response " & $resIndex)
|
||||||
|
responses[index] = RpcBatchResponse(
|
||||||
|
result: response.result
|
||||||
|
)
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
# Public helpers
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
func requestTxEncode*(name: string, params: RequestParamsTx, id: RequestId): string =
|
func requestTxEncode*(name: string, params: RequestParamsTx, id: RequestId): string =
|
||||||
let req = requestTx(name, params, id)
|
let req = requestTx(name, params, id)
|
||||||
JrpcSys.encode(req)
|
JrpcSys.encode(req)
|
||||||
|
|
||||||
|
func requestBatchEncode*(calls: RequestBatchTx): string =
|
||||||
|
JrpcSys.encode(calls)
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
@ -68,6 +138,11 @@ method call*(client: RpcClient, name: string,
|
|||||||
method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} =
|
method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} =
|
||||||
doAssert(false, "`RpcClient.close` not implemented")
|
doAssert(false, "`RpcClient.close` not implemented")
|
||||||
|
|
||||||
|
method callBatch*(client: RpcClient,
|
||||||
|
calls: RequestBatchTx): Future[ResponseBatchRx]
|
||||||
|
{.base, gcsafe, async.} =
|
||||||
|
doAssert(false, "`RpcClient.callBatch` not implemented")
|
||||||
|
|
||||||
proc processMessage*(client: RpcClient, line: string): Result[void, string] =
|
proc processMessage*(client: RpcClient, line: string): Result[void, string] =
|
||||||
if client.onProcessMessage.isNil.not:
|
if client.onProcessMessage.isNil.not:
|
||||||
let fallBack = client.onProcessMessage(client, line).valueOr:
|
let fallBack = client.onProcessMessage(client, line).valueOr:
|
||||||
@ -78,8 +153,14 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
|
|||||||
# Note: this doesn't use any transport code so doesn't need to be
|
# Note: this doesn't use any transport code so doesn't need to be
|
||||||
# differentiated.
|
# differentiated.
|
||||||
try:
|
try:
|
||||||
let response = JrpcSys.decode(line, ResponseRx)
|
let batch = JrpcSys.decode(line, ResponseBatchRx)
|
||||||
|
if batch.kind == rbkMany:
|
||||||
|
if client.batchFut.isNil or client.batchFut.finished():
|
||||||
|
client.batchFut = newFuture[ResponseBatchRx]()
|
||||||
|
client.batchFut.complete(batch)
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
let response = batch.single
|
||||||
if response.jsonrpc.isNone:
|
if response.jsonrpc.isNone:
|
||||||
return err("missing or invalid `jsonrpc`")
|
return err("missing or invalid `jsonrpc`")
|
||||||
|
|
||||||
@ -114,6 +195,41 @@ proc processMessage*(client: RpcClient, line: string): Result[void, string] =
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
return err(exc.msg)
|
return err(exc.msg)
|
||||||
|
|
||||||
|
proc prepareBatch*(client: RpcClient): RpcBatchCallRef =
|
||||||
|
RpcBatchCallRef(client: client)
|
||||||
|
|
||||||
|
proc send*(batch: RpcBatchCallRef):
|
||||||
|
Future[Result[seq[RpcBatchResponse], string]] {.
|
||||||
|
async: (raises: []).} =
|
||||||
|
var
|
||||||
|
calls = RequestBatchTx(
|
||||||
|
kind: rbkMany,
|
||||||
|
many: newSeqOfCap[RequestTx](batch.batch.len),
|
||||||
|
)
|
||||||
|
responses = newSeq[RpcBatchResponse](batch.batch.len)
|
||||||
|
map = initTable[RequestId, int]()
|
||||||
|
|
||||||
|
for item in batch.batch:
|
||||||
|
let id = batch.client.getNextId()
|
||||||
|
map[id] = calls.many.len
|
||||||
|
calls.many.add requestTx(item.meth, item.params, id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let res = await batch.client.callBatch(calls)
|
||||||
|
if res.kind == rbkSingle:
|
||||||
|
let r = processResponse(0, map, responses, res.single)
|
||||||
|
if r.isErr:
|
||||||
|
return err(r.error)
|
||||||
|
else:
|
||||||
|
for i, z in res.many:
|
||||||
|
let r = processResponse(i, map, responses, z)
|
||||||
|
if r.isErr:
|
||||||
|
return err(r.error)
|
||||||
|
except CatchableError as exc:
|
||||||
|
return err(exc.msg)
|
||||||
|
|
||||||
|
return ok(responses)
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Signature processing
|
# Signature processing
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
@ -38,6 +38,10 @@ const
|
|||||||
|
|
||||||
{.push gcsafe, raises: [].}
|
{.push gcsafe, raises: [].}
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
# Private helpers
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
proc new(
|
proc new(
|
||||||
T: type RpcHttpClient, maxBodySize = MaxHttpRequestSize, secure = false,
|
T: type RpcHttpClient, maxBodySize = MaxHttpRequestSize, secure = false,
|
||||||
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =
|
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =
|
||||||
@ -53,15 +57,24 @@ proc new(
|
|||||||
getHeaders: getHeaders
|
getHeaders: getHeaders
|
||||||
)
|
)
|
||||||
|
|
||||||
proc newRpcHttpClient*(
|
template closeRefs(req, res: untyped) =
|
||||||
maxBodySize = MaxHttpRequestSize, secure = false,
|
# We can't trust try/finally in async/await in all nim versions, so we
|
||||||
getHeaders: GetJsonRpcRequestHeaders = nil,
|
# do it manually instead
|
||||||
flags: HttpClientFlags = {}): RpcHttpClient =
|
if req != nil:
|
||||||
RpcHttpClient.new(maxBodySize, secure, getHeaders, flags)
|
try:
|
||||||
|
await req.closeWait()
|
||||||
|
except CatchableError as exc: # shouldn't happen
|
||||||
|
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
|
||||||
|
discard exc
|
||||||
|
|
||||||
method call*(client: RpcHttpClient, name: string,
|
if res != nil:
|
||||||
params: RequestParamsTx): Future[JsonString]
|
try:
|
||||||
{.async, gcsafe.} =
|
await res.closeWait()
|
||||||
|
except CatchableError as exc: # shouldn't happen
|
||||||
|
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
|
||||||
|
discard exc
|
||||||
|
|
||||||
|
proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.} =
|
||||||
doAssert client.httpSession != nil
|
doAssert client.httpSession != nil
|
||||||
if client.httpAddress.isErr:
|
if client.httpAddress.isErr:
|
||||||
raise newException(RpcAddressUnresolvableError, client.httpAddress.error)
|
raise newException(RpcAddressUnresolvableError, client.httpAddress.error)
|
||||||
@ -73,33 +86,9 @@ method call*(client: RpcHttpClient, name: string,
|
|||||||
@[]
|
@[]
|
||||||
headers.add(("Content-Type", "application/json"))
|
headers.add(("Content-Type", "application/json"))
|
||||||
|
|
||||||
let
|
|
||||||
id = client.getNextId()
|
|
||||||
reqBody = requestTxEncode(name, params, id)
|
|
||||||
|
|
||||||
var req: HttpClientRequestRef
|
var req: HttpClientRequestRef
|
||||||
var res: HttpClientResponseRef
|
var res: HttpClientResponseRef
|
||||||
|
|
||||||
template used(x: typed) =
|
|
||||||
# silence unused warning
|
|
||||||
discard
|
|
||||||
|
|
||||||
template closeRefs() =
|
|
||||||
# We can't trust try/finally in async/await in all nim versions, so we
|
|
||||||
# do it manually instead
|
|
||||||
if req != nil:
|
|
||||||
try:
|
|
||||||
await req.closeWait()
|
|
||||||
except CatchableError as exc: # shouldn't happen
|
|
||||||
used(exc)
|
|
||||||
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
|
|
||||||
if res != nil:
|
|
||||||
try:
|
|
||||||
await res.closeWait()
|
|
||||||
except CatchableError as exc: # shouldn't happen
|
|
||||||
used(exc)
|
|
||||||
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
|
|
||||||
|
|
||||||
debug "Sending message to RPC server",
|
debug "Sending message to RPC server",
|
||||||
address = client.httpAddress, msg_len = len(reqBody), name
|
address = client.httpAddress, msg_len = len(reqBody), name
|
||||||
trace "Message", msg = reqBody
|
trace "Message", msg = reqBody
|
||||||
@ -113,17 +102,17 @@ method call*(client: RpcHttpClient, name: string,
|
|||||||
await req.send()
|
await req.send()
|
||||||
except CancelledError as e:
|
except CancelledError as e:
|
||||||
debug "Cancelled POST Request with JSON-RPC", e = e.msg
|
debug "Cancelled POST Request with JSON-RPC", e = e.msg
|
||||||
closeRefs()
|
closeRefs(req, res)
|
||||||
raise e
|
raise e
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
debug "Failed to send POST Request with JSON-RPC", e = e.msg
|
debug "Failed to send POST Request with JSON-RPC", e = e.msg
|
||||||
closeRefs()
|
closeRefs(req, res)
|
||||||
raise (ref RpcPostError)(msg: "Failed to send POST Request with JSON-RPC: " & e.msg, parent: e)
|
raise (ref RpcPostError)(msg: "Failed to send POST Request with JSON-RPC: " & e.msg, parent: e)
|
||||||
|
|
||||||
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
|
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
|
||||||
debug "Unsuccessful POST Request with JSON-RPC",
|
debug "Unsuccessful POST Request with JSON-RPC",
|
||||||
status = res.status, reason = res.reason
|
status = res.status, reason = res.reason
|
||||||
closeRefs()
|
closeRefs(req, res)
|
||||||
raise (ref ErrorResponse)(status: res.status, msg: res.reason)
|
raise (ref ErrorResponse)(status: res.status, msg: res.reason)
|
||||||
|
|
||||||
let resBytes =
|
let resBytes =
|
||||||
@ -131,15 +120,34 @@ method call*(client: RpcHttpClient, name: string,
|
|||||||
await res.getBodyBytes(client.maxBodySize)
|
await res.getBodyBytes(client.maxBodySize)
|
||||||
except CancelledError as e:
|
except CancelledError as e:
|
||||||
debug "Cancelled POST Response for JSON-RPC", e = e.msg
|
debug "Cancelled POST Response for JSON-RPC", e = e.msg
|
||||||
closeRefs()
|
closeRefs(req, res)
|
||||||
raise e
|
raise e
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
debug "Failed to read POST Response for JSON-RPC", e = e.msg
|
debug "Failed to read POST Response for JSON-RPC", e = e.msg
|
||||||
closeRefs()
|
closeRefs(req, res)
|
||||||
raise (ref FailedHttpResponse)(msg: "Failed to read POST Response for JSON-RPC: " & e.msg, parent: e)
|
raise (ref FailedHttpResponse)(msg: "Failed to read POST Response for JSON-RPC: " & e.msg, parent: e)
|
||||||
|
|
||||||
let resText = string.fromBytes(resBytes)
|
result = string.fromBytes(resBytes)
|
||||||
trace "Response", text = resText
|
trace "Response", text = result
|
||||||
|
closeRefs(req, res)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
# Public functions
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
proc newRpcHttpClient*(
|
||||||
|
maxBodySize = MaxHttpRequestSize, secure = false,
|
||||||
|
getHeaders: GetJsonRpcRequestHeaders = nil,
|
||||||
|
flags: HttpClientFlags = {}): RpcHttpClient =
|
||||||
|
RpcHttpClient.new(maxBodySize, secure, getHeaders, flags)
|
||||||
|
|
||||||
|
method call*(client: RpcHttpClient, name: string,
|
||||||
|
params: RequestParamsTx): Future[JsonString]
|
||||||
|
{.async, gcsafe.} =
|
||||||
|
let
|
||||||
|
id = client.getNextId()
|
||||||
|
reqBody = requestTxEncode(name, params, id)
|
||||||
|
resText = await client.callImpl(reqBody)
|
||||||
|
|
||||||
# completed by processMessage - the flow is quite weird here to accomodate
|
# completed by processMessage - the flow is quite weird here to accomodate
|
||||||
# socket and ws clients, but could use a more thorough refactoring
|
# socket and ws clients, but could use a more thorough refactoring
|
||||||
@ -155,13 +163,10 @@ method call*(client: RpcHttpClient, name: string,
|
|||||||
let exc = newException(JsonRpcError, msgRes.error)
|
let exc = newException(JsonRpcError, msgRes.error)
|
||||||
newFut.fail(exc)
|
newFut.fail(exc)
|
||||||
client.awaiting.del(id)
|
client.awaiting.del(id)
|
||||||
closeRefs()
|
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
client.awaiting.del(id)
|
client.awaiting.del(id)
|
||||||
|
|
||||||
closeRefs()
|
|
||||||
|
|
||||||
# processMessage should have completed this future - if it didn't, `read` will
|
# processMessage should have completed this future - if it didn't, `read` will
|
||||||
# raise, which is reasonable
|
# raise, which is reasonable
|
||||||
if newFut.finished:
|
if newFut.finished:
|
||||||
@ -171,6 +176,34 @@ method call*(client: RpcHttpClient, name: string,
|
|||||||
debug "Invalid POST Response for JSON-RPC"
|
debug "Invalid POST Response for JSON-RPC"
|
||||||
raise newException(InvalidResponse, "Invalid response")
|
raise newException(InvalidResponse, "Invalid response")
|
||||||
|
|
||||||
|
method callBatch*(client: RpcHttpClient,
|
||||||
|
calls: RequestBatchTx): Future[ResponseBatchRx]
|
||||||
|
{.gcsafe, async.} =
|
||||||
|
let
|
||||||
|
reqBody = requestBatchEncode(calls)
|
||||||
|
resText = await client.callImpl(reqBody)
|
||||||
|
|
||||||
|
if client.batchFut.isNil or client.batchFut.finished():
|
||||||
|
client.batchFut = newFuture[ResponseBatchRx]()
|
||||||
|
|
||||||
|
# Might error for all kinds of reasons
|
||||||
|
let msgRes = client.processMessage(resText)
|
||||||
|
if msgRes.isErr:
|
||||||
|
# Need to clean up in case the answer was invalid
|
||||||
|
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
|
||||||
|
let exc = newException(JsonRpcError, msgRes.error)
|
||||||
|
client.batchFut.fail(exc)
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
# processMessage should have completed this future - if it didn't, `read` will
|
||||||
|
# raise, which is reasonable
|
||||||
|
if client.batchFut.finished:
|
||||||
|
return client.batchFut.read()
|
||||||
|
else:
|
||||||
|
# TODO: Provide more clarity regarding the failure here
|
||||||
|
debug "Invalid POST Response for JSON-RPC"
|
||||||
|
raise newException(InvalidResponse, "Invalid response")
|
||||||
|
|
||||||
proc connect*(client: RpcHttpClient, url: string) {.async.} =
|
proc connect*(client: RpcHttpClient, url: string) {.async.} =
|
||||||
client.httpAddress = client.httpSession.getAddress(url)
|
client.httpAddress = client.httpSession.getAddress(url)
|
||||||
if client.httpAddress.isErr:
|
if client.httpAddress.isErr:
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
@ -35,26 +35,45 @@ proc newRpcSocketClient*: RpcSocketClient =
|
|||||||
## Creates a new client instance.
|
## Creates a new client instance.
|
||||||
RpcSocketClient.new()
|
RpcSocketClient.new()
|
||||||
|
|
||||||
method call*(self: RpcSocketClient, name: string,
|
method call*(client: RpcSocketClient, name: string,
|
||||||
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
|
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
|
||||||
## Remotely calls the specified RPC method.
|
## Remotely calls the specified RPC method.
|
||||||
let id = self.getNextId()
|
let id = client.getNextId()
|
||||||
var value = requestTxEncode(name, params, id) & "\r\n"
|
var jsonBytes = requestTxEncode(name, params, id) & "\r\n"
|
||||||
if self.transport.isNil:
|
if client.transport.isNil:
|
||||||
raise newException(JsonRpcError,
|
raise newException(JsonRpcError,
|
||||||
"Transport is not initialised (missing a call to connect?)")
|
"Transport is not initialised (missing a call to connect?)")
|
||||||
|
|
||||||
# completed by processMessage.
|
# completed by processMessage.
|
||||||
var newFut = newFuture[JsonString]()
|
var newFut = newFuture[JsonString]()
|
||||||
# add to awaiting responses
|
# add to awaiting responses
|
||||||
self.awaiting[id] = newFut
|
client.awaiting[id] = newFut
|
||||||
|
|
||||||
let res = await self.transport.write(value)
|
let res = await client.transport.write(jsonBytes)
|
||||||
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
|
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
|
||||||
doAssert(res == len(value))
|
doAssert(res == jsonBytes.len)
|
||||||
|
|
||||||
return await newFut
|
return await newFut
|
||||||
|
|
||||||
|
method callBatch*(client: RpcSocketClient,
|
||||||
|
calls: RequestBatchTx): Future[ResponseBatchRx]
|
||||||
|
{.gcsafe, async.} =
|
||||||
|
if client.transport.isNil:
|
||||||
|
raise newException(JsonRpcError,
|
||||||
|
"Transport is not initialised (missing a call to connect?)")
|
||||||
|
|
||||||
|
if client.batchFut.isNil or client.batchFut.finished():
|
||||||
|
client.batchFut = newFuture[ResponseBatchRx]()
|
||||||
|
|
||||||
|
let
|
||||||
|
jsonBytes = requestBatchEncode(calls) & "\r\n"
|
||||||
|
res = await client.transport.write(jsonBytes)
|
||||||
|
|
||||||
|
# TODO: Add actions when not full packet was send, e.g. disconnect peer.
|
||||||
|
doAssert(res == jsonBytes.len)
|
||||||
|
|
||||||
|
return await client.batchFut
|
||||||
|
|
||||||
proc processData(client: RpcSocketClient) {.async.} =
|
proc processData(client: RpcSocketClient) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
while true:
|
while true:
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
@ -38,23 +38,39 @@ proc newRpcWebSocketClient*(
|
|||||||
## Creates a new client instance.
|
## Creates a new client instance.
|
||||||
RpcWebSocketClient.new(getHeaders)
|
RpcWebSocketClient.new(getHeaders)
|
||||||
|
|
||||||
method call*(self: RpcWebSocketClient, name: string,
|
method call*(client: RpcWebSocketClient, name: string,
|
||||||
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
|
params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
|
||||||
## Remotely calls the specified RPC method.
|
## Remotely calls the specified RPC method.
|
||||||
let id = self.getNextId()
|
if client.transport.isNil:
|
||||||
var value = requestTxEncode(name, params, id) & "\r\n"
|
|
||||||
if self.transport.isNil:
|
|
||||||
raise newException(JsonRpcError,
|
raise newException(JsonRpcError,
|
||||||
"Transport is not initialised (missing a call to connect?)")
|
"Transport is not initialised (missing a call to connect?)")
|
||||||
|
|
||||||
|
let id = client.getNextId()
|
||||||
|
var value = requestTxEncode(name, params, id) & "\r\n"
|
||||||
|
|
||||||
# completed by processMessage.
|
# completed by processMessage.
|
||||||
var newFut = newFuture[JsonString]()
|
var newFut = newFuture[JsonString]()
|
||||||
# add to awaiting responses
|
# add to awaiting responses
|
||||||
self.awaiting[id] = newFut
|
client.awaiting[id] = newFut
|
||||||
|
|
||||||
await self.transport.send(value)
|
await client.transport.send(value)
|
||||||
return await newFut
|
return await newFut
|
||||||
|
|
||||||
|
method callBatch*(client: RpcWebSocketClient,
|
||||||
|
calls: RequestBatchTx): Future[ResponseBatchRx]
|
||||||
|
{.gcsafe, async.} =
|
||||||
|
if client.transport.isNil:
|
||||||
|
raise newException(JsonRpcError,
|
||||||
|
"Transport is not initialised (missing a call to connect?)")
|
||||||
|
|
||||||
|
if client.batchFut.isNil or client.batchFut.finished():
|
||||||
|
client.batchFut = newFuture[ResponseBatchRx]()
|
||||||
|
|
||||||
|
let jsonBytes = requestBatchEncode(calls) & "\r\n"
|
||||||
|
await client.transport.send(jsonBytes)
|
||||||
|
|
||||||
|
return await client.batchFut
|
||||||
|
|
||||||
proc processData(client: RpcWebSocketClient) {.async.} =
|
proc processData(client: RpcWebSocketClient) {.async.} =
|
||||||
var error: ref CatchableError
|
var error: ref CatchableError
|
||||||
|
|
||||||
|
|||||||
@ -32,6 +32,17 @@ proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
|
|||||||
# export this proc
|
# export this proc
|
||||||
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))
|
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))
|
||||||
|
|
||||||
|
proc createBatchCallProc(procName, parameters, callBody: NimNode): NimNode =
|
||||||
|
# parameters come as a tree
|
||||||
|
var paramList = newSeq[NimNode]()
|
||||||
|
for p in parameters: paramList.add(p)
|
||||||
|
|
||||||
|
# build proc
|
||||||
|
result = newProc(procName, paramList, callBody)
|
||||||
|
|
||||||
|
# export this proc
|
||||||
|
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))
|
||||||
|
|
||||||
proc setupConversion(reqParams, params: NimNode): NimNode =
|
proc setupConversion(reqParams, params: NimNode): NimNode =
|
||||||
# populate json params
|
# populate json params
|
||||||
# even rpcs with no parameters have an empty json array node sent
|
# even rpcs with no parameters have an empty json array node sent
|
||||||
@ -47,7 +58,7 @@ proc setupConversion(reqParams, params: NimNode): NimNode =
|
|||||||
|
|
||||||
proc createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimNode =
|
proc createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimNode =
|
||||||
## This procedure will generate something like this:
|
## This procedure will generate something like this:
|
||||||
## - Currently it always send posisitional parameters to the server
|
## - Currently it always send positional parameters to the server
|
||||||
##
|
##
|
||||||
## proc rpcApi(client: RpcClient; paramA: TypeA; paramB: TypeB): Future[RetType] =
|
## proc rpcApi(client: RpcClient; paramA: TypeA; paramB: TypeB): Future[RetType] =
|
||||||
## {.gcsafe.}:
|
## {.gcsafe.}:
|
||||||
@ -66,11 +77,11 @@ proc createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimN
|
|||||||
procName = if alias.isNil: rpcDecl.name else: alias
|
procName = if alias.isNil: rpcDecl.name else: alias
|
||||||
pathStr = $rpcDecl.name
|
pathStr = $rpcDecl.name
|
||||||
returnType = params[0]
|
returnType = params[0]
|
||||||
reqParams = genSym(nskVar, "reqParams")
|
reqParams = ident "reqParams"
|
||||||
setup = setupConversion(reqParams, params)
|
setup = setupConversion(reqParams, params)
|
||||||
clientIdent = ident"client"
|
clientIdent = ident"client"
|
||||||
# temporary variable to hold `Response` from rpc call
|
# temporary variable to hold `Response` from rpc call
|
||||||
rpcResult = genSym(nskLet, "res")
|
rpcResult = ident "res"
|
||||||
# proc return variable
|
# proc return variable
|
||||||
procRes = ident"result"
|
procRes = ident"result"
|
||||||
doDecode = quote do:
|
doDecode = quote do:
|
||||||
@ -79,6 +90,9 @@ proc createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimN
|
|||||||
if returnType.noWrap: quote do:
|
if returnType.noWrap: quote do:
|
||||||
`procRes` = `rpcResult`
|
`procRes` = `rpcResult`
|
||||||
else: doDecode
|
else: doDecode
|
||||||
|
|
||||||
|
batchParams = params.copy
|
||||||
|
batchIdent = ident "batch"
|
||||||
|
|
||||||
# insert rpc client as first parameter
|
# insert rpc client as first parameter
|
||||||
params.insert(1, nnkIdentDefs.newTree(
|
params.insert(1, nnkIdentDefs.newTree(
|
||||||
@ -99,8 +113,29 @@ proc createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimN
|
|||||||
let `rpcResult` = await `clientIdent`.call(`pathStr`, `reqParams`)
|
let `rpcResult` = await `clientIdent`.call(`pathStr`, `reqParams`)
|
||||||
`maybeWrap`
|
`maybeWrap`
|
||||||
|
|
||||||
|
|
||||||
|
# insert RpcBatchCallRef as first parameter
|
||||||
|
batchParams.insert(1, nnkIdentDefs.newTree(
|
||||||
|
batchIdent,
|
||||||
|
ident "RpcBatchCallRef",
|
||||||
|
newEmptyNode()
|
||||||
|
))
|
||||||
|
|
||||||
|
# remove return type
|
||||||
|
batchParams[0] = newEmptyNode()
|
||||||
|
|
||||||
|
let batchCallBody = quote do:
|
||||||
|
`setup`
|
||||||
|
`batchIdent`.batch.add RpcBatchItem(
|
||||||
|
meth: `pathStr`,
|
||||||
|
params: `reqParams`
|
||||||
|
)
|
||||||
|
|
||||||
# create rpc proc
|
# create rpc proc
|
||||||
result = createRpcProc(procName, params, callBody)
|
result = newStmtList()
|
||||||
|
result.add createRpcProc(procName, params, callBody)
|
||||||
|
result.add createBatchCallProc(procName, batchParams, batchCallBody)
|
||||||
|
|
||||||
when defined(nimDumpRpcs):
|
when defined(nimDumpRpcs):
|
||||||
echo pathStr, ":\n", result.repr
|
echo pathStr, ":\n", result.repr
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
@ -155,21 +155,32 @@ proc route*(router: RpcRouter, data: string):
|
|||||||
|
|
||||||
let request =
|
let request =
|
||||||
try:
|
try:
|
||||||
JrpcSys.decode(data, RequestRx)
|
JrpcSys.decode(data, RequestBatchRx)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
return wrapError(JSON_PARSE_ERROR, err.msg)
|
return wrapError(JSON_PARSE_ERROR, err.msg)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# TODO https://github.com/status-im/nimbus-eth2/issues/2430
|
# TODO https://github.com/status-im/nimbus-eth2/issues/2430
|
||||||
return wrapError(JSON_PARSE_ERROR, err.msg)
|
return wrapError(JSON_PARSE_ERROR, err.msg)
|
||||||
|
|
||||||
let reply =
|
let reply = try:
|
||||||
try:
|
if request.kind == rbkSingle:
|
||||||
let response = await router.route(request)
|
let response = await router.route(request.single)
|
||||||
JrpcSys.encode(response)
|
JrpcSys.encode(response)
|
||||||
|
elif request.many.len == 0:
|
||||||
|
wrapError(INVALID_REQUEST, "no request object in request array")
|
||||||
|
else:
|
||||||
|
var resFut: seq[Future[ResponseTx]]
|
||||||
|
for req in request.many:
|
||||||
|
resFut.add router.route(req)
|
||||||
|
await noCancel(allFutures(resFut))
|
||||||
|
var response = ResponseBatchTx(kind: rbkMany)
|
||||||
|
for fut in resFut:
|
||||||
|
response.many.add fut.read()
|
||||||
|
JrpcSys.encode(response)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
return wrapError(JSON_ENCODE_ERROR, err.msg)
|
wrapError(JSON_ENCODE_ERROR, err.msg)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
return wrapError(JSON_ENCODE_ERROR, err.msg)
|
wrapError(JSON_ENCODE_ERROR, err.msg)
|
||||||
|
|
||||||
when defined(nimHasWarnBareExcept):
|
when defined(nimHasWarnBareExcept):
|
||||||
{.pop warning[BareExcept]:on.}
|
{.pop warning[BareExcept]:on.}
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
# json-rpc
|
# json-rpc
|
||||||
# Copyright (c) 2019-2023 Status Research & Development GmbH
|
# Copyright (c) 2019-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
@ -20,4 +20,5 @@ import
|
|||||||
test_jrpc_sys,
|
test_jrpc_sys,
|
||||||
test_router_rpc,
|
test_router_rpc,
|
||||||
test_callsigs,
|
test_callsigs,
|
||||||
test_client_hook
|
test_client_hook,
|
||||||
|
test_batch_call
|
||||||
|
|||||||
145
tests/test_batch_call.nim
Normal file
145
tests/test_batch_call.nim
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
# json-rpc
|
||||||
|
# Copyright (c) 2024 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
import
|
||||||
|
unittest2,
|
||||||
|
../json_rpc/rpcclient,
|
||||||
|
../json_rpc/rpcserver
|
||||||
|
|
||||||
|
createRpcSigsFromNim(RpcClient):
|
||||||
|
proc get_banana(id: int): bool
|
||||||
|
proc get_apple(id: string): string
|
||||||
|
proc get_except(): string
|
||||||
|
|
||||||
|
proc setupServer(server: RpcServer) =
|
||||||
|
server.rpc("get_banana") do(id: int) -> bool:
|
||||||
|
return id == 13
|
||||||
|
|
||||||
|
server.rpc("get_apple") do(id: string) -> string:
|
||||||
|
return "apple: " & id
|
||||||
|
|
||||||
|
server.rpc("get_except") do() -> string:
|
||||||
|
raise newException(ValueError, "get_except error")
|
||||||
|
|
||||||
|
suite "Socket batch call":
|
||||||
|
var srv = newRpcSocketServer(["127.0.0.1:0"])
|
||||||
|
var client = newRpcSocketClient()
|
||||||
|
|
||||||
|
srv.setupServer()
|
||||||
|
srv.start()
|
||||||
|
waitFor client.connect(srv.localAddress()[0])
|
||||||
|
|
||||||
|
test "batch call basic":
|
||||||
|
let batch = client.prepareBatch()
|
||||||
|
|
||||||
|
batch.get_banana(11)
|
||||||
|
batch.get_apple("green")
|
||||||
|
batch.get_except()
|
||||||
|
|
||||||
|
let res = waitFor batch.send()
|
||||||
|
check res.isOk
|
||||||
|
if res.isErr:
|
||||||
|
debugEcho res.error
|
||||||
|
break
|
||||||
|
|
||||||
|
let r = res.get
|
||||||
|
check r[0].error.isNone
|
||||||
|
check r[0].result.string == "false"
|
||||||
|
|
||||||
|
check r[1].error.isNone
|
||||||
|
check r[1].result.string == "\"apple: green\""
|
||||||
|
|
||||||
|
check r[2].error.isSome
|
||||||
|
check r[2].error.get == """{"code":-32000,"message":"get_except raised an exception","data":"get_except error"}"""
|
||||||
|
check r[2].result.string.len == 0
|
||||||
|
|
||||||
|
test "rpc call after batch call":
|
||||||
|
let res = waitFor client.get_banana(13)
|
||||||
|
check res == true
|
||||||
|
|
||||||
|
srv.stop()
|
||||||
|
waitFor srv.closeWait()
|
||||||
|
|
||||||
|
suite "HTTP batch call":
|
||||||
|
var srv = newRpcHttpServer(["127.0.0.1:0"])
|
||||||
|
var client = newRpcHttpClient()
|
||||||
|
|
||||||
|
srv.setupServer()
|
||||||
|
srv.start()
|
||||||
|
waitFor client.connect("http://" & $srv.localAddress()[0])
|
||||||
|
|
||||||
|
test "batch call basic":
|
||||||
|
let batch = client.prepareBatch()
|
||||||
|
|
||||||
|
batch.get_banana(11)
|
||||||
|
batch.get_apple("green")
|
||||||
|
batch.get_except()
|
||||||
|
|
||||||
|
let res = waitFor batch.send()
|
||||||
|
check res.isOk
|
||||||
|
if res.isErr:
|
||||||
|
debugEcho res.error
|
||||||
|
break
|
||||||
|
|
||||||
|
let r = res.get
|
||||||
|
check r[0].error.isNone
|
||||||
|
check r[0].result.string == "false"
|
||||||
|
|
||||||
|
check r[1].error.isNone
|
||||||
|
check r[1].result.string == "\"apple: green\""
|
||||||
|
|
||||||
|
check r[2].error.isSome
|
||||||
|
check r[2].error.get == """{"code":-32000,"message":"get_except raised an exception","data":"get_except error"}"""
|
||||||
|
check r[2].result.string.len == 0
|
||||||
|
|
||||||
|
test "rpc call after batch call":
|
||||||
|
let res = waitFor client.get_banana(13)
|
||||||
|
check res == true
|
||||||
|
|
||||||
|
waitFor srv.stop()
|
||||||
|
waitFor srv.closeWait()
|
||||||
|
|
||||||
|
suite "Websocket batch call":
|
||||||
|
var srv = newRpcWebSocketServer("127.0.0.1", Port(0))
|
||||||
|
var client = newRpcWebSocketClient()
|
||||||
|
|
||||||
|
srv.setupServer()
|
||||||
|
srv.start()
|
||||||
|
waitFor client.connect("ws://" & $srv.localAddress())
|
||||||
|
|
||||||
|
test "batch call basic":
|
||||||
|
let batch = client.prepareBatch()
|
||||||
|
|
||||||
|
batch.get_banana(11)
|
||||||
|
batch.get_apple("green")
|
||||||
|
batch.get_except()
|
||||||
|
|
||||||
|
let res = waitFor batch.send()
|
||||||
|
check res.isOk
|
||||||
|
if res.isErr:
|
||||||
|
debugEcho res.error
|
||||||
|
break
|
||||||
|
|
||||||
|
let r = res.get
|
||||||
|
check r[0].error.isNone
|
||||||
|
check r[0].result.string == "false"
|
||||||
|
|
||||||
|
check r[1].error.isNone
|
||||||
|
check r[1].result.string == "\"apple: green\""
|
||||||
|
|
||||||
|
check r[2].error.isSome
|
||||||
|
check r[2].error.get == """{"code":-32000,"message":"get_except raised an exception","data":"get_except error"}"""
|
||||||
|
check r[2].result.string.len == 0
|
||||||
|
|
||||||
|
test "rpc call after batch call":
|
||||||
|
let res = waitFor client.get_banana(13)
|
||||||
|
check res == true
|
||||||
|
|
||||||
|
srv.stop()
|
||||||
|
waitFor srv.closeWait()
|
||||||
Loading…
x
Reference in New Issue
Block a user