httpclient: add unzip dan chunked tranfer of HTTP 1.1

also fix httpserver not to start sending chunked response if
the content is less than certain limit (4KB)

fixes #85
This commit is contained in:
jangko 2021-09-20 11:52:34 +07:00
parent aabdbe8185
commit 276684bd6a
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
3 changed files with 157 additions and 290 deletions

View File

@ -9,9 +9,9 @@
import
std/[uri],
faststreams/inputs,
stew/results, chronicles,
chronos, httputils, chronos/apps/http/httpcommon,
faststreams/inputs, zlib/gzip,
stew/results, chronicles, stew/byteutils,
chronos, httputils, chronos/apps/http/[httpcommon, httptable, httpclient],
chronos/streams/tlsstream,
./common/[ast, names, response, types, errors],
./builtin/json_respstream, ./common_parser, ./lexer
@ -22,236 +22,168 @@ type
ctJson = "application/json"
VarPair = object
name: string
name : string
value: Node
ParseResult* = Result[void, ErrorDesc]
ClientResp* = object
code*: int # HTTP code
status* : int # HTTP code
reason* : string
response*: string
ClientResult* = Result[ClientResp, string]
GraphqlHttpClientRef* = ref GraphqlHttpClientObj
GraphqlHttpClientObj* = object of RootObj
opName : string
varTable : seq[VarPair]
names : NameCache
meth : HttpMethod
address : TransportAddress
uri : Uri
serverName : string
version : HttpVersion
path : string
uriResolved : bool
secure : bool
contentType : ContentType
bodyTimeout : Duration
headersTimeout: Duration
remoteAddress : TransportAddress
GraphqlSHttpClientRef* = ref object of GraphqlHttpClientRef
tlsFlags*: set[TLSFlags]
minVersion*: TLSVersion
maxVersion*: TLSVersion
GraphqlHttpClientResult* = Result[GraphqlHttpClientRef, string]
ConnectError = object of ValueError
when (NimMajor, NimMinor) <= (1, 2):
type
UriParseError = ValueError
address : HttpAddress
session : HttpSessionRef
const
HttpBodyTimeout = 12.seconds
HttpHeadersTimeout = 120.seconds
GraphQLPath* = "/graphql"
proc createSession(secure: bool,
maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize
): HttpSessionRef =
let flags = if secure:
{HttpClientFlag.NoVerifyHost,
HttpClientFlag.NoVerifyServerName}
else:
{}
HttpSessionRef.new(flags,
maxRedirections,
connectTimeout,
headersTimeout,
connectionBufferSize
)
proc init*(ctx: GraphqlHttpClientRef,
address: TransportAddress,
path: string,
serverName: string,
meth: HttpMethod,
version: HttpVersion,
contentType: ContentType,
bodyTimeout: Duration,
headersTimeout: Duration) =
path: string = GraphQLPath,
secure: bool = false,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion11,
contentType: ContentType = ctJson,
maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize
) =
ctx.names = newNameCache()
ctx.varTable = @[]
ctx.meth = meth
ctx.contentType = contentType
ctx.bodyTimeout = bodyTimeout
ctx.headersTimeout = headersTimeout
ctx.version = version
ctx.uriResolved = true
ctx.path = path
ctx.serverName = serverName
ctx.address = address
ctx.contentType = contentType
ctx.secure = secure
ctx.session = createSession(
secure,
maxRedirections,
connectTimeout,
headersTimeout,
connectionBufferSize
)
ctx.address =
if secure:
getAddress(address, HttpClientScheme.Secure, path)
else:
getAddress(address, HttpClientScheme.NonSecure, path)
proc init*(ctx: GraphqlHttpClientRef,
uri: Uri | string,
meth: HttpMethod,
version: HttpVersion,
contentType: ContentType,
bodyTimeout: Duration,
headersTimeout: Duration) =
secure: bool = false,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion11,
contentType: ContentType = ctJson,
maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize
): HttpResult[void] =
ctx.names = newNameCache()
ctx.varTable = @[]
ctx.meth = meth
ctx.contentType = contentType
ctx.bodyTimeout = bodyTimeout
ctx.headersTimeout = headersTimeout
ctx.version = version
ctx.uriResolved = false
when uri is string:
ctx.uri = parseUri(uri)
else:
ctx.uri = uri
proc init*(ctx: GraphqlSHttpClientRef,
uri: Uri | string,
tlsFlags: set[TLSFlags],
tlsMinVersion: TLSVersion,
tlsMaxVersion: TLSVersion,
meth: HttpMethod,
version: HttpVersion,
contentType: ContentType,
bodyTimeout: Duration,
headersTimeout: Duration) =
ctx.init(uri,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
ctx.tlsFlags = tlsFlags
ctx.minVersion = tlsMinVersion
ctx.maxVersion = tlsMaxVersion
ctx.secure = true
proc init*(ctx: GraphqlSHttpClientRef,
address: TransportAddress,
path: string,
serverName: string,
tlsFlags: set[TLSFlags],
tlsMinVersion: TLSVersion,
tlsMaxVersion: TLSVersion,
meth: HttpMethod,
version: HttpVersion,
contentType: ContentType,
bodyTimeout: Duration,
headersTimeout: Duration) =
ctx.init(address,
path,
serverName,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
ctx.tlsFlags = tlsFlags
ctx.minVersion = tlsMinVersion
ctx.maxVersion = tlsMaxVersion
ctx.secure = true
ctx.contentType = contentType
ctx.secure = secure
ctx.session = createSession(
secure,
maxRedirections,
connectTimeout,
headersTimeout,
connectionBufferSize
)
let res = getAddress(ctx.session, uri)
if res.isErr:
return err(res.error)
ctx.address = res.get()
proc new*(_: type GraphqlHttpClientRef,
address: TransportAddress,
path: string = "/graphql",
serverName: string = "",
path: string = GraphQLPath,
secure: bool = false,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion10,
version: HttpVersion = HttpVersion11,
contentType: ContentType = ctJson,
bodyTimeout: Duration = HttpBodyTimeout,
headersTimeout: Duration = HttpHeadersTimeout): GraphqlHttpClientResult =
maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize
): HttpResult[GraphqlHttpClientRef] =
let client = GraphqlHttpClientRef()
client.init(address,
path,
serverName,
secure,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
maxRedirections,
connectTimeout,
headersTimeout,
connectionBufferSize
)
ok(client)
proc new*(_: type GraphqlHttpClientRef,
uri: Uri | string,
secure: bool = false,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion10,
version: HttpVersion = HttpVersion11,
contentType: ContentType = ctJson,
bodyTimeout: Duration = HttpBodyTimeout,
headersTimeout: Duration = HttpHeadersTimeout): GraphqlHttpClientResult =
maxRedirections = HttpMaxRedirections,
connectTimeout = HttpConnectTimeout,
headersTimeout = HttpHeadersTimeout,
connectionBufferSize = DefaultStreamBufferSize
): HttpResult[GraphqlHttpClientRef] =
let client = GraphqlHttpClientRef()
try:
let res =
client.init(uri,
secure,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
ok(client)
except UriParseError as exc:
err(exc.msg)
proc new*(_: type GraphqlSHttpClientRef,
address: TransportAddress,
path: string = "/graphql",
serverName: string = "",
tlsFlags: set[TLSFlags] = {},
tlsMinVersion = TLSVersion.TLS11,
tlsMaxVersion = TLSVersion.TLS12,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion10,
contentType: ContentType = ctJson,
bodyTimeout: Duration = HttpBodyTimeout,
headersTimeout: Duration = HttpHeadersTimeout): GraphqlHttpClientResult =
let client = GraphqlSHttpClientRef()
client.init(address,
path,
serverName,
tlsFlags,
tlsMinVersion,
tlsMaxVersion,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
maxRedirections,
connectTimeout,
headersTimeout,
connectionBufferSize
)
if res.isErr:
return err(res.error)
ok(client)
proc new*(_: type GraphqlSHttpClientRef,
uri: Uri | string,
tlsFlags: set[TLSFlags] = {},
tlsMinVersion = TLSVersion.TLS11,
tlsMaxVersion = TLSVersion.TLS12,
meth: HttpMethod = MethodPost,
version: HttpVersion = HttpVersion10,
contentType: ContentType = ctJson,
bodyTimeout: Duration = HttpBodyTimeout,
headersTimeout: Duration = HttpHeadersTimeout): GraphqlHttpClientResult =
let client = GraphqlSHttpClientRef()
try:
client.init(uri,
tlsFlags,
tlsMinVersion,
tlsMaxVersion,
meth,
version,
contentType,
bodyTimeout,
headersTimeout)
ok(client)
except UriParseError as exc:
err(exc.msg)
proc addVar*(ctx: GraphqlHttpClientRef, name: string, val: int) =
ctx.varTable.add VarPair(name: name, value: resp(val))
@ -309,64 +241,7 @@ proc clearAll*(ctx: GraphqlHttpClientRef) =
ctx.varTable.setLen(0)
ctx.opName = ""
proc connect(ctx: GraphqlHttpClientRef): Future[AsyncStream] {.async.} =
let transp = await connect(ctx.address)
ctx.remoteAddress = transp.remoteAddress()
let stream = AsyncStream(
reader: newAsyncStreamReader(transp),
writer: newAsyncStreamWriter(transp))
if ctx.secure:
let c = GraphqlSHttpClientRef(ctx)
let tlsStream = newTLSClientAsyncStream(
stream.reader,
stream.writer,
serverName = c.serverName,
minVersion = c.minVersion,
maxVersion = c.maxVersion,
flags = c.tlsFlags)
return AsyncStream(
reader: tlsStream.reader,
writer: tlsStream.writer)
else:
return stream
proc resolveUri(ctx: GraphqlHttpClientRef): Future[AsyncStream] {.async.} =
let host = $ctx.uri
ctx.serverName = host.split(":")[0]
ctx.path = ctx.uri.path
let addrs = resolveTAddress(host)
for a in addrs:
ctx.address = a
try:
let conn = await ctx.connect()
ctx.uriResolved = true
return conn
except TransportError as exc:
raise newException(ConnectError, exc.msg)
return AsyncStream()
proc closeTransp(transp: StreamTransport) {.async.} =
if not transp.closed():
await transp.closeWait()
proc closeStream(stream: AsyncStreamRW) {.async.} =
if not stream.closed():
await stream.closeWait()
proc closeWait(stream: AsyncStream) {.async.} =
await allFutures(
stream.reader.closeStream(),
stream.writer.closeStream(),
stream.reader.tsource.closeTransp())
proc sendRequest*(ctx: GraphqlHttpClientRef, query: string): Future[ClientResult] {.async.} =
# TODO: implement GET method
proc sendRequest*(ctx: GraphqlHttpClientRef, query: string): Future[HttpResult[ClientResp]] {.async.} =
# TODO: implement content-type: graphql
var r = JsonRespStream.new()
@ -381,58 +256,42 @@ proc sendRequest*(ctx: GraphqlHttpClientRef, query: string): Future[ClientResult
r.field(n.name)
r.serialize(n.value)
let body = r.getString()
var request = $ctx.meth & " " & ctx.path & " HTTP/1.0\r\n"
request.add("Host: " & ctx.serverName & "\r\n")
request.add("Content-Length: " & $len(body) & "\r\n")
request.add("Content-Type: " & $ctx.contentType & "\r\n")
request.add("\r\n")
request.add(body)
let body = r.getBytes()
let headers = [
("Content-Type", "application/json")#,
#("Accept-Encoding", "gzip"),
]
try:
let stream = if not ctx.uriResolved:
let stream = await ctx.resolveUri()
if not ctx.uriResolved:
return err("Unable to connect to host on any address!")
stream
else:
await ctx.connect()
let request =
HttpClientRequestRef.new(
ctx.session, ctx.address,
ctx.meth, ctx.version, {},
headers, body
)
await stream.writer.write(request)
let resp = await request.send()
let dataBuf = await resp.getBodyBytes()
var headersBuf = newSeq[byte](4096)
let rlenFut = stream.reader.readUntil(addr headersBuf[0], len(headersBuf),
HeadersMark)
let hres = await withTimeout(rlenFut, ctx.headersTimeout)
if not hres:
debug "Timeout expired while receiving headers",
address = ctx.remoteAddress
return err("headers timeout")
let rLen = rlenFut.read()
headersBuf.setLen(rLen)
let resp = parseResponse(headersBuf, true)
if not resp.success():
return err("parse header error")
let length = resp.contentLength()
let cresp =
if length > 0:
var dataBuf = newString(length)
let dataFut = stream.reader.readExactly(addr dataBuf[0], len(dataBuf))
let dres = await withTimeout(dataFut, ctx.bodyTimeout)
if not dres:
debug "Timeout expired while receiving response body",
address = ctx.remoteAddress
return err("response body timeout")
dataFut.read()
ok(ClientResp(code: resp.code, response: dataBuf))
if dataBuf.len > 0:
if ContentEncodingFlags.Gzip in resp.contentEncoding:
let gres = string.ungzip(dataBuf)
if gres.isErr:
return err(gres.error)
ok(ClientResp(status: resp.status, reason: resp.reason, response: gres.get()))
else:
let dataString = string.fromBytes(dataBuf)
ok(ClientResp(status: resp.status, reason: resp.reason, response: dataString))
else:
ok(ClientResp(code: resp.code, response: ""))
await stream.closeWait()
ok(ClientResp(status: resp.status, reason: resp.reason, response: ""))
await resp.closeWait()
await request.closeWait()
return cresp
except ConnectError as exc:
return err("Error connecting to address $1: $2" % [$ctx.address, exc.msg])
except TransportError as exc:
except HttpError as exc:
return err(exc.msg)
proc closeWait*(ctx: GraphqlHttpClientRef) {.async.} =
if ctx.session.isNil.not:
await ctx.session.closeWait()

View File

@ -74,6 +74,16 @@ proc sendResponse(res: string, status: HttpCode,
acceptEncoding: set[ContentEncodingFlags],
request: HttpRequestRef): Future[HttpResponseRef] {.gcsafe, async.} =
const chunkSize = 1024 * 4
if res.len <= chunkSize:
# don't split it into chunks if it's a small content
var header = HttpTable.init([("Content-Type", "application/json")])
if ContentEncodingFlags.Gzip in acceptEncoding:
header.add("Content-Encoding", "gzip")
return await request.respond(status, res, header)
# chunked transfer
let response = request.getResponse()
response.status = status
response.addHeader("Content-Type", "application/json")
@ -82,7 +92,6 @@ proc sendResponse(res: string, status: HttpCode,
await response.prepare()
const chunkSize = 1024 * 4
let maxLen = res.len
var len = res.len
while len > chunkSize:
@ -225,7 +234,7 @@ proc new*(t: typedesc[GraphqlHttpServerRef],
proc processCallback(rf: RequestFence): Future[HttpResponseRef] =
routingRequest(server, rf)
let sres = SecureHttpServerRef.new(address, processCallback,
let sres = SecureHttpServerRef.new(address, processCallback,
tlsPrivateKey, tlsCertificate, serverFlags,
socketFlags, serverUri, serverIdent, secureFlags,
maxConnections, bufferSize, backlogSize,
@ -236,7 +245,7 @@ proc new*(t: typedesc[GraphqlHttpServerRef],
ok(server)
else:
err("Could not create HTTP server instance: " & sres.error())
proc state*(rs: GraphqlHttpServerRef): GraphqlHttpServerState {.raises: [Defect].} =
## Returns current GraphQL server's state.
case rs.server.state

View File

@ -76,11 +76,10 @@ proc createServer(serverAddress: TransportAddress): GraphqlHttpServerRef =
res.get()
proc setupClient(address: TransportAddress): GraphqlHttpClientRef =
when defined(tls):
let flags = {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}
GraphqlSHttpClientRef.new(address = address, tlsFlags = flags).get()
else:
GraphqlHttpClientRef.new(address).get()
const
secure = defined(tls)
GraphqlHttpClientRef.new(address, secure = secure).get()
proc runExecutor(client: GraphqlHttpClientRef, unit: Unit, testStatusIMPL: var TestStatus) =
client.operationName(unit.opName)
@ -91,7 +90,7 @@ proc runExecutor(client: GraphqlHttpClientRef, unit: Unit, testStatusIMPL: var T
return
let clientResp = res.get()
check (clientResp.code == 200) == (unit.error.len == 0)
check (clientResp.status == 200) == (unit.error.len == 0)
let resp = decodeResponse(clientResp.response)
if not resp.errors.isNil: