From f748387462b2472a941e916285f04fc14fd1180a Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 19 May 2023 17:25:22 +0300 Subject: [PATCH] Add HTTP client helper to read `text/event-stream` streaming response. (#375) --- chronos/apps/http/httpclient.nim | 105 ++++++++++++++++- chronos/apps/http/httpcommon.nim | 1 + tests/testhttpclient.nim | 186 +++++++++++++++++++++++++++++++ 3 files changed, 290 insertions(+), 2 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 30a05e2..1462c53 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -6,8 +6,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import std/[uri, tables, strutils, sequtils] -import stew/[results, base10, base64], httputils +import std/[uri, tables, sequtils] +import stew/[results, base10, base64, byteutils], httputils import ../../asyncloop, ../../asyncsync import ../../streams/[asyncstream, tlsstream, chunkstream, boundstream] import httptable, httpcommon, httpagent, httpbodyrw, multipart @@ -194,6 +194,10 @@ type opened*: int64 closed*: int64 + ServerSentEvent* = object + name*: string + data*: string + # HttpClientRequestRef valid states are: # Ready -> Open -> (Finished, Error) -> (Closing, Closed) # @@ -1511,3 +1515,100 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {. if not(isNil(request)): await closeWait(request) if not(isNil(redirect)): await closeWait(redirect) raise exc + +proc getServerSentEvents*( + response: HttpClientResponseRef, + maxEventSize: int = -1 + ): Future[seq[ServerSentEvent]] {.async.} = + ## Read number of server-sent events (SSE) from HTTP response ``response``. + ## + ## ``maxEventSize`` - maximum size of events chunk in one message, use + ## `-1` or `0` to set size to unlimited. + ## + ## Server-sent events parsing is done according to: + ## https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream + ## + ## Note: Server-sent event comments are ignored and silently skipped. + const + CR = byte(0x0D) + LF = byte(0x0A) + COLON = byte(':') + SPACE = byte(' ') + + let reader = response.getBodyReader() + + var + error: ref HttpReadError = nil + res: seq[ServerSentEvent] + buffer: seq[byte] + + proc consumeBuffer() = + if len(buffer) == 0: return + + let pos = buffer.find(COLON) + if pos == 0: + # comment line + discard + elif pos > 0: + # field_name: field_value + let + name = string.fromBytes(buffer.toOpenArray(0, pos - 1)) + value = + if (pos + 1) < len(buffer): + let spos = if buffer[pos + 1] == SPACE: pos + 2 else: pos + 1 + string.fromBytes(buffer.toOpenArray(spos, len(buffer) - 1)) + else: + "" + res.add(ServerSentEvent(name: name, data: value)) + else: + # field_name only + let name = string.fromBytes(buffer.toOpenArray(0, len(buffer) - 1)) + res.add(ServerSentEvent(name: name, data: "")) + + # Reset internal buffer to zero length. + buffer.setLen(0) + + proc discardBuffer() = + if len(buffer) == 0: return + # Reset internal buffer to 1 byte length to keep comment sign. + buffer.setLen(1) + + proc predicate(data: openArray[byte]): tuple[consumed: int, done: bool] = + var i = 0 + while i < len(data): + if data[i] in {CR, LF}: + # CR or LF encountered + inc(i) + if (data[i - 1] == CR) and ((i < len(data)) and data[i] == LF): + # We trying to check for CRLF + inc(i) + + if len(buffer) == 0: + if len(res) == 0: + res.add(ServerSentEvent(name: "", data: "")) + return (i, true) + consumeBuffer() + else: + buffer.add(data[i]) + if (maxEventSize >= 0) and (len(buffer) > maxEventSize): + if buffer[0] != COLON: + # We only check limits for events and ignore comments size. + error = newException(HttpReadLimitError, + "Size of event exceeded maximum size") + return (0, true) + discardBuffer() + + inc(i) + + if len(data) == 0: + # Stream is at EOF + consumeBuffer() + return (0, true) + + (i, false) + + await reader.readMessage(predicate) + if not isNil(error): + raise error + else: + return res diff --git a/chronos/apps/http/httpcommon.nim b/chronos/apps/http/httpcommon.nim index 860a7a7..515920e 100644 --- a/chronos/apps/http/httpcommon.nim +++ b/chronos/apps/http/httpcommon.nim @@ -54,6 +54,7 @@ type HttpRedirectError* = object of HttpError HttpAddressError* = object of HttpError HttpUseClosedError* = object of HttpError + HttpReadLimitError* = object of HttpReadError KeyValueTuple* = tuple key: string diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index ccd8eef..07128d9 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -1001,6 +1001,188 @@ suite "HTTP client testing suite": return true + proc testServerSentEvents(address: TransportAddress, + secure: bool): Future[bool] {.async.} = + const + SingleGoodTests = [ + ("/test/single/1", "a:b\r\nc: d\re:f\n:comment\r\ng:\n h: j \n\n", + @[("a", "b"), ("c", "d"), ("e", "f"), ("g", ""), (" h", "j ")]), + ("/test/single/2", ":comment\r:\nfield1\r\nfield2:\n\n", + @[("field1", ""), ("field2", "")]), + ("/test/single/3", ":c1\r:c2\nfield1:value1", @[("field1", "value1")]), + ("/test/single/4", ":c1\r:c2\nfield1:", @[("field1", "")]), + ("/test/single/5", ":c1\r:c2\nfield1", @[("field1", "")]), + ("/test/single/6", "a", @[("a", "")]), + ("/test/single/7", "b:", @[("b", "")]), + ("/test/single/8", "c:d", @[("c", "d")]), + ("/test/single/9", ":", @[]), + ("/test/single/10", "", @[]), + ("/test/single/11", ":c1\n", @[]), + ("/test/single/12", ":c1\n:c2\n", @[]), + ("/test/single/13", ":c1\n:c2\n:c3\n", @[]), + ("/test/single/14", ":c1\n:c2\n:c3\n:c4", @[]), + ("/test/single/15", "\r\r", @[("", "")]), + ("/test/single/15", "\n\n", @[("", "")]), + ("/test/single/17", "\r\n\r\n", @[("", "")]), + ("/test/single/18", "\r\n", @[("", "")]), + ("/test/single/19", "\r", @[("", "")]), + ("/test/single/20", "\n", @[("", "")]) + ] + MultipleGoodTests = [ + ("/test/multiple/1", "a:b\nc:d\n\ne:f\rg:h\r\ri:j\r\nk:l\r\n\r\n", 3, + @[@[("a", "b"), ("c", "d")], @[("e", "f"), ("g", "h")], + @[("i", "j"), ("k", "l")]]), + ("/test/multiple/2", "a:b\nc:d\n\ne:f\rg:h\r\ri:j\r\nk:l\r\n\r\n\r\n", + 4, @[@[("a", "b"), ("c", "d")], @[("e", "f"), ("g", "h")], + @[("i", "j"), ("k", "l")], @[("", "")]]), + ] + OverflowTests = [ + ("/test/overflow/1", ":verylongcomment", 1, false), + ("/test/overflow/2", ":verylongcomment\n:anotherone", 1, false), + ("/test/overflow/3", "aa\n", 1, true), + ("/test/overflow/4", "a:b\n", 2, true) + ] + + proc `==`(a: ServerSentEvent, b: tuple[name: string, value: string]): bool = + a.name == b.name and a.data == b.value + + proc `==`(a: seq[ServerSentEvent], + b: seq[tuple[name: string, value: string]]): bool = + if len(a) != len(b): + return false + for index, value in a.pairs(): + if value != b[index]: + return false + true + + proc `==`(a: seq[seq[ServerSentEvent]], + b: seq[seq[tuple[name: string, value: string]]]): bool = + if len(a) != len(b): + return false + for index, value in a.pairs(): + if value != b[index]: + return false + true + + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + if request.uri.path.startsWith("/test/single/"): + let index = + block: + var res = -1 + for index, value in SingleGoodTests.pairs(): + if value[0] == request.uri.path: + res = index + break + res + if index < 0: + return await request.respond(Http404, "Page not found") + var response = request.getResponse() + response.status = Http200 + await response.sendBody(SingleGoodTests[index][1]) + return response + elif request.uri.path.startsWith("/test/multiple/"): + let index = + block: + var res = -1 + for index, value in MultipleGoodTests.pairs(): + if value[0] == request.uri.path: + res = index + break + res + if index < 0: + return await request.respond(Http404, "Page not found") + var response = request.getResponse() + response.status = Http200 + await response.sendBody(MultipleGoodTests[index][1]) + return response + elif request.uri.path.startsWith("/test/overflow/"): + let index = + block: + var res = -1 + for index, value in OverflowTests.pairs(): + if value[0] == request.uri.path: + res = index + break + res + if index < 0: + return await request.respond(Http404, "Page not found") + var response = request.getResponse() + response.status = Http200 + await response.sendBody(OverflowTests[index][1]) + return response + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + var server = createServer(address, process, secure) + server.start() + + var session = createSession(secure) + + try: + for item in SingleGoodTests: + let ha = + if secure: + getAddress(address, HttpClientScheme.Secure, item[0]) + else: + getAddress(address, HttpClientScheme.NonSecure, item[0]) + let + req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet) + response = await req.send() + events = await response.getServerSentEvents() + check events == item[2] + await response.closeWait() + await req.closeWait() + + for item in MultipleGoodTests: + let ha = + if secure: + getAddress(address, HttpClientScheme.Secure, item[0]) + else: + getAddress(address, HttpClientScheme.NonSecure, item[0]) + var req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet) + var response = await send(req) + let events = + block: + var res: seq[seq[ServerSentEvent]] + for i in 0 ..< item[2]: + let ires = await response.getServerSentEvents() + res.add(ires) + res + check events == item[3] + await closeWait(response) + await closeWait(req) + + for item in OverflowTests: + let ha = + if secure: + getAddress(address, HttpClientScheme.Secure, item[0]) + else: + getAddress(address, HttpClientScheme.NonSecure, item[0]) + var req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet) + var response = await send(req) + let error = + try: + let events {.used.} = await response.getServerSentEvents(item[2]) + false + except HttpReadLimitError: + true + except CatchableError: + false + check error == item[3] + await closeWait(response) + await closeWait(req) + + finally: + await closeWait(session) + await server.stop() + await server.closeWait() + + return true + test "HTTP all request methods test": let address = initTAddress("127.0.0.1:30080") check waitFor(testMethods(address, false)) == 18 @@ -1079,6 +1261,10 @@ suite "HTTP client testing suite": test "HTTP client no-pipeline test": let address = initTAddress("127.0.0.1:30080") check waitFor(testNoPipeline(address)) == true + + test "HTTP client server-sent events test": + let address = initTAddress("127.0.0.1:30080") + check waitFor(testServerSentEvents(address, false)) == true test "Leaks test": proc getTrackerLeaks(tracker: string): bool =