2019-07-31 09:04:53 +00:00
|
|
|
import json, strutils, tables, uri
|
2019-06-20 12:46:39 +00:00
|
|
|
import chronicles, httputils, chronos, json_serialization/std/net
|
2018-07-14 07:51:54 +00:00
|
|
|
import ../client
|
|
|
|
|
|
|
|
# Module-wide chronicles log scope: the log statements below carry this topic.
logScope:
  topics = "JSONRPC-HTTP-CLIENT"
|
2018-07-14 07:51:54 +00:00
|
|
|
|
|
|
|
type
  HttpClientOptions* = object
    ## Settings that control how `RpcHttpClient` issues its HTTP requests.
    httpMethod: HttpMethod
      ## HTTP method placed on the request line of every call.

  RpcHttpClient* = ref object of RpcClient
    ## JSON-RPC client that sends each call as a separate HTTP request.
    loop: Future[void]
      ## NOTE(review): not referenced by the code in this module - confirm
      ## whether it is still needed.
    addresses: seq[TransportAddress]
      ## Server addresses stored by `connect`; `call` uses the first one.
    options: HttpClientOptions
      ## Request options, see `HttpClientOptions`.
|
2018-07-14 07:51:54 +00:00
|
|
|
|
|
|
|
const
  # Maximum size of HTTP headers in octets.
  MaxHttpHeadersSize = 8192
  # Maximum size of HTTP body in octets.
  # NOTE(review): not referenced by the code in this module - confirm.
  MaxHttpRequestSize = 128 * 1024
  # Timeout for receiving headers (120 sec).
  HttpHeadersTimeout = seconds(120)
  # Timeout for receiving body (12 sec).
  HttpBodyTimeout = seconds(12)
  # CR LF CR LF - the byte sequence that terminates the header section.
  HeadersMark = @[0x0D'u8, 0x0A'u8, 0x0D'u8, 0x0A'u8]
|
|
|
|
|
|
|
|
proc sendRequest(transp: StreamTransport,
                 data: string, httpMethod: HttpMethod): Future[bool] {.async.} =
  ## Builds an HTTP request for target `/` (version `HttpVersion10`) with
  ## `data` as its `application/json` body and writes it to `transp`.
  ##
  ## Returns `true` only when the complete request has been written. A short
  ## write or a recoverable error raised by the write yields `false`.
  var request = $httpMethod & " / "
  request.add($HttpVersion10)
  request.add("\r\n")
  request.add("Date: " & httpDate() & "\r\n")
  request.add("Host: " & $transp.remoteAddress & "\r\n")
  request.add("Content-Type: application/json\r\n")
  request.add("Content-Length: " & $len(data) & "\r\n")
  # Blank line separates the headers from the body.
  request.add("\r\n")
  if len(data) > 0:
    request.add(data)
  try:
    let written = await transp.write(request)
    # Previously the short-write `false` was unconditionally overwritten with
    # `true`; report success only when every byte went out.
    result = (written == len(request))
  except CatchableError:
    # Recoverable failures are reported through the return value; Defects
    # (programming errors) are deliberately not swallowed.
    result = false
|
|
|
|
|
|
|
|
proc validateResponse*(transp: StreamTransport,
                       header: HttpResponseHeader): bool =
  ## Decides whether the response described by `header` can be consumed.
  ##
  ## Returns `false` (after logging the reason at debug level) when:
  ## * the status code is not 200;
  ## * the `Content-Type` header does not contain `application/json`;
  ## * the response is HTTP/1.1 without a positive content length and
  ##   without `Connection: close`, so the end of the body cannot be found.
  ##
  ## Returns `true` otherwise. `transp` is only used to log the peer address.
  if header.code != 200:
    debug "Server returns error",
          httpcode = header.code,
          httpreason = header.reason(),
          address = transp.remoteAddress()
    return false

  # The value might be "application/json; charset=utf-8", hence a substring
  # match instead of an equality test.
  let contentType = header["Content-Type"].toLowerAscii()
  if not contentType.contains("application/json"):
    debug "Content type must be application/json",
          address = transp.remoteAddress()
    return false

  if header.contentLength() <= 0 and
     header.version == HttpVersion11 and
     header["Connection"].toLowerAscii() != "close":
    # Response body length could not be calculated.
    if header["Transfer-Encoding"].toLowerAscii() == "chunked":
      debug "Chunked encoding is not supported",
            address = transp.remoteAddress()
    else:
      debug "Content body size could not be calculated",
            address = transp.remoteAddress()
    return false

  return true
|
|
|
|
|
|
|
|
proc recvData(transp: StreamTransport): Future[string] {.async.} =
  ## Reads one HTTP response from `transp` and returns its body.
  ##
  ## The header section is read up to `HeadersMark` (at most
  ## `MaxHttpHeadersSize` octets, within `HttpHeadersTimeout`), parsed and
  ## checked with `validateResponse`. The body is then read within
  ## `HttpBodyTimeout`: exactly `Content-Length` octets when that value is
  ## positive, otherwise whatever `transp.read()` delivers.
  ##
  ## Every failure (timeout, malformed or rejected header, disconnect,
  ## networking error) is logged at debug level and reported as `""`.
  var
    buf = newSeq[byte](MaxHttpHeadersSize)
    header: HttpResponseHeader
    hasError = false

  # --- headers ---
  try:
    let headFut = transp.readUntil(addr buf[0], MaxHttpHeadersSize,
                                   HeadersMark)
    let inTime = await withTimeout(headFut, HttpHeadersTimeout)
    if inTime:
      let headLen = headFut.read()
      buf.setLen(headLen)
      header = buf.parseResponse()
      if header.failed():
        # Header could not be parsed
        debug "Malformed header received",
              address = transp.remoteAddress()
        hasError = true
    else:
      # Timeout
      debug "Timeout expired while receiving headers",
            address = transp.remoteAddress()
      hasError = true
  except TransportLimitError:
    # size of headers exceeds `MaxHttpHeadersSize`
    debug "Maximum size of headers limit reached",
          address = transp.remoteAddress()
    hasError = true
  except TransportIncompleteError:
    # remote peer disconnected
    debug "Remote peer disconnected", address = transp.remoteAddress()
    hasError = true
  except TransportOsError as exc:
    debug "Problems with networking", address = transp.remoteAddress(),
          error = exc.msg
    hasError = true

  if hasError or not transp.validateResponse(header):
    return ""

  # --- body ---
  let bodyLen = header.contentLength()
  try:
    if bodyLen > 0:
      # `Content-Length` is present in response header.
      buf.setLen(bodyLen)
      let bodyFut = transp.readExactly(addr buf[0], bodyLen)
      let inTime = await withTimeout(bodyFut, HttpBodyTimeout)
      if inTime:
        # Surfaces any exception stored in the completed future.
        bodyFut.read()
      else:
        # Timeout
        debug "Timeout expired while receiving request body",
              address = transp.remoteAddress()
        hasError = true
    else:
      # `Content-Length` is not present in response header, so we are reading
      # everything until connection will be closed.
      var restFut = transp.read()
      let inTime = await withTimeout(restFut, HttpBodyTimeout)
      if inTime:
        buf = restFut.read()
      else:
        # Timeout
        debug "Timeout expired while receiving request body",
              address = transp.remoteAddress()
        hasError = true
  except TransportIncompleteError:
    # remote peer disconnected
    debug "Remote peer disconnected", address = transp.remoteAddress()
    hasError = true
  except TransportOsError as exc:
    debug "Problems with networking", address = transp.remoteAddress(),
          error = exc.msg
    hasError = true

  if hasError:
    return ""
  return cast[string](buf)
|
|
|
|
|
2018-11-12 04:43:51 +00:00
|
|
|
proc init(opts: var HttpClientOptions) =
  ## Puts `opts` into its default state: requests use `MethodPost`.
  opts.httpMethod = MethodPost
|
2018-11-12 04:43:51 +00:00
|
|
|
|
2018-07-14 07:51:54 +00:00
|
|
|
proc newRpcHttpClient*(): RpcHttpClient =
  ## Creates a new HTTP client instance.
  ##
  ## The base `RpcClient` state is set up through `initRpcClient` and the
  ## HTTP options receive their defaults through `init`.
  result = RpcHttpClient()
  initRpcClient(result)
  init(result.options)
|
|
|
|
|
|
|
|
proc httpMethod*(client: RpcHttpClient): HttpMethod =
  ## Returns the HTTP method `client` uses for its requests.
  result = client.options.httpMethod
|
|
|
|
|
|
|
|
proc httpMethod*(client: RpcHttpClient, m: HttpMethod) =
  ## Makes `client` use HTTP method `m` for its subsequent requests.
  client.options.httpMethod = m
|
2018-07-14 07:51:54 +00:00
|
|
|
|
2019-06-12 13:44:19 +00:00
|
|
|
method call*(client: RpcHttpClient, name: string,
             params: JsonNode): Future[Response] {.async, gcsafe.} =
  ## Remotely calls the specified RPC method.
  ##
  ## A new connection is opened to the first address stored by `connect`,
  ## the request is sent with the client's configured HTTP method, the reply
  ## is read, and the connection is closed again. The reply is then handed
  ## to `processMessage`, and the returned future yields the response
  ## registered under this call's id.
  ##
  ## Raises `ValueError` when no address is available, when the request
  ## could not be sent, or when the server's reply is empty.
  let id = client.getNextId()

  # Without this guard an unconnected client fails with an index error
  # instead of a meaningful exception.
  if client.addresses.len == 0:
    raise newException(ValueError, "No server address, call connect() first")

  let transp = await connect(client.addresses[0])
  let reqBody = $rpcCallNode(name, params, id)
  let sent = await transp.sendRequest(reqBody, client.httpMethod)
  if not sent:
    debug "Failed to send message to RPC server",
          address = transp.remoteAddress(), msg_len = len(reqBody)
    await transp.closeWait()
    raise newException(ValueError, "Transport error")
  else:
    debug "Message sent to RPC server", address = transp.remoteAddress(),
          msg_len = len(reqBody)
    trace "Message", msg = reqBody

  let value = await transp.recvData()
  # One connection per call: close it before the reply is processed.
  await transp.closeWait()
  if value.len == 0:
    raise newException(ValueError, "Empty response from server")

  # completed by processMessage.
  let newFut = newFuture[Response]()
  # add to awaiting responses
  client.awaiting[id] = newFut
  client.processMessage(value)
  result = await newFut
|
|
|
|
|
|
|
|
proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} =
  ## Resolves `address`:`port` and stores the resulting addresses on
  ## `client`. No connection is opened here; `call` connects on every request.
  let resolved = resolveTAddress(address, port)
  client.addresses = resolved
|
2019-07-31 09:04:53 +00:00
|
|
|
|
|
|
|
proc connect*(client: RpcHttpClient, url: string) {.async.} =
  ## Resolves the host named in `url` and stores the resulting addresses on
  ## `client`. The port is taken from `url` when present and defaults to 80.
  ## Only the hostname and port of `url` are used.
  # TODO: The url (path, etc) should make it into the request
  let
    parsed = parseUri(url)
    port =
      if parsed.port.len == 0: Port(80)
      else: Port(parseInt(parsed.port))
  client.addresses = resolveTAddress(parsed.hostname, port)
|