Both http server and client now can handle chunked transfer

This commit is contained in:
jangko 2024-01-12 17:05:55 +07:00
parent 97d19b9583
commit 66208055bc
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
2 changed files with 50 additions and 11 deletions

View File

@ -37,6 +37,7 @@ type
RpcHttpServer* = ref object of RpcServer RpcHttpServer* = ref object of RpcServer
httpServers: seq[HttpServerRef] httpServers: seq[HttpServerRef]
authHooks: seq[HttpAuthHook] authHooks: seq[HttpAuthHook]
maxChunkSize: int
proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 = proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 =
return proc (req: RequestFence): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = return proc (req: RequestFence): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
@ -64,15 +65,31 @@ proc processClientRpc(rpcServer: RpcHttpServer): HttpProcessCallback2 =
let let
headers = HttpTable.init([("Content-Type", headers = HttpTable.init([("Content-Type",
"application/json; charset=utf-8")]) "application/json; charset=utf-8")])
chunkSize = rpcServer.maxChunkSize
try: try:
let let
body = await request.getBody() body = await request.getBody()
data = await rpcServer.route(string.fromBytes(body)) data = await rpcServer.route(string.fromBytes(body))
res = await request.respond(Http200, data, headers)
if data.len <= chunkSize:
let res = await request.respond(Http200, data, headers)
trace "JSON-RPC result has been sent" trace "JSON-RPC result has been sent"
return res return res
let response = request.getResponse()
response.status = Http200
response.addHeader("Content-Type", "application/json; charset=utf-8")
await response.prepare()
let maxLen = data.len
var len = data.len
while len > chunkSize:
await response.sendChunk(data[maxLen - len].unsafeAddr, chunkSize)
len -= chunkSize
if len > 0:
await response.sendChunk(data[maxLen - len].unsafeAddr, len)
await response.finish()
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -243,10 +260,10 @@ proc addSecureHttpServer*(server: RpcHttpServer,
server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate) server.addSecureHttpServer(a, tlsPrivateKey, tlsCertificate)
proc new*(T: type RpcHttpServer, authHooks: seq[HttpAuthHook] = @[]): T = proc new*(T: type RpcHttpServer, authHooks: seq[HttpAuthHook] = @[]): T =
T(router: RpcRouter.init(), httpServers: @[], authHooks: authHooks) T(router: RpcRouter.init(), httpServers: @[], authHooks: authHooks, maxChunkSize: 8192)
proc new*(T: type RpcHttpServer, router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): T = proc new*(T: type RpcHttpServer, router: RpcRouter, authHooks: seq[HttpAuthHook] = @[]): T =
T(router: router, httpServers: @[], authHooks: authHooks) T(router: router, httpServers: @[], authHooks: authHooks, maxChunkSize: 8192)
proc newRpcHttpServer*(authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer = proc newRpcHttpServer*(authHooks: seq[HttpAuthHook] = @[]): RpcHttpServer =
RpcHttpServer.new(authHooks) RpcHttpServer.new(authHooks)
@ -295,3 +312,6 @@ proc closeWait*(server: RpcHttpServer) {.async.} =
proc localAddress*(server: RpcHttpServer): seq[TransportAddress] = proc localAddress*(server: RpcHttpServer): seq[TransportAddress] =
for item in server.httpServers: for item in server.httpServers:
result.add item.instance.localAddress() result.add item.instance.localAddress()
proc setMaxChunkSize*(server: RpcHttpServer, maxChunkSize: int) =
server.maxChunkSize = maxChunkSize

View File

@ -7,8 +7,9 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import unittest2 import
import ../json_rpc/[rpcserver, rpcclient] unittest2,
../json_rpc/[rpcserver, rpcclient, jsonmarshal]
const TestsCount = 100 const TestsCount = 100
@ -46,6 +47,15 @@ proc invalidTest(address: string): Future[bool] {.async.} =
if invalidA and invalidB: if invalidA and invalidB:
result = true result = true
const bigChunkSize = 4 * 8192
proc chunkedTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient()
await client.connect("http://" & address)
let r = await client.call("bigchunkMethod", %[])
let data = JrpcConv.decode(r.string, seq[byte])
return data.len == bigChunkSize
var httpsrv = newRpcHttpServer(["127.0.0.1:0"]) var httpsrv = newRpcHttpServer(["127.0.0.1:0"])
# Create RPC on server # Create RPC on server
@ -54,15 +64,24 @@ httpsrv.rpc("myProc") do(input: string, data: array[0..3, int]):
httpsrv.rpc("noParamsProc") do(): httpsrv.rpc("noParamsProc") do():
result = %("Hello world") result = %("Hello world")
httpsrv.rpc("bigchunkMethod") do() -> seq[byte]:
result = newSeq[byte](bigChunkSize)
for i in 0..<result.len:
result[i] = byte(i mod 255)
httpsrv.setMaxChunkSize(8192)
httpsrv.start() httpsrv.start()
let serverAddress = $httpsrv.localAddress()[0]
suite "JSON-RPC test suite": suite "JSON-RPC test suite":
test "Simple RPC call": test "Simple RPC call":
check waitFor(simpleTest($httpsrv.localAddress()[0])) == true check waitFor(simpleTest(serverAddress)) == true
test "Continuous RPC calls (" & $TestsCount & " messages)": test "Continuous RPC calls (" & $TestsCount & " messages)":
check waitFor(continuousTest($httpsrv.localAddress()[0])) == TestsCount check waitFor(continuousTest(serverAddress)) == TestsCount
test "Invalid RPC calls": test "Invalid RPC calls":
check waitFor(invalidTest($httpsrv.localAddress()[0])) == true check waitFor(invalidTest(serverAddress)) == true
test "Http client can handle chunked transfer encoding":
check waitFor(chunkedTest(serverAddress)) == true
waitFor httpsrv.stop() waitFor httpsrv.stop()
waitFor httpsrv.closeWait() waitFor httpsrv.closeWait()