Add HTTP client helper to read `text/event-stream` streaming response. (#375)

This commit is contained in:
Eugene Kabanov 2023-05-19 17:25:22 +03:00 committed by GitHub
parent 38cc233700
commit f748387462
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 290 additions and 2 deletions

View File

@ -6,8 +6,8 @@
# Licensed under either of # Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import std/[uri, tables, strutils, sequtils] import std/[uri, tables, sequtils]
import stew/[results, base10, base64], httputils import stew/[results, base10, base64, byteutils], httputils
import ../../asyncloop, ../../asyncsync import ../../asyncloop, ../../asyncsync
import ../../streams/[asyncstream, tlsstream, chunkstream, boundstream] import ../../streams/[asyncstream, tlsstream, chunkstream, boundstream]
import httptable, httpcommon, httpagent, httpbodyrw, multipart import httptable, httpcommon, httpagent, httpbodyrw, multipart
@ -194,6 +194,10 @@ type
opened*: int64 opened*: int64
closed*: int64 closed*: int64
ServerSentEvent* = object
name*: string
data*: string
# HttpClientRequestRef valid states are: # HttpClientRequestRef valid states are:
# Ready -> Open -> (Finished, Error) -> (Closing, Closed) # Ready -> Open -> (Finished, Error) -> (Closing, Closed)
# #
@ -1511,3 +1515,100 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
if not(isNil(request)): await closeWait(request) if not(isNil(request)): await closeWait(request)
if not(isNil(redirect)): await closeWait(redirect) if not(isNil(redirect)): await closeWait(redirect)
raise exc raise exc
proc getServerSentEvents*(
response: HttpClientResponseRef,
maxEventSize: int = -1
): Future[seq[ServerSentEvent]] {.async.} =
## Read number of server-sent events (SSE) from HTTP response ``response``.
##
## ``maxEventSize`` - maximum size of events chunk in one message, use
## `-1` or `0` to set size to unlimited.
##
## Server-sent events parsing is done according to:
## https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
##
## Note: Server-sent event comments are ignored and silently skipped.
const
CR = byte(0x0D)
LF = byte(0x0A)
COLON = byte(':')
SPACE = byte(' ')
let reader = response.getBodyReader()
var
error: ref HttpReadError = nil
res: seq[ServerSentEvent]
buffer: seq[byte]
proc consumeBuffer() =
if len(buffer) == 0: return
let pos = buffer.find(COLON)
if pos == 0:
# comment line
discard
elif pos > 0:
# field_name: field_value
let
name = string.fromBytes(buffer.toOpenArray(0, pos - 1))
value =
if (pos + 1) < len(buffer):
let spos = if buffer[pos + 1] == SPACE: pos + 2 else: pos + 1
string.fromBytes(buffer.toOpenArray(spos, len(buffer) - 1))
else:
""
res.add(ServerSentEvent(name: name, data: value))
else:
# field_name only
let name = string.fromBytes(buffer.toOpenArray(0, len(buffer) - 1))
res.add(ServerSentEvent(name: name, data: ""))
# Reset internal buffer to zero length.
buffer.setLen(0)
proc discardBuffer() =
if len(buffer) == 0: return
# Reset internal buffer to 1 byte length to keep comment sign.
buffer.setLen(1)
proc predicate(data: openArray[byte]): tuple[consumed: int, done: bool] =
var i = 0
while i < len(data):
if data[i] in {CR, LF}:
# CR or LF encountered
inc(i)
if (data[i - 1] == CR) and ((i < len(data)) and data[i] == LF):
# We trying to check for CRLF
inc(i)
if len(buffer) == 0:
if len(res) == 0:
res.add(ServerSentEvent(name: "", data: ""))
return (i, true)
consumeBuffer()
else:
buffer.add(data[i])
if (maxEventSize >= 0) and (len(buffer) > maxEventSize):
if buffer[0] != COLON:
# We only check limits for events and ignore comments size.
error = newException(HttpReadLimitError,
"Size of event exceeded maximum size")
return (0, true)
discardBuffer()
inc(i)
if len(data) == 0:
# Stream is at EOF
consumeBuffer()
return (0, true)
(i, false)
await reader.readMessage(predicate)
if not isNil(error):
raise error
else:
return res

View File

@ -54,6 +54,7 @@ type
HttpRedirectError* = object of HttpError HttpRedirectError* = object of HttpError
HttpAddressError* = object of HttpError HttpAddressError* = object of HttpError
HttpUseClosedError* = object of HttpError HttpUseClosedError* = object of HttpError
HttpReadLimitError* = object of HttpReadError
KeyValueTuple* = tuple KeyValueTuple* = tuple
key: string key: string

View File

@ -1001,6 +1001,188 @@ suite "HTTP client testing suite":
return true return true
proc testServerSentEvents(address: TransportAddress,
secure: bool): Future[bool] {.async.} =
const
SingleGoodTests = [
("/test/single/1", "a:b\r\nc: d\re:f\n:comment\r\ng:\n h: j \n\n",
@[("a", "b"), ("c", "d"), ("e", "f"), ("g", ""), (" h", "j ")]),
("/test/single/2", ":comment\r:\nfield1\r\nfield2:\n\n",
@[("field1", ""), ("field2", "")]),
("/test/single/3", ":c1\r:c2\nfield1:value1", @[("field1", "value1")]),
("/test/single/4", ":c1\r:c2\nfield1:", @[("field1", "")]),
("/test/single/5", ":c1\r:c2\nfield1", @[("field1", "")]),
("/test/single/6", "a", @[("a", "")]),
("/test/single/7", "b:", @[("b", "")]),
("/test/single/8", "c:d", @[("c", "d")]),
("/test/single/9", ":", @[]),
("/test/single/10", "", @[]),
("/test/single/11", ":c1\n", @[]),
("/test/single/12", ":c1\n:c2\n", @[]),
("/test/single/13", ":c1\n:c2\n:c3\n", @[]),
("/test/single/14", ":c1\n:c2\n:c3\n:c4", @[]),
("/test/single/15", "\r\r", @[("", "")]),
("/test/single/15", "\n\n", @[("", "")]),
("/test/single/17", "\r\n\r\n", @[("", "")]),
("/test/single/18", "\r\n", @[("", "")]),
("/test/single/19", "\r", @[("", "")]),
("/test/single/20", "\n", @[("", "")])
]
MultipleGoodTests = [
("/test/multiple/1", "a:b\nc:d\n\ne:f\rg:h\r\ri:j\r\nk:l\r\n\r\n", 3,
@[@[("a", "b"), ("c", "d")], @[("e", "f"), ("g", "h")],
@[("i", "j"), ("k", "l")]]),
("/test/multiple/2", "a:b\nc:d\n\ne:f\rg:h\r\ri:j\r\nk:l\r\n\r\n\r\n",
4, @[@[("a", "b"), ("c", "d")], @[("e", "f"), ("g", "h")],
@[("i", "j"), ("k", "l")], @[("", "")]]),
]
OverflowTests = [
("/test/overflow/1", ":verylongcomment", 1, false),
("/test/overflow/2", ":verylongcomment\n:anotherone", 1, false),
("/test/overflow/3", "aa\n", 1, true),
("/test/overflow/4", "a:b\n", 2, true)
]
proc `==`(a: ServerSentEvent, b: tuple[name: string, value: string]): bool =
a.name == b.name and a.data == b.value
proc `==`(a: seq[ServerSentEvent],
b: seq[tuple[name: string, value: string]]): bool =
if len(a) != len(b):
return false
for index, value in a.pairs():
if value != b[index]:
return false
true
proc `==`(a: seq[seq[ServerSentEvent]],
b: seq[seq[tuple[name: string, value: string]]]): bool =
if len(a) != len(b):
return false
for index, value in a.pairs():
if value != b[index]:
return false
true
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isOk():
let request = r.get()
if request.uri.path.startsWith("/test/single/"):
let index =
block:
var res = -1
for index, value in SingleGoodTests.pairs():
if value[0] == request.uri.path:
res = index
break
res
if index < 0:
return await request.respond(Http404, "Page not found")
var response = request.getResponse()
response.status = Http200
await response.sendBody(SingleGoodTests[index][1])
return response
elif request.uri.path.startsWith("/test/multiple/"):
let index =
block:
var res = -1
for index, value in MultipleGoodTests.pairs():
if value[0] == request.uri.path:
res = index
break
res
if index < 0:
return await request.respond(Http404, "Page not found")
var response = request.getResponse()
response.status = Http200
await response.sendBody(MultipleGoodTests[index][1])
return response
elif request.uri.path.startsWith("/test/overflow/"):
let index =
block:
var res = -1
for index, value in OverflowTests.pairs():
if value[0] == request.uri.path:
res = index
break
res
if index < 0:
return await request.respond(Http404, "Page not found")
var response = request.getResponse()
response.status = Http200
await response.sendBody(OverflowTests[index][1])
return response
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
var server = createServer(address, process, secure)
server.start()
var session = createSession(secure)
try:
for item in SingleGoodTests:
let ha =
if secure:
getAddress(address, HttpClientScheme.Secure, item[0])
else:
getAddress(address, HttpClientScheme.NonSecure, item[0])
let
req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet)
response = await req.send()
events = await response.getServerSentEvents()
check events == item[2]
await response.closeWait()
await req.closeWait()
for item in MultipleGoodTests:
let ha =
if secure:
getAddress(address, HttpClientScheme.Secure, item[0])
else:
getAddress(address, HttpClientScheme.NonSecure, item[0])
var req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet)
var response = await send(req)
let events =
block:
var res: seq[seq[ServerSentEvent]]
for i in 0 ..< item[2]:
let ires = await response.getServerSentEvents()
res.add(ires)
res
check events == item[3]
await closeWait(response)
await closeWait(req)
for item in OverflowTests:
let ha =
if secure:
getAddress(address, HttpClientScheme.Secure, item[0])
else:
getAddress(address, HttpClientScheme.NonSecure, item[0])
var req = HttpClientRequestRef.new(session, ha, HttpMethod.MethodGet)
var response = await send(req)
let error =
try:
let events {.used.} = await response.getServerSentEvents(item[2])
false
except HttpReadLimitError:
true
except CatchableError:
false
check error == item[3]
await closeWait(response)
await closeWait(req)
finally:
await closeWait(session)
await server.stop()
await server.closeWait()
return true
test "HTTP all request methods test": test "HTTP all request methods test":
let address = initTAddress("127.0.0.1:30080") let address = initTAddress("127.0.0.1:30080")
check waitFor(testMethods(address, false)) == 18 check waitFor(testMethods(address, false)) == 18
@ -1080,6 +1262,10 @@ suite "HTTP client testing suite":
let address = initTAddress("127.0.0.1:30080") let address = initTAddress("127.0.0.1:30080")
check waitFor(testNoPipeline(address)) == true check waitFor(testNoPipeline(address)) == true
test "HTTP client server-sent events test":
let address = initTAddress("127.0.0.1:30080")
check waitFor(testServerSentEvents(address, false)) == true
test "Leaks test": test "Leaks test":
proc getTrackerLeaks(tracker: string): bool = proc getTrackerLeaks(tracker: string): bool =
let tracker = getTracker(tracker) let tracker = getTracker(tracker)