From 9db5407e814cdd3d7d523a89d1709386544527cb Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sat, 29 May 2021 19:15:03 +0300 Subject: [PATCH] Switch to Chronos HTTP client (adds support for HTTPS) --- json_rpc/clients/httpclient.nim | 215 ++++++-------------------------- 1 file changed, 40 insertions(+), 175 deletions(-) diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index 6419c68..b305dfb 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -1,10 +1,12 @@ import std/[strutils, tables, uri], - stew/byteutils, + stew/[byteutils, results], + chronos/apps/http/httpclient as chronosHttpClient, chronicles, httputils, json_serialization/std/net, - ../client + ".."/[client, errors] -export client +export + client logScope: topics = "JSONRPC-HTTP-CLIENT" @@ -14,188 +16,45 @@ type httpMethod: HttpMethod RpcHttpClient* = ref object of RpcClient - loop: Future[void] - addresses: seq[TransportAddress] - options: HttpClientOptions + httpSession: HttpSessionRef + httpAddress: HttpResult[HttpAddress] maxBodySize: int const - MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets MaxHttpRequestSize = 128 * 1024 * 1024 # maximum size of HTTP body in octets - HttpHeadersTimeout = 120.seconds # timeout for receiving headers (120 sec) - HttpBodyTimeout = 12.seconds # timeout for receiving body (12 sec) - HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] - -proc sendRequest(transp: StreamTransport, - data: string, httpMethod: HttpMethod): Future[bool] {.async.} = - var request = $httpMethod & " / " - request.add($HttpVersion10) - request.add("\r\n") - request.add("Date: " & httpDate() & "\r\n") - request.add("Host: " & $transp.remoteAddress & "\r\n") - request.add("Content-Type: application/json\r\n") - request.add("Content-Length: " & $len(data) & "\r\n") - request.add("\r\n") - if len(data) > 0: - request.add(data) - try: - let res = await transp.write(request.toBytes()) - return res == len(request): - except CancelledError as exc: raise exc - except CatchableError: - return false - -proc validateResponse*(transp: StreamTransport, - header: HttpResponseHeader): bool = - if header.code != 200: - debug "Server returns error", - httpcode = header.code, - httpreason = header.reason(), - address = transp.remoteAddress() - return false - - var ctype = header["Content-Type"] - # might be "application/json; charset=utf-8" - if "application/json" notin ctype.toLowerAscii(): - # Content-Type header is not "application/json" - debug "Content type must be application/json", - address = transp.remoteAddress() - return false - - let length = header.contentLength() - if length <= 0: - if header.version == HttpVersion11: - if header["Connection"].toLowerAscii() != "close": - # Response body length could not be calculated. - if header["Transfer-Encoding"].toLowerAscii() == "chunked": - debug "Chunked encoding is not supported", - address = transp.remoteAddress() - else: - debug "Content body size could not be calculated", - address = transp.remoteAddress() - return false - - return true - -proc recvData(transp: StreamTransport, maxBodySize: int): Future[string] {.async.} = - var buffer = newSeq[byte](MaxHttpHeadersSize) - var header: HttpResponseHeader - try: - let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize, - HeadersMark) - let ores = await withTimeout(hlenfut, HttpHeadersTimeout) - if not ores: - # Timeout - debug "Timeout expired while receiving headers", - address = transp.remoteAddress() - return "" - - let hlen = hlenfut.read() - buffer.setLen(hlen) - header = buffer.parseResponse() - if header.failed(): - # Header could not be parsed - debug "Malformed header received", - address = transp.remoteAddress() - return "" - except TransportLimitError: - # size of headers exceeds `MaxHttpHeadersSize` - debug "Maximum size of headers limit reached", - address = transp.remoteAddress() - return "" - except TransportIncompleteError: - # remote peer disconnected - debug "Remote peer disconnected", address = transp.remoteAddress() - return "" - except TransportOsError as exc: - debug "Problems with networking", address = transp.remoteAddress(), - error = exc.msg - return "" - - if not transp.validateResponse(header): - return "" - - let length = header.contentLength() - if length > maxBodySize: - debug "Request body too large", length, maxBodySize - return "" - - try: - if length > 0: - # `Content-Length` is present in response header. - buffer.setLen(length) - let blenfut = transp.readExactly(addr buffer[0], length) - let ores = await withTimeout(blenfut, HttpBodyTimeout) - if not ores: - # Timeout - debug "Timeout expired while receiving request body", - address = transp.remoteAddress() - return "" - - blenfut.read() # exceptions - else: - # `Content-Length` is not present in response header, so we are reading - # everything until connection will be closed. - var blenfut = transp.read(maxBodySize) - let ores = await withTimeout(blenfut, HttpBodyTimeout) - if not ores: - # Timeout - debug "Timeout expired while receiving request body", - address = transp.remoteAddress() - return "" - - buffer = blenfut.read() - except TransportIncompleteError: - # remote peer disconnected - debug "Remote peer disconnected", address = transp.remoteAddress() - return "" - except TransportOsError as exc: - debug "Problems with networking", address = transp.remoteAddress(), - error = exc.msg - return "" - - return string.fromBytes(buffer) proc new(T: type RpcHttpClient, maxBodySize = MaxHttpRequestSize): T = T( maxBodySize: maxBodySize, - options: HttpClientOptions(httpMethod: MethodPost), + httpSession: HttpSessionRef.new(), ) proc newRpcHttpClient*(maxBodySize = MaxHttpRequestSize): RpcHttpClient = RpcHttpClient.new(maxBodySize) -proc httpMethod*(client: RpcHttpClient): HttpMethod = - client.options.httpMethod - -proc httpMethod*(client: RpcHttpClient, m: HttpMethod) = - client.options.httpMethod = m - method call*(client: RpcHttpClient, name: string, - params: JsonNode): Future[Response] {. - async, gcsafe, raises: [Defect, CatchableError].} = - ## Remotely calls the specified RPC method. - let id = client.getNextId() + params: JsonNode): Future[Response] + {.async, gcsafe, raises: [Defect, CatchableError].} = + doAssert client.httpSession != nil + if client.httpAddress.isErr: + raise newException(RpcAddressUnresolvableError, client.httpAddress.error) let - transp = await connect(client.addresses[0]) + id = client.getNextId() reqBody = $rpcCallNode(name, params, id) - res = await transp.sendRequest(reqBody, client.httpMethod) + req = HttpClientRequestRef.post(client.httpSession, + client.httpAddress.get, + body = reqBody.toOpenArrayByte(0, reqBody.len - 1)) + res = await req.send() - if not res: - debug "Failed to send message to RPC server", - address = transp.remoteAddress(), msg_len = len(reqBody) - await transp.closeWait() - raise newException(ValueError, "Transport error") - - debug "Message sent to RPC server", address = transp.remoteAddress(), - msg_len = len(reqBody) + debug "Message sent to RPC server", + address = client.httpAddress, msg_len = len(reqBody) trace "Message", msg = reqBody + echo "req body ", reqBody - let value = await transp.recvData(client.maxBodySize) - await transp.closeWait() - if value.len == 0: - raise newException(ValueError, "Empty response from server") + let resText = string.fromBytes(await res.getBodyBytes(client.maxBodySize)) + trace "Response", text = resText + echo "response ", resText # completed by processMessage - the flow is quite weird here to accomodate # socket and ws clients, but could use a more thorough refactoring @@ -205,22 +64,28 @@ method call*(client: RpcHttpClient, name: string, try: # Might raise for all kinds of reasons - client.processMessage(value) + client.processMessage(resText) finally: # Need to clean up in case the answer was invalid client.awaiting.del(id) # processMessage should have completed this future - if it didn't, `read` will # raise, which is reasonable - return newFut.read() + if newFut.finished: + return newFut.read() + else: + # TODO: Provide more clarity regarding the failure here + raise newException(InvalidResponse, "Invalid response") + +proc connect*(client: RpcHttpClient, url: string) + {.async, raises: [Defect].} = + client.httpAddress = client.httpSession.getAddress(url) + if client.httpAddress.isErr: + raise newException(RpcAddressUnresolvableError, client.httpAddress.error) proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} = - client.addresses = resolveTAddress(address, port) + let addresses = resolveTAddress(address, port) + if addresses.len == 0: + raise newException(RpcAddressUnresolvableError, "Failed to resolve address: " & address) + ok client.httpAddress, getAddress(addresses[0]) -proc connect*(client: RpcHttpClient, url: string) {.async.} = - # TODO: The url (path, etc) should make it into the request - let pu = parseUri(url) - var port = Port(80) - if pu.port.len != 0: - port = parseInt(pu.port).Port - client.addresses = resolveTAddress(pu.hostname, port)