Server-side events implementation. (#222)

Fix keep-alive issue.
Refactor some prepareXXX routines.
This commit is contained in:
Eugene Kabanov 2021-09-14 20:32:58 +03:00 committed by GitHub
parent 5034f0a5a6
commit bbbcb55493
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 179 additions and 50 deletions

View File

@ -45,7 +45,10 @@ type
BoundBody, UnboundBody, MultipartForm, UrlencodedForm, ClientExpect
HttpResponseFlags* {.pure.} = enum
KeepAlive, Chunked
KeepAlive, Stream
HttpResponseStreamType* {.pure.} = enum
Plain, SSE, Chunked
HttpResponseState* {.pure.} = enum
Empty, Prepared, Sending, Finished, Failed, Cancelled, Dumb
@ -108,7 +111,8 @@ type
flags: set[HttpResponseFlags]
state*: HttpResponseState
connection*: HttpConnectionRef
chunkedWriter: AsyncStreamWriter
streamType*: HttpResponseStreamType
writer: AsyncStreamWriter
HttpResponseRef* = ref HttpResponse
@ -222,7 +226,10 @@ proc getResponse*(req: HttpRequestRef): HttpResponseRef {.raises: [Defect].} =
version: req.version,
headersTable: HttpTable.init(),
connection: req.connection,
flags: if req.version == HttpVersion11: {KeepAlive} else: {}
flags: if req.version == HttpVersion11:
{HttpResponseFlags.KeepAlive}
else:
{}
)
req.response = some(resp)
resp
@ -625,14 +632,13 @@ proc closeWait*(conn: HttpConnectionRef) {.async.} =
await allFutures(pending)
conn.state = HttpState.Closed
proc closeWait(req: HttpRequestRef) {.async.} =
proc closeWait*(req: HttpRequestRef) {.async.} =
if req.state == HttpState.Alive:
if req.response.isSome():
req.state = HttpState.Closing
let resp = req.response.get()
if (HttpResponseFlags.Chunked in resp.flags) and
not(isNil(resp.chunkedWriter)):
await resp.chunkedWriter.closeWait()
if (HttpResponseFlags.Stream in resp.flags) and not(isNil(resp.writer)):
await resp.writer.closeWait()
req.state = HttpState.Closed
proc createConnection(server: HttpServerRef,
@ -640,6 +646,16 @@ proc createConnection(server: HttpServerRef,
async.} =
return HttpConnectionRef.new(server, transp)
proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
doAssert(resp.state == HttpResponseState.Empty)
if value:
resp.flags.incl(HttpResponseFlags.KeepAlive)
else:
resp.flags.excl(HttpResponseFlags.KeepAlive)
proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} =
HttpResponseFlags.KeepAlive in resp.flags
proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
var
conn: HttpConnectionRef
@ -767,7 +783,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
keepConn)
else:
# some data was already sent to the client.
discard
keepConn = resp.keepalive()
except CancelledError:
keepConn = false
else:
@ -987,16 +1003,6 @@ proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} =
elif HttpRequestFlags.UnboundBody in req.requestFlags:
raiseHttpCriticalError("Unsupported request body")
proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
doAssert(resp.state == HttpResponseState.Empty)
if value:
resp.flags.incl(KeepAlive)
else:
resp.flags.excl(KeepAlive)
proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} =
KeepAlive in resp.flags
proc setHeader*(resp: HttpResponseRef, key, value: string) {.
raises: [Defect].} =
## Sets value of header ``key`` to ``value``.
@ -1029,6 +1035,17 @@ template checkPending(t: untyped) =
if t.state != HttpResponseState.Empty:
raiseHttpCriticalError("Response body was already sent")
func createHeaders(resp: HttpResponseRef): string =
var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
for k, v in resp.headersTable.stringItems():
if len(v) > 0:
answer.add(normalizeHeaderName(k))
answer.add(": ")
answer.add(v)
answer.add("\r\n")
answer.add("\r\n")
answer
proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {.
raises: [Defect].}=
if not(resp.hasHeader(DateHeader)):
@ -1040,19 +1057,11 @@ proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {.
if not(resp.hasHeader(ServerHeader)):
resp.setHeader(ServerHeader, resp.connection.server.serverIdent)
if not(resp.hasHeader(ConnectionHeader)):
if KeepAlive in resp.flags:
if HttpResponseFlags.KeepAlive in resp.flags:
resp.setHeader(ConnectionHeader, "keep-alive")
else:
resp.setHeader(ConnectionHeader, "close")
var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
for k, v in resp.headersTable.stringItems():
if len(v) > 0:
answer.add(normalizeHeaderName(k))
answer.add(": ")
answer.add(v)
answer.add("\r\n")
answer.add("\r\n")
answer
resp.createHeaders()
proc prepareChunkedHeaders(resp: HttpResponseRef): string {.
raises: [Defect].} =
@ -1065,19 +1074,35 @@ proc prepareChunkedHeaders(resp: HttpResponseRef): string {.
if not(resp.hasHeader(ServerHeader)):
resp.setHeader(ServerHeader, resp.connection.server.serverIdent)
if not(resp.hasHeader(ConnectionHeader)):
if KeepAlive in resp.flags:
if HttpResponseFlags.KeepAlive in resp.flags:
resp.setHeader(ConnectionHeader, "keep-alive")
else:
resp.setHeader(ConnectionHeader, "close")
var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
for k, v in resp.headersTable.stringItems():
if len(v) > 0:
answer.add(normalizeHeaderName(k))
answer.add(": ")
answer.add(v)
answer.add("\r\n")
answer.add("\r\n")
answer
resp.createHeaders()
proc prepareServerSideEventHeaders(resp: HttpResponseRef): string {.
raises: [Defect].} =
if not(resp.hasHeader(DateHeader)):
resp.setHeader(DateHeader, httpDate())
if not(resp.hasHeader(ContentTypeHeader)):
resp.setHeader(ContentTypeHeader, "text/event-stream")
if not(resp.hasHeader(ServerHeader)):
resp.setHeader(ServerHeader, resp.connection.server.serverIdent)
if not(resp.hasHeader(ConnectionHeader)):
resp.flags.excl(HttpResponseFlags.KeepAlive)
resp.setHeader(ConnectionHeader, "close")
resp.createHeaders()
proc preparePlainHeaders(resp: HttpResponseRef): string {.
raises: [Defect].} =
if not(resp.hasHeader(DateHeader)):
resp.setHeader(DateHeader, httpDate())
if not(resp.hasHeader(ServerHeader)):
resp.setHeader(ServerHeader, resp.connection.server.serverIdent)
if not(resp.hasHeader(ConnectionHeader)):
resp.flags.excl(HttpResponseFlags.KeepAlive)
resp.setHeader(ConnectionHeader, "close")
resp.createHeaders()
proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
## Send HTTP response at once by using bytes pointer ``pbytes`` and length
@ -1137,18 +1162,31 @@ proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} =
resp.state = HttpResponseState.Failed
raiseHttpCriticalError("Unable to send response")
proc prepare*(resp: HttpResponseRef) {.async.} =
proc prepare*(resp: HttpResponseRef,
streamType = HttpResponseStreamType.Chunked) {.async.} =
## Prepare for HTTP stream response.
##
## Such responses will be sent chunk by chunk using ``chunked`` encoding.
resp.checkPending()
let responseHeaders = resp.prepareChunkedHeaders()
let responseHeaders =
case streamType
of HttpResponseStreamType.Plain:
resp.preparePlainHeaders()
of HttpResponseStreamType.SSE:
resp.prepareServerSideEventHeaders()
of HttpResponseStreamType.Chunked:
resp.prepareChunkedHeaders()
resp.streamType = streamType
resp.state = HttpResponseState.Prepared
try:
resp.state = HttpResponseState.Sending
await resp.connection.writer.write(responseHeaders)
resp.chunkedWriter = newChunkedStreamWriter(resp.connection.writer)
resp.flags.incl(HttpResponseFlags.Chunked)
case streamType
of HttpResponseStreamType.Plain, HttpResponseStreamType.SSE:
resp.writer = newAsyncStreamWriter(resp.connection.writer)
of HttpResponseStreamType.Chunked:
resp.writer = newChunkedStreamWriter(resp.connection.writer)
resp.flags.incl(HttpResponseFlags.Stream)
except CancelledError as exc:
resp.state = HttpResponseState.Cancelled
raise exc
@ -1156,17 +1194,33 @@ proc prepare*(resp: HttpResponseRef) {.async.} =
resp.state = HttpResponseState.Failed
raiseHttpCriticalError("Unable to send response")
proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
proc prepareChunked*(resp: HttpResponseRef): Future[void] =
## Prepare for HTTP chunked stream response.
##
## Such responses will be sent chunk by chunk using ``chunked`` encoding.
resp.prepare(HttpResponseStreamType.Chunked)
proc preparePlain*(resp: HttpResponseRef): Future[void] =
## Prepare for HTTP plain stream response.
##
## Such responses will be sent without any encoding.
resp.prepare(HttpResponseStreamType.Plain)
proc prepareSSE*(resp: HttpResponseRef): Future[void] =
## Prepare for HTTP server-side event stream response.
resp.prepare(HttpResponseStreamType.SSE)
proc send*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
## Send single chunk of data pointed by ``pbytes`` and ``nbytes``.
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero")
if HttpResponseFlags.Chunked notin resp.flags:
if HttpResponseFlags.Stream notin resp.flags:
raiseHttpCriticalError("Response was not prepared")
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
raiseHttpCriticalError("Response in incorrect state")
try:
resp.state = HttpResponseState.Sending
await resp.chunkedWriter.write(pbytes, nbytes)
await resp.writer.write(pbytes, nbytes)
resp.state = HttpResponseState.Sending
except CancelledError as exc:
resp.state = HttpResponseState.Cancelled
@ -1175,15 +1229,15 @@ proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
resp.state = HttpResponseState.Failed
raiseHttpCriticalError("Unable to send response")
proc sendChunk*(resp: HttpResponseRef, data: ByteChar) {.async.} =
proc send*(resp: HttpResponseRef, data: ByteChar) {.async.} =
## Send single chunk of data ``data``.
if HttpResponseFlags.Chunked notin resp.flags:
if HttpResponseFlags.Stream notin resp.flags:
raiseHttpCriticalError("Response was not prepared")
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
raiseHttpCriticalError("Response in incorrect state")
try:
resp.state = HttpResponseState.Sending
await resp.chunkedWriter.write(data)
await resp.writer.write(data)
resp.state = HttpResponseState.Sending
except CancelledError as exc:
resp.state = HttpResponseState.Cancelled
@ -1192,15 +1246,39 @@ proc sendChunk*(resp: HttpResponseRef, data: ByteChar) {.async.} =
resp.state = HttpResponseState.Failed
raiseHttpCriticalError("Unable to send response")
proc sendChunk*(resp: HttpResponseRef, pbytes: pointer,
nbytes: int): Future[void] =
resp.send(pbytes, nbytes)
proc sendChunk*(resp: HttpResponseRef, data: ByteChar): Future[void] =
resp.send(data)
proc sendEvent*(resp: HttpResponseRef, eventName: string,
data: string): Future[void] =
## Send server-side event with name ``eventName`` and payload ``data`` to
## remote peer.
let data =
block:
var res = ""
if len(eventName) > 0:
res.add("event: ")
res.add(eventName)
res.add("\r\n")
res.add("data: ")
res.add(data)
res.add("\r\n\r\n")
res
resp.send(data)
proc finish*(resp: HttpResponseRef) {.async.} =
## Sending last chunk of data, so it will indicate end of HTTP response.
if HttpResponseFlags.Chunked notin resp.flags:
if HttpResponseFlags.Stream notin resp.flags:
raiseHttpCriticalError("Response was not prepared")
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
raiseHttpCriticalError("Response in incorrect state")
try:
resp.state = HttpResponseState.Sending
await resp.chunkedWriter.finish()
await resp.writer.finish()
resp.state = HttpResponseState.Finished
except CancelledError as exc:
resp.state = HttpResponseState.Cancelled

View File

@ -907,6 +907,57 @@ suite "HTTP server testing suite":
r1.get() == req[1][2]
r2.get() == req[1][3]
test "SSE server-side events stream test":
proc testPostMultipart2(address: TransportAddress): Future[bool] {.async.} =
var serverRes = false
proc process(r: RequestFence): Future[HttpResponseRef] {.
async.} =
if r.isOk():
let request = r.get()
let response = request.getResponse()
await response.prepareSSE()
await response.send("event: event1\r\ndata: data1\r\n\r\n")
await response.send("event: event2\r\ndata: data2\r\n\r\n")
await response.sendEvent("event3", "data3")
await response.sendEvent("event4", "data4")
await response.send("data: data5\r\n\r\n")
await response.sendEvent("", "data6")
await response.finish()
serverRes = true
return response
else:
serverRes = false
return dumbResponse()
let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
let res = HttpServerRef.new(address, process,
socketFlags = socketFlags)
if res.isErr():
return false
let server = res.get()
server.start()
let message =
"GET / HTTP/1.1\r\n" &
"Host: 127.0.0.1:30080\r\n" &
"Accept: text/event-stream\r\n" &
"\r\n"
let data = await httpClient(address, message)
let expect = "event: event1\r\ndata: data1\r\n\r\n" &
"event: event2\r\ndata: data2\r\n\r\n" &
"event: event3\r\ndata: data3\r\n\r\n" &
"event: event4\r\ndata: data4\r\n\r\n" &
"data: data5\r\n\r\n" &
"data: data6\r\n\r\n"
await server.stop()
await server.closeWait()
return serverRes and (data.find(expect) >= 0)
check waitFor(testPostMultipart2(initTAddress("127.0.0.1:30080"))) == true
test "Leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false