From bbbcb5549365e8a50102375c71860a67353ea11a Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Tue, 14 Sep 2021 20:32:58 +0300 Subject: [PATCH] Server-side events implementation. (#222) Fix keep-alive issue. Refactor some prepareXXX routines. --- chronos/apps/http/httpserver.nim | 178 ++++++++++++++++++++++--------- tests/testhttpserver.nim | 51 +++++++++ 2 files changed, 179 insertions(+), 50 deletions(-) diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index 65ff41b..d6774c5 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -45,7 +45,10 @@ type BoundBody, UnboundBody, MultipartForm, UrlencodedForm, ClientExpect HttpResponseFlags* {.pure.} = enum - KeepAlive, Chunked + KeepAlive, Stream + + HttpResponseStreamType* {.pure.} = enum + Plain, SSE, Chunked HttpResponseState* {.pure.} = enum Empty, Prepared, Sending, Finished, Failed, Cancelled, Dumb @@ -108,7 +111,8 @@ type flags: set[HttpResponseFlags] state*: HttpResponseState connection*: HttpConnectionRef - chunkedWriter: AsyncStreamWriter + streamType*: HttpResponseStreamType + writer: AsyncStreamWriter HttpResponseRef* = ref HttpResponse @@ -222,7 +226,10 @@ proc getResponse*(req: HttpRequestRef): HttpResponseRef {.raises: [Defect].} = version: req.version, headersTable: HttpTable.init(), connection: req.connection, - flags: if req.version == HttpVersion11: {KeepAlive} else: {} + flags: if req.version == HttpVersion11: + {HttpResponseFlags.KeepAlive} + else: + {} ) req.response = some(resp) resp @@ -625,14 +632,13 @@ proc closeWait*(conn: HttpConnectionRef) {.async.} = await allFutures(pending) conn.state = HttpState.Closed -proc closeWait(req: HttpRequestRef) {.async.} = +proc closeWait*(req: HttpRequestRef) {.async.} = if req.state == HttpState.Alive: if req.response.isSome(): req.state = HttpState.Closing let resp = req.response.get() - if (HttpResponseFlags.Chunked in resp.flags) and - not(isNil(resp.chunkedWriter)): - await resp.chunkedWriter.closeWait() + if (HttpResponseFlags.Stream in resp.flags) and not(isNil(resp.writer)): + await resp.writer.closeWait() req.state = HttpState.Closed proc createConnection(server: HttpServerRef, @@ -640,6 +646,16 @@ proc createConnection(server: HttpServerRef, async.} = return HttpConnectionRef.new(server, transp) +proc `keepalive=`*(resp: HttpResponseRef, value: bool) = + doAssert(resp.state == HttpResponseState.Empty) + if value: + resp.flags.incl(HttpResponseFlags.KeepAlive) + else: + resp.flags.excl(HttpResponseFlags.KeepAlive) + +proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} = + HttpResponseFlags.KeepAlive in resp.flags + proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = var conn: HttpConnectionRef @@ -767,7 +783,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = keepConn) else: # some data was already sent to the client. - discard + keepConn = resp.keepalive() except CancelledError: keepConn = false else: @@ -987,16 +1003,6 @@ proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} = elif HttpRequestFlags.UnboundBody in req.requestFlags: raiseHttpCriticalError("Unsupported request body") -proc `keepalive=`*(resp: HttpResponseRef, value: bool) = - doAssert(resp.state == HttpResponseState.Empty) - if value: - resp.flags.incl(KeepAlive) - else: - resp.flags.excl(KeepAlive) - -proc keepalive*(resp: HttpResponseRef): bool {.raises: [Defect].} = - KeepAlive in resp.flags - proc setHeader*(resp: HttpResponseRef, key, value: string) {. raises: [Defect].} = ## Sets value of header ``key`` to ``value``. @@ -1029,6 +1035,17 @@ template checkPending(t: untyped) = if t.state != HttpResponseState.Empty: raiseHttpCriticalError("Response body was already sent") +func createHeaders(resp: HttpResponseRef): string = + var answer = $(resp.version) & " " & $(resp.status) & "\r\n" + for k, v in resp.headersTable.stringItems(): + if len(v) > 0: + answer.add(normalizeHeaderName(k)) + answer.add(": ") + answer.add(v) + answer.add("\r\n") + answer.add("\r\n") + answer + proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {. raises: [Defect].}= if not(resp.hasHeader(DateHeader)): @@ -1040,19 +1057,11 @@ proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {. if not(resp.hasHeader(ServerHeader)): resp.setHeader(ServerHeader, resp.connection.server.serverIdent) if not(resp.hasHeader(ConnectionHeader)): - if KeepAlive in resp.flags: + if HttpResponseFlags.KeepAlive in resp.flags: resp.setHeader(ConnectionHeader, "keep-alive") else: resp.setHeader(ConnectionHeader, "close") - var answer = $(resp.version) & " " & $(resp.status) & "\r\n" - for k, v in resp.headersTable.stringItems(): - if len(v) > 0: - answer.add(normalizeHeaderName(k)) - answer.add(": ") - answer.add(v) - answer.add("\r\n") - answer.add("\r\n") - answer + resp.createHeaders() proc prepareChunkedHeaders(resp: HttpResponseRef): string {. raises: [Defect].} = @@ -1065,19 +1074,35 @@ proc prepareChunkedHeaders(resp: HttpResponseRef): string {. if not(resp.hasHeader(ServerHeader)): resp.setHeader(ServerHeader, resp.connection.server.serverIdent) if not(resp.hasHeader(ConnectionHeader)): - if KeepAlive in resp.flags: + if HttpResponseFlags.KeepAlive in resp.flags: resp.setHeader(ConnectionHeader, "keep-alive") else: resp.setHeader(ConnectionHeader, "close") - var answer = $(resp.version) & " " & $(resp.status) & "\r\n" - for k, v in resp.headersTable.stringItems(): - if len(v) > 0: - answer.add(normalizeHeaderName(k)) - answer.add(": ") - answer.add(v) - answer.add("\r\n") - answer.add("\r\n") - answer + resp.createHeaders() + +proc prepareServerSideEventHeaders(resp: HttpResponseRef): string {. + raises: [Defect].} = + if not(resp.hasHeader(DateHeader)): + resp.setHeader(DateHeader, httpDate()) + if not(resp.hasHeader(ContentTypeHeader)): + resp.setHeader(ContentTypeHeader, "text/event-stream") + if not(resp.hasHeader(ServerHeader)): + resp.setHeader(ServerHeader, resp.connection.server.serverIdent) + if not(resp.hasHeader(ConnectionHeader)): + resp.flags.excl(HttpResponseFlags.KeepAlive) + resp.setHeader(ConnectionHeader, "close") + resp.createHeaders() + +proc preparePlainHeaders(resp: HttpResponseRef): string {. + raises: [Defect].} = + if not(resp.hasHeader(DateHeader)): + resp.setHeader(DateHeader, httpDate()) + if not(resp.hasHeader(ServerHeader)): + resp.setHeader(ServerHeader, resp.connection.server.serverIdent) + if not(resp.hasHeader(ConnectionHeader)): + resp.flags.excl(HttpResponseFlags.KeepAlive) + resp.setHeader(ConnectionHeader, "close") + resp.createHeaders() proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = ## Send HTTP response at once by using bytes pointer ``pbytes`` and length @@ -1137,18 +1162,31 @@ proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} = resp.state = HttpResponseState.Failed raiseHttpCriticalError("Unable to send response") -proc prepare*(resp: HttpResponseRef) {.async.} = +proc prepare*(resp: HttpResponseRef, + streamType = HttpResponseStreamType.Chunked) {.async.} = ## Prepare for HTTP stream response. ## ## Such responses will be sent chunk by chunk using ``chunked`` encoding. resp.checkPending() - let responseHeaders = resp.prepareChunkedHeaders() + let responseHeaders = + case streamType + of HttpResponseStreamType.Plain: + resp.preparePlainHeaders() + of HttpResponseStreamType.SSE: + resp.prepareServerSideEventHeaders() + of HttpResponseStreamType.Chunked: + resp.prepareChunkedHeaders() + resp.streamType = streamType resp.state = HttpResponseState.Prepared try: resp.state = HttpResponseState.Sending await resp.connection.writer.write(responseHeaders) - resp.chunkedWriter = newChunkedStreamWriter(resp.connection.writer) - resp.flags.incl(HttpResponseFlags.Chunked) + case streamType + of HttpResponseStreamType.Plain, HttpResponseStreamType.SSE: + resp.writer = newAsyncStreamWriter(resp.connection.writer) + of HttpResponseStreamType.Chunked: + resp.writer = newChunkedStreamWriter(resp.connection.writer) + resp.flags.incl(HttpResponseFlags.Stream) except CancelledError as exc: resp.state = HttpResponseState.Cancelled raise exc @@ -1156,17 +1194,33 @@ proc prepare*(resp: HttpResponseRef) {.async.} = resp.state = HttpResponseState.Failed raiseHttpCriticalError("Unable to send response") -proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = +proc prepareChunked*(resp: HttpResponseRef): Future[void] = + ## Prepare for HTTP chunked stream response. + ## + ## Such responses will be sent chunk by chunk using ``chunked`` encoding. + resp.prepare(HttpResponseStreamType.Chunked) + +proc preparePlain*(resp: HttpResponseRef): Future[void] = + ## Prepare for HTTP plain stream response. + ## + ## Such responses will be sent without any encoding. + resp.prepare(HttpResponseStreamType.Plain) + +proc prepareSSE*(resp: HttpResponseRef): Future[void] = + ## Prepare for HTTP server-side event stream response. + resp.prepare(HttpResponseStreamType.SSE) + +proc send*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = ## Send single chunk of data pointed by ``pbytes`` and ``nbytes``. doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero") - if HttpResponseFlags.Chunked notin resp.flags: + if HttpResponseFlags.Stream notin resp.flags: raiseHttpCriticalError("Response was not prepared") if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: raiseHttpCriticalError("Response in incorrect state") try: resp.state = HttpResponseState.Sending - await resp.chunkedWriter.write(pbytes, nbytes) + await resp.writer.write(pbytes, nbytes) resp.state = HttpResponseState.Sending except CancelledError as exc: resp.state = HttpResponseState.Cancelled @@ -1175,15 +1229,15 @@ proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = resp.state = HttpResponseState.Failed raiseHttpCriticalError("Unable to send response") -proc sendChunk*(resp: HttpResponseRef, data: ByteChar) {.async.} = +proc send*(resp: HttpResponseRef, data: ByteChar) {.async.} = ## Send single chunk of data ``data``. - if HttpResponseFlags.Chunked notin resp.flags: + if HttpResponseFlags.Stream notin resp.flags: raiseHttpCriticalError("Response was not prepared") if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: raiseHttpCriticalError("Response in incorrect state") try: resp.state = HttpResponseState.Sending - await resp.chunkedWriter.write(data) + await resp.writer.write(data) resp.state = HttpResponseState.Sending except CancelledError as exc: resp.state = HttpResponseState.Cancelled @@ -1192,15 +1246,39 @@ proc sendChunk*(resp: HttpResponseRef, data: ByteChar) {.async.} = resp.state = HttpResponseState.Failed raiseHttpCriticalError("Unable to send response") +proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, + nbytes: int): Future[void] = + resp.send(pbytes, nbytes) + +proc sendChunk*(resp: HttpResponseRef, data: ByteChar): Future[void] = + resp.send(data) + +proc sendEvent*(resp: HttpResponseRef, eventName: string, + data: string): Future[void] = + ## Send server-side event with name ``eventName`` and payload ``data`` to + ## remote peer. + let data = + block: + var res = "" + if len(eventName) > 0: + res.add("event: ") + res.add(eventName) + res.add("\r\n") + res.add("data: ") + res.add(data) + res.add("\r\n\r\n") + res + resp.send(data) + proc finish*(resp: HttpResponseRef) {.async.} = ## Sending last chunk of data, so it will indicate end of HTTP response. - if HttpResponseFlags.Chunked notin resp.flags: + if HttpResponseFlags.Stream notin resp.flags: raiseHttpCriticalError("Response was not prepared") if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: raiseHttpCriticalError("Response in incorrect state") try: resp.state = HttpResponseState.Sending - await resp.chunkedWriter.finish() + await resp.writer.finish() resp.state = HttpResponseState.Finished except CancelledError as exc: resp.state = HttpResponseState.Cancelled diff --git a/tests/testhttpserver.nim b/tests/testhttpserver.nim index c246783..33df9aa 100644 --- a/tests/testhttpserver.nim +++ b/tests/testhttpserver.nim @@ -907,6 +907,57 @@ suite "HTTP server testing suite": r1.get() == req[1][2] r2.get() == req[1][3] + test "SSE server-side events stream test": + proc testPostMultipart2(address: TransportAddress): Future[bool] {.async.} = + var serverRes = false + proc process(r: RequestFence): Future[HttpResponseRef] {. + async.} = + if r.isOk(): + let request = r.get() + let response = request.getResponse() + await response.prepareSSE() + await response.send("event: event1\r\ndata: data1\r\n\r\n") + await response.send("event: event2\r\ndata: data2\r\n\r\n") + await response.sendEvent("event3", "data3") + await response.sendEvent("event4", "data4") + await response.send("data: data5\r\n\r\n") + await response.sendEvent("", "data6") + await response.finish() + serverRes = true + return response + else: + serverRes = false + return dumbResponse() + + let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr} + let res = HttpServerRef.new(address, process, + socketFlags = socketFlags) + if res.isErr(): + return false + + let server = res.get() + server.start() + + let message = + "GET / HTTP/1.1\r\n" & + "Host: 127.0.0.1:30080\r\n" & + "Accept: text/event-stream\r\n" & + "\r\n" + + let data = await httpClient(address, message) + let expect = "event: event1\r\ndata: data1\r\n\r\n" & + "event: event2\r\ndata: data2\r\n\r\n" & + "event: event3\r\ndata: data3\r\n\r\n" & + "event: event4\r\ndata: data4\r\n\r\n" & + "data: data5\r\n\r\n" & + "data: data6\r\n\r\n" + await server.stop() + await server.closeWait() + return serverRes and (data.find(expect) >= 0) + + check waitFor(testPostMultipart2(initTAddress("127.0.0.1:30080"))) == true + + test "Leaks test": check: getTracker("async.stream.reader").isLeaked() == false