diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index c4b9841..ffb5511 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -11,7 +11,7 @@ import stew/results, httputils import ../../asyncloop, ../../asyncsync import ../../streams/[asyncstream, boundstream, chunkstream] import httptable, httpcommon, multipart -export httpcommon, multipart +export httptable, httpcommon, multipart when defined(useChroniclesLogging): echo "Importing chronicles" @@ -21,12 +21,12 @@ type HttpServerFlags* {.pure.} = enum Secure, NoExpectHandler - HttpStatus* = enum - DropConnection, KeepConnection - - HTTPServerError* {.pure.} = enum + HttpServerError* {.pure.} = enum TimeoutError, CatchableError, RecoverableError, CriticalError + HttpServerState* {.pure.} = enum + ServerRunning, ServerStopped, ServerClosed + HttpProcessError* = object error*: HTTPServerError exc*: ref CatchableError @@ -39,12 +39,15 @@ type ClientExpect HttpResponseFlags* {.pure.} = enum - Prepared, DataSent, DataSending, KeepAlive + KeepAlive, Chunked + + HttpResponseState* {.pure.} = enum + Empty, Prepared, Sending, Finished, Failed, Cancelled, Dumb HttpProcessCallback* = - proc(req: RequestFence[HttpRequest]): Future[HttpStatus] {.gcsafe.} + proc(req: RequestFence[HttpRequestRef]): Future[HttpResponseRef] {.gcsafe.} - HttpServer* = ref object of RootRef + HttpServer* = object of RootObj instance*: StreamServer # semaphore*: AsyncSemaphore maxConnections*: int @@ -59,10 +62,9 @@ type maxRequestBodySize: int processCallback: HttpProcessCallback - HttpServerState* = enum - ServerRunning, ServerStopped, ServerClosed + HttpServerRef* = ref HttpServer - HttpRequest* = ref object of RootRef + HttpRequest* = object of RootObj headersTable: HttpTable queryTable: HttpTable postTable: Option[HttpTable] @@ -76,40 +78,38 @@ type transferEncoding*: set[TransferEncodingFlags] requestFlags*: set[HttpRequestFlags] contentLength: int - connection*: HttpConnection - mainReader*: AsyncStreamReader - mainWriter*: AsyncStreamWriter - response*: Option[HttpResponse] + connection*: HttpConnectionRef + response*: Option[HttpResponseRef] - HttpResponse* = ref object of RootRef + HttpRequestRef* = ref HttpRequest + + HttpResponse* = object of RootObj status*: HttpCode version*: HttpVersion headersTable: HttpTable - body*: seq[byte] - responseFlags*: set[HttpResponseFlags] - connection*: HttpConnection - mainWriter: AsyncStreamWriter + body: seq[byte] + flags: set[HttpResponseFlags] + state: HttpResponseState + connection*: HttpConnectionRef + chunkedWriter: AsyncStreamWriter - HttpConnection* = ref object of RootRef - server*: HttpServer + HttpResponseRef* = ref HttpResponse + + HttpConnection* = object of RootObj + server*: HttpServerRef transp: StreamTransport + mainReader*: AsyncStreamReader + mainWriter*: AsyncStreamWriter buffer: seq[byte] + HttpConnectionRef* = ref HttpConnection + proc init(htype: typedesc[HttpProcessError], error: HTTPServerError, exc: ref CatchableError, remote: TransportAddress): HttpProcessError = HttpProcessError(error: error, exc: exc, remote: remote) -proc new*(htype: typedesc[HttpResponse], req: HttpRequest): HttpResponse = - HttpResponse( - status: Http200, - version: req.version, - headersTable: HttpTable.init(), - connection: req.connection, - mainWriter: req.mainWriter - ) - -proc new*(htype: typedesc[HttpServer], +proc new*(htype: typedesc[HttpServerRef], address: TransportAddress, processCallback: HttpProcessCallback, serverFlags: set[HttpServerFlags] = {}, @@ -121,9 +121,9 @@ proc new*(htype: typedesc[HttpServer], httpHeadersTimeout = 10.seconds, httpBodyTimeout = 30.seconds, maxHeadersSize: int = 8192, - maxRequestBodySize: int = 1_048_576): HttpResult[HttpServer] = + maxRequestBodySize: int = 1_048_576): HttpResult[HttpServerRef] = - var res = HttpServer( + var res = HttpServerRef( maxConnections: maxConnections, headersTimeout: httpHeadersTimeout, bodyTimeout: httpBodyTimeout, @@ -156,18 +156,37 @@ proc new*(htype: typedesc[HttpServer], except CatchableError as exc: return err(exc.msg) +proc getResponse*(req: HttpRequestRef): HttpResponseRef = + if req.response.isNone(): + var resp = HttpResponseRef( + status: Http200, + state: HttpResponseState.Empty, + version: req.version, + headersTable: HttpTable.init(), + connection: req.connection, + flags: if req.version == HttpVersion11: {KeepAlive} else: {} + ) + req.response = some(resp) + resp + else: + req.response.get() + +proc dumbResponse*(): HttpResponseRef = + ## Create an empty response to return when request processor got no request. + HttpResponseRef(state: HttpResponseState.Dumb, version: HttpVersion11) + proc getId(transp: StreamTransport): string {.inline.} = ## Returns string unique transport's identifier as string. $transp.remoteAddress() & "_" & $transp.localAddress() -proc hasBody*(request: HttpRequest): bool = +proc hasBody*(request: HttpRequestRef): bool = ## Returns ``true`` if request has body. request.requestFlags * {HttpRequestFlags.BoundBody, HttpRequestFlags.UnboundBody} != {} -proc prepareRequest(conn: HttpConnection, - req: HttpRequestHeader): HttpResultCode[HttpRequest] = - var request = HttpRequest(connection: conn) +proc prepareRequest(conn: HttpConnectionRef, + req: HttpRequestHeader): HttpResultCode[HttpRequestRef] = + var request = HttpRequestRef(connection: conn) if req.version notin {HttpVersion10, HttpVersion11}: return err(Http505) @@ -274,28 +293,24 @@ proc prepareRequest(conn: HttpConnection, if strip(expectHeader).toLowerAscii() == "100-continue": request.requestFlags.incl(HttpRequestFlags.ClientExpect) - request.mainReader = newAsyncStreamReader(conn.transp) - request.mainWriter = newAsyncStreamWriter(conn.transp) ok(request) -proc clear*(request: HttpRequest) {.async.} = - await allFutures( - request.mainReader.closeWait(), - request.mainWriter.closeWait(), - ) - -proc getBodyStream*(request: HttpRequest): HttpResult[AsyncStreamReader] = +proc getBodyStream*(request: HttpRequestRef): HttpResult[AsyncStreamReader] = ## Returns stream's reader instance which can be used to read request's body. ## ## Please be sure to handle ``Expect`` header properly. + ## + ## Streams which was obtained using this procedure must be closed to avoid + ## leaks. if HttpRequestFlags.BoundBody in request.requestFlags: - ok(newBoundedStreamReader(request.mainReader, request.contentLength)) + ok(newBoundedStreamReader(request.connection.mainReader, + request.contentLength)) elif HttpRequestFlags.UnboundBody in request.requestFlags: - ok(newChunkedStreamReader(request.mainReader)) + ok(newChunkedStreamReader(request.connection.mainReader)) else: err("Request do not have body available") -proc handleExpect*(request: HttpRequest) {.async.} = +proc handleExpect*(request: HttpRequestRef) {.async.} = ## Handle expectation for ``Expect`` header. ## https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect if HttpServerFlags.NoExpectHandler notin request.connection.server.flags: @@ -303,13 +318,13 @@ proc handleExpect*(request: HttpRequest) {.async.} = if request.version == HttpVersion11: try: let message = $request.version & " " & $Http100 & "\r\n\r\n" - await request.mainWriter.write(message) + await request.connection.mainWriter.write(message) except CancelledError as exc: raise exc except AsyncStreamWriteError, AsyncStreamIncompleteError: raise newHttpCriticalError("Unable to send `100-continue` response") -proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} = +proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} = ## Obtain request's body as sequence of bytes. let res = request.getBodyStream() if res.isErr(): @@ -320,8 +335,10 @@ proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} = return await read(res.get()) except AsyncStreamError: raise newHttpCriticalError("Unable to read request's body") + finally: + await closeWait(res.get()) -proc consumeBody*(request: HttpRequest): Future[void] {.async.} = +proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} = ## Consume/discard request's body. let res = request.getBodyStream() if res.isErr(): @@ -331,11 +348,12 @@ proc consumeBody*(request: HttpRequest): Future[void] {.async.} = try: await request.handleExpect() discard await reader.consume() - return except AsyncStreamError: raise newHttpCriticalError("Unable to consume request's body") + finally: + await closeWait(res.get()) -proc sendErrorResponse(conn: HttpConnection, version: HttpVersion, +proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion, code: HttpCode, keepAlive = true, datatype = "text/text", databody = ""): Future[bool] {.async.} = @@ -352,22 +370,16 @@ proc sendErrorResponse(conn: HttpConnection, version: HttpVersion, if len(databody) > 0: answer.add(databody) try: - let res {.used.} = await conn.transp.write(answer) + await conn.mainWriter.write(answer) return true except CancelledError: return false - except TransportOsError: + except AsyncStreamWriteError: return false - except CatchableError: + except AsyncStreamIncompleteError: return false -proc sendErrorResponse*(request: HttpRequest, code: HttpCode, keepAlive = true, - datatype = "text/text", - databody = ""): Future[bool] = - sendErrorResponse(request.connection, request.version, code, keepAlive, - datatype, databody) - -proc getRequest*(conn: HttpConnection): Future[HttpRequest] {.async.} = +proc getRequest*(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} = try: conn.buffer.setLen(conn.server.maxHeadersSize) let res = await conn.transp.readUntil(addr conn.buffer[0], len(conn.buffer), @@ -392,39 +404,56 @@ proc getRequest*(conn: HttpConnection): Future[HttpRequest] {.async.} = discard await conn.sendErrorResponse(HttpVersion11, Http413, false) raise newHttpCriticalError("Maximum size of request headers reached") -proc processLoop(server: HttpServer, transp: StreamTransport) {.async.} = - var conn = HttpConnection( - transp: transp, buffer: newSeq[byte](server.maxHeadersSize), - server: server +proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef, + transp: StreamTransport): HttpConnectionRef = + HttpConnectionRef( + transp: transp, + server: server, + buffer: newSeq[byte](server.maxHeadersSize), + mainReader: newAsyncStreamReader(transp), + mainWriter: newAsyncStreamWriter(transp) ) +proc close(conn: HttpConnectionRef): Future[void] = + allFutures(conn.mainReader.closeWait(), conn.mainWriter.closeWait(), + conn.transp.closeWait()) + +proc close(req: HttpRequestRef) {.async.} = + if req.response.isSome(): + let resp = req.response.get() + if (HttpResponseFlags.Chunked in resp.flags) and + not(isNil(resp.chunkedWriter)): + await resp.chunkedWriter.closeWait() + +proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} = + var conn = HttpConnectionRef.new(server, transp) var breakLoop = false while true: var - arg: RequestFence[HttpRequest] - resp = HttpStatus.DropConnection + arg: RequestFence[HttpRequestRef] + resp: HttpResponseRef try: let request = await conn.getRequest().wait(server.headersTimeout) - arg = RequestFence[HttpRequest].ok(request) + arg = RequestFence[HttpRequestRef].ok(request) except CancelledError: breakLoop = true except AsyncTimeoutError as exc: let error = HttpProcessError.init(HTTPServerError.TimeoutError, exc, transp.remoteAddress()) - arg = RequestFence[HttpRequest].err(error) + arg = RequestFence[HttpRequestRef].err(error) except HttpRecoverableError as exc: let error = HttpProcessError.init(HTTPServerError.RecoverableError, exc, transp.remoteAddress()) - arg = RequestFence[HttpRequest].err(error) + arg = RequestFence[HttpRequestRef].err(error) except HttpCriticalError as exc: let error = HttpProcessError.init(HTTPServerError.CriticalError, exc, transp.remoteAddress()) - arg = RequestFence[HttpRequest].err(error) + arg = RequestFence[HttpRequestRef].err(error) except CatchableError as exc: let error = HttpProcessError.init(HTTPServerError.CatchableError, exc, transp.remoteAddress()) - arg = RequestFence[HttpRequest].err(error) + arg = RequestFence[HttpRequestRef].err(error) if breakLoop: break @@ -437,61 +466,51 @@ proc processLoop(server: HttpServer, transp: StreamTransport) {.async.} = except CancelledError: breakLoop = true except CatchableError as exc: - resp = DropConnection - echo "Exception received from processor callback ", exc.name lastError = exc if breakLoop: break - let keepConn = - case resp - of DropConnection: - false - of KeepConnection: - false - if arg.isErr(): case arg.error().error of HTTPServerError.TimeoutError: - discard await conn.sendErrorResponse(HttpVersion11, Http408, keepConn) + discard await conn.sendErrorResponse(HttpVersion11, Http408, false) of HTTPServerError.RecoverableError: - discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn) + discard await conn.sendErrorResponse(HttpVersion11, Http400, false) of HTTPServerError.CriticalError: - discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn) + discard await conn.sendErrorResponse(HttpVersion11, Http400, false) of HTTPServerError.CatchableError: - discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn) - if not(keepConn): - break + discard await conn.sendErrorResponse(HttpVersion11, Http400, false) + break else: + let request = arg.get() + let keepConn = if request.version == HttpVersion11: true else: false if isNil(lastError): - echo "lastError = nil" - echo "mainWriter.bytesCount = ", arg.get().mainWriter.bytesCount - - if arg.get().mainWriter.bytesCount == 0'u64: - echo "Sending 404 keepConn = ", keepConn - # Processor callback finished without an error, but response was not - # sent to client, so we going to send HTTP404 error. + case resp.state + of HttpResponseState.Empty: + # Response was ignored discard await conn.sendErrorResponse(HttpVersion11, Http404, keepConn) - echo "bytesCount = ", arg.get().mainWriter.bytesCount + of HttpResponseState.Prepared: + # Response was prepared but not sent. + discard await conn.sendErrorResponse(HttpVersion11, Http409, keepConn) + else: + # some data was already sent to the client. + discard else: - if arg.get().mainWriter.bytesCount == 0'u64: - # Processor callback finished with an error, but response was not - # sent to client, so we going to send HTTP503 error. - discard await conn.sendErrorResponse(HttpVersion11, Http503, true) + discard await conn.sendErrorResponse(HttpVersion11, Http503, true) - ## Perform cleanup of request instance - await arg.get().clear() + # Closing and releasing all the request resources. + await request.close() if not(keepConn): break - await transp.closeWait() + await conn.close() server.connections.del(transp.getId()) # if server.maxConnections > 0: # server.semaphore.release() -proc acceptClientLoop(server: HttpServer) {.async.} = +proc acceptClientLoop(server: HttpServerRef) {.async.} = var breakLoop = false while true: try: @@ -518,7 +537,7 @@ proc acceptClientLoop(server: HttpServer) {.async.} = if breakLoop: break -proc state*(server: HttpServer): HttpServerState = +proc state*(server: HttpServerRef): HttpServerState = ## Returns current HTTP server's state. if server.lifetime.finished(): ServerClosed @@ -531,22 +550,22 @@ proc state*(server: HttpServer): HttpServerState = else: ServerRunning -proc start*(server: HttpServer) = +proc start*(server: HttpServerRef) = ## Starts HTTP server. if server.state == ServerStopped: server.acceptLoop = acceptClientLoop(server) -proc stop*(server: HttpServer) {.async.} = +proc stop*(server: HttpServerRef) {.async.} = ## Stop HTTP server from accepting new connections. if server.state == ServerRunning: await server.acceptLoop.cancelAndWait() -proc drop*(server: HttpServer) {.async.} = +proc drop*(server: HttpServerRef) {.async.} = ## Drop all pending HTTP connections. if server.state in {ServerStopped, ServerRunning}: discard -proc close*(server: HttpServer) {.async.} = +proc close*(server: HttpServerRef) {.async.} = ## Stop HTTP server and drop all the pending connections. if server.state != ServerClosed: await server.stop() @@ -554,7 +573,7 @@ proc close*(server: HttpServer) {.async.} = await server.instance.closeWait() server.lifetime.complete() -proc join*(server: HttpServer): Future[void] = +proc join*(server: HttpServerRef): Future[void] = ## Wait until HTTP server will not be closed. var retFuture = newFuture[void]("http.server.join") @@ -574,7 +593,25 @@ proc join*(server: HttpServer): Future[void] = retFuture -proc post*(req: HttpRequest): Future[HttpTable] {.async.} = +proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] = + ## Create new MultiPartReader interface for specific request. + if req.meth in PostMethods: + if MultipartForm in req.requestFlags: + let ctype = ? getContentType(req.headersTable.getList("content-type")) + if ctype != "multipart/form-data": + err("Content type is not supported") + else: + let boundary = ? getMultipartBoundary( + req.headersTable.getList("content-type") + ) + var stream = ? req.getBodyStream() + ok(MultiPartReaderRef.new(stream, boundary)) + else: + err("Request's data is not multipart encoded") + else: + err("Request's method do not supports multipart") + +proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} = ## Return POST parameters if req.postTable.isSome(): return req.postTable.get() @@ -595,31 +632,42 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} = req.postTable = some(table) return table elif MultipartForm in req.requestFlags: - let cres = getContentType(req.headersTable.getList("content-type")) - if cres.isErr(): - raise newHttpCriticalError(cres.error) - let bres = getMultipartBoundary(req.headersTable.getList("content-type")) - if bres.isErr(): - raise newHttpCriticalError(bres.error) - # We must handle `Expect`. - await req.handleExpect() - var reader = req.getBodyStream() - if reader.isErr(): - raise newHttpCriticalError(reader.error) - var mpreader = MultiPartReaderRef.new(reader.get(), bres.get()) var table = HttpTable.init() + let res = getMultipartReader(req) + if res.isErr(): + raise newHttpCriticalError("Unable to retrieve multipart form data") + var mpreader = res.get() + + # We must handle `Expect` first. + try: + await req.handleExpect() + except HttpCriticalError as exc: + await mpreader.close() + raise exc + + # Reading multipart/form-data parts. var runLoop = true + var loopError: ref MultipartError while runLoop: try: - let res = await mpreader.readPart() - var value = await res.getBody() + let part = await mpreader.readPart() + var value = await part.getBody() ## TODO (cheatfate) double copy here. var strvalue = newString(len(value)) if len(value) > 0: copyMem(addr strvalue[0], addr value[0], len(value)) - table.add(res.name, strvalue) + table.add(part.name, strvalue) + await part.close() except MultiPartEoM: runLoop = false + except MultipartError as exc: + # We preserve error to avoid Nim's exception transformation bug. + runLoop = false + loopError = exc + + await mpreader.close() + if not(isNil(loopError)): + raise loopError req.postTable = some(table) return table else: @@ -630,58 +678,225 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} = elif HttpRequestFlags.UnboundBody in req.requestFlags: raise newHttpCriticalError("Unsupported request body") -proc `keepalive=`*(resp: HttpResponse, value: bool) = +proc `keepalive=`*(resp: HttpResponseRef, value: bool) = + doAssert(resp.state == HttpResponseState.Empty) if value: - resp.responseFlags.incl(KeepAlive) + resp.flags.incl(KeepAlive) else: - resp.responseFlags.excl(KeepAlive) + resp.flags.excl(KeepAlive) -proc keepalive*(resp: HttpResponse): bool = - KeepAlive in resp.responseFlags +proc keepalive*(resp: HttpResponseRef): bool = + KeepAlive in resp.flags -proc setHeader*(resp: HttpResponse, key, value: string) = - resp.httpTable.set(key, value) +proc setHeader*(resp: HttpResponseRef, key, value: string) = + doAssert(resp.state == HttpResponseState.Empty) + resp.headersTable.set(key, value) -proc addHeader*(resp: HttpResponse, key, value: string) = - resp.httpTable.add(key, value) +proc addHeader*(resp: HttpResponseRef, key, value: string) = + doAssert(resp.state == HttpResponseState.Empty) + resp.headersTable.add(key, value) -proc getHeader*(resp: HttpResponse, key: string): string = - resp.httpTable.getString(key) +proc getHeader*(resp: HttpResponseRef, key: string, + default: string = ""): string = + resp.headersTable.getString(key, default) -proc getHeaderOrDefault*(resp: HttpResponse, key: string, - default: string = ""): string = +proc hasHeader*(resp: HttpResponseRef, key: string): bool = + key in resp.headersTable +template doHeaderDef(buf, resp, name, default) = + buf.add(name) + buf.add(": ") + buf.add(resp.getHeader(name, default)) + buf.add("\r\n") -proc sendBody*(resp: HttpResponse, pbytes: pointer, nbytes: int) {.async.} = - resp.headersTable +template doHeaderVal(buf, name, value) = + buf.add(name) + buf.add(": ") + buf.add(value) + buf.add("\r\n") -proc sendBody*(resp: HttpResponse, data: string) {.async.} - var answer = resp.version & " " & resp.status & "\r\n" - answer.add("") -proc prepare*(resp: HttpResponse) {.async.} = - discard +template checkPending(t: untyped) = + if t.state != HttpResponseState.Empty: + raise newHttpCriticalError("Response body was already sent") -proc sendChunk*(resp: HttpResponse) {.async.} = - discard +proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string = + var answer = $(resp.version) & " " & $(resp.status) & "\r\n" + answer.doHeaderDef(resp, "Date", httpDate()) + answer.doHeaderDef(resp, "Content-Type", "text/html; charset=utf-8") + if length > 0: + answer.doHeaderVal("Content-Length", $(length)) + if "Connection" notin resp.headersTable: + if KeepAlive in resp.flags: + answer.doHeaderVal("Connection", "keep-alive") + else: + answer.doHeaderVal("Connection", "close") + for k, v in resp.headersTable.stringItems(): + if k notin ["date", "content-type", "content-length"]: + answer.doHeaderVal(normalizeHeaderName(k), v) + answer.add("\r\n") + answer -proc finish*(resp: HttpResponse) {.async.} = - discard +proc prepareChunkedHeaders(resp: HttpResponseRef): string = + var answer = $(resp.version) & " " & $(resp.status) & "\r\n" + answer.doHeaderDef(resp, "Date", httpDate()) + answer.doHeaderDef(resp, "Content-Type", "text/html; charset=utf-8") + answer.doHeaderDef(resp, "Transfer-Encoding", "chunked") + if "Connection" notin resp.headersTable: + if KeepAlive in resp.flags: + answer.doHeaderVal("Connection", "keep-alive") + else: + answer.doHeaderVal("Connection", "close") + for k, v in resp.headersTable.stringItems(): + if k notin ["date", "content-type", "content-length", "transfer-encoding"]: + answer.doHeaderVal(normalizeHeaderName(k), v) + answer.add("\r\n") + answer +proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = + ## Send HTTP response at once by using bytes pointer ``pbytes`` and length + ## ``nbytes``. + doAssert(not(isNil(pbytes)), "pbytes must not be nil") + doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero") + checkPending(resp) + let responseHeaders = resp.prepareLengthHeaders(nbytes) + resp.state = HttpResponseState.Prepared + try: + resp.state = HttpResponseState.Sending + await resp.connection.mainWriter.write(responseHeaders) + if nbytes > 0: + await resp.connection.mainWriter.write(pbytes, nbytes) + resp.state = HttpResponseState.Finished + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc sendBody*[T: string|seq[byte]](resp: HttpResponseRef, data: T) {.async.} = + ## Send HTTP response at once by using data ``data``. + checkPending(resp) + let responseHeaders = resp.prepareLengthHeaders(len(data)) + resp.state = HttpResponseState.Prepared + try: + resp.state = HttpResponseState.Sending + await resp.connection.mainWriter.write(responseHeaders) + if len(data) > 0: + await resp.connection.mainWriter.write(data) + resp.state = HttpResponseState.Finished + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} = + ## Send HTTP error status response. + checkPending(resp) + resp.status = code + let responseHeaders = resp.prepareLengthHeaders(len(body)) + resp.state = HttpResponseState.Prepared + try: + resp.state = HttpResponseState.Sending + await resp.connection.mainWriter.write(responseHeaders) + if len(body) > 0: + await resp.connection.mainWriter.write(body) + resp.state = HttpResponseState.Finished + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc prepare*(resp: HttpResponseRef) {.async.} = + ## Prepare for HTTP stream response. + ## + ## Such responses will be sent chunk by chunk using ``chunked`` encoding. + resp.checkPending() + let responseHeaders = resp.prepareChunkedHeaders() + resp.state = HttpResponseState.Prepared + try: + resp.state = HttpResponseState.Sending + await resp.connection.mainWriter.write(responseHeaders) + resp.chunkedWriter = newChunkedStreamWriter(resp.connection.mainWriter) + resp.flags.incl(HttpResponseFlags.Chunked) + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} = + ## Send single chunk of data pointed by ``pbytes`` and ``nbytes``. + doAssert(not(isNil(pbytes)), "pbytes must not be nil") + doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero") + if HttpResponseFlags.Chunked notin resp.flags: + raise newHttpCriticalError("Response was not prepared") + if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: + raise newHttpCriticalError("Response in incorrect state") + try: + resp.state = HttpResponseState.Sending + await resp.chunkedWriter.write(pbytes, nbytes) + resp.state = HttpResponseState.Sending + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc sendChunk*[T: string|seq[byte]](resp: HttpResponseRef, + data: T) {.async.} = + ## Send single chunk of data ``data``. + if HttpResponseFlags.Chunked notin resp.flags: + raise newHttpCriticalError("Response was not prepared") + if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: + raise newHttpCriticalError("Response in incorrect state") + try: + resp.state = HttpResponseState.Sending + await resp.chunkedWriter.write(data) + resp.state = HttpResponseState.Sending + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") + +proc finish*(resp: HttpResponseRef) {.async.} = + ## Sending last chunk of data, so it will indicate end of HTTP response. + if HttpResponseFlags.Chunked notin resp.flags: + raise newHttpCriticalError("Response was not prepared") + if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}: + raise newHttpCriticalError("Response in incorrect state") + try: + resp.state = HttpResponseState.Sending + await resp.chunkedWriter.finish() + resp.state = HttpResponseState.Finished + except CancelledError as exc: + resp.state = HttpResponseState.Cancelled + raise exc + except AsyncStreamWriteError, AsyncStreamIncompleteError: + resp.state = HttpResponseState.Failed + raise newHttpCriticalError("Unable to send response") when isMainModule: - proc processCallback(req: RequestFence[HttpRequest]): Future[HttpStatus] {. + proc process(req: RequestFence[HttpRequestRef]): Future[HttpResponseRef] {. async.} = if req.isOk(): let request = req.get() - echo "Got ", request.meth, " request" let post = await request.post() - echo "post = ", post + let response = request.getResponse() + await response.sendBody("Got [" & $request.meth & "] request") + return response else: - echo "Got FAILURE", req.error() - echo "process callback" + return dumbResponse() - let res = HttpServer.new(initTAddress("127.0.0.1:30080"), processCallback, - maxConnections = 1) + let res = HttpServerRef.new(initTAddress("127.0.0.1:30080"), process, + maxConnections = 1) if res.isOk(): let server = res.get() server.start() diff --git a/chronos/apps/http/httptable.nim b/chronos/apps/http/httptable.nim index b8e97a6..b5ba1ef 100644 --- a/chronos/apps/http/httptable.nim +++ b/chronos/apps/http/httptable.nim @@ -105,6 +105,15 @@ proc init*(htt: typedesc[HttpTable]): HttpTable = proc new*(htt: typedesc[HttpTableRef]): HttpTableRef = HttpTableRef(table: initTable[string, seq[string]]()) +iterator stringItems*(ht: HttpTables): tuple[key: string, value: string] = + for k, v in ht.table.pairs(): + for item in v: + yield (k, item) + +iterator items*(ht: HttpTables): tuple[key: string, value: seq[string]] = + for k, v in ht.table.pairs(): + yield (k, v) + proc normalizeHeaderName*(value: string): string = var res = value.toLowerAscii() var k = 0 diff --git a/chronos/apps/http/multipart.nim b/chronos/apps/http/multipart.nim index ac6b50d..b6e089f 100644 --- a/chronos/apps/http/multipart.nim +++ b/chronos/apps/http/multipart.nim @@ -15,13 +15,13 @@ import httptable, httpcommon export httptable, httpcommon, asyncstream type - MultiPartSource {.pure.} = enum + MultiPartSource* {.pure.} = enum Stream, Buffer MultiPartReader* = object case kind: MultiPartSource of MultiPartSource.Stream: - stream: AsyncStreamReader + stream*: AsyncStreamReader of MultiPartSource.Buffer: discard firstTime: bool @@ -35,7 +35,7 @@ type MultiPart* = object case kind: MultiPartSource of MultiPartSource.Stream: - stream*: BoundedStreamReader + stream: BoundedStreamReader of MultiPartSource.Buffer: discard buffer: seq[byte] @@ -76,10 +76,13 @@ proc parseUntil*(s, until: openarray[byte]): int = proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader], buffer: openarray[A], boundary: openarray[B]): MultiPartReader = - # Boundary should not be empty. + ## Create new MultiPartReader instance with `buffer` interface. + ## + ## ``buffer`` - is buffer which will be used to read data. + ## ``boundary`` - is multipart boundary, this value must not be empty. doAssert(len(boundary) > 0) - # Our internal boundary has format `<-><->`, so we can reuse - # different parts of this sequence for processing. + # Our internal boundary has format `<-><->`, so we can + # reuse different parts of this sequence for processing. var fboundary = newSeq[byte](len(boundary) + 4) fboundary[0] = 0x0D'u8 fboundary[1] = 0x0A'u8 @@ -94,13 +97,17 @@ proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader], buffer: buf, offset: 0, boundary: fboundary) proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef], - stream: AsyncStreamReader, - boundary: openarray[B], - partHeadersMaxSize = 4096): MultiPartReaderRef = - # Boundary should not be empty. + stream: AsyncStreamReader, + boundary: openarray[B], + partHeadersMaxSize = 4096): MultiPartReaderRef = + ## Create new MultiPartReader instance with `stream` interface. + ## + ## ``stream`` is stream used to read data. + ## ``boundary`` is multipart boundary, this value must not be empty. + ## ``partHeadersMaxSize`` is maximum size of multipart's headers. doAssert(len(boundary) > 0) - # Our internal boundary has format `<-><->`, so we can reuse - # different parts of this sequence for processing. + # Our internal boundary has format `<-><->`, so we can + # reuse different parts of this sequence for processing. var fboundary = newSeq[byte](len(boundary) + 4) fboundary[0] = 0x0D'u8 fboundary[1] = 0x0A'u8 @@ -111,7 +118,7 @@ proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef], stream: stream, offset: 0, boundary: fboundary, buffer: newSeq[byte](partHeadersMaxSize)) -func setPartNames*(part: var MultiPart): HttpResult[void] = +func setPartNames(part: var MultiPart): HttpResult[void] = if part.headers.count("content-disposition") != 1: return err("Content-Disposition header is incorrect") var header = part.headers.getString("content-disposition") @@ -192,28 +199,55 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} = raise newMultipartReadError("Error reading multipart message") proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} = + ## Get multipart's ``mp`` value as sequence of bytes. case mp.kind of MultiPartSource.Stream: try: let res = await mp.stream.read() return res except AsyncStreamError: - raise newException(HttpCriticalError, "Could not read multipart body") + raise newException(MultipartReadError, "Could not read multipart body") of MultiPartSource.Buffer: return mp.buffer proc consumeBody*(mp: MultiPart) {.async.} = + ## Discard multipart's ``mp`` value. case mp.kind of MultiPartSource.Stream: try: await mp.stream.consume() except AsyncStreamError: - raise newException(HttpCriticalError, "Could not consume multipart body") + raise newException(MultipartReadError, "Could not consume multipart body") of MultiPartSource.Buffer: discard +proc getBodyStream*(mp: MultiPart): HttpResult[AsyncStreamReader] = + ## Get multipart's ``mp`` stream, which can be used to obtain value of the + ## part. + case mp.kind + of MultiPartSource.Stream: + ok(mp.stream) + else: + err("Could not obtain stream from buffer-like part") + +proc close*(mp: MultiPart) {.async.} = + ## Close and release MultiPart's ``mp`` stream and resources. + case mp.kind + of MultiPartSource.Stream: + await closeWait(mp.stream) + else: + discard + +proc close*(mpr: MultiPartReaderRef) {.async.} = + ## Close and release MultiPartReader's ``mpr`` stream and resources. + case mpr.kind + of MultiPartSource.Stream: + await mpr.stream.closeWait() + else: + discard + proc getBytes*(mp: MultiPart): seq[byte] = - ## Returns MultiPart value as sequence of bytes. + ## Returns value for MultiPart ``mp`` as sequence of bytes. case mp.kind of MultiPartSource.Buffer: mp.buffer @@ -222,7 +256,7 @@ proc getBytes*(mp: MultiPart): seq[byte] = mp.buffer proc getString*(mp: MultiPart): string = - ## Returns MultiPart value as string. + ## Returns value for MultiPart ``mp`` as string. case mp.kind of MultiPartSource.Buffer: if len(mp.buffer) > 0: @@ -240,7 +274,28 @@ proc getString*(mp: MultiPart): string = else: "" +proc atEoM*(mpr: var MultiPartReader): bool = + ## Procedure returns ``true`` if MultiPartReader has reached the end of + ## multipart message. + case mpr.kind + of MultiPartSource.Buffer: + mpr.offset >= len(mpr.buffer) + of MultiPartSource.Stream: + mpr.stream.atEof() + +proc atEoM*(mpr: MultiPartReaderRef): bool = + ## Procedure returns ``true`` if MultiPartReader has reached the end of + ## multipart message. + case mpr.kind + of MultiPartSource.Buffer: + mpr.offset >= len(mpr.buffer) + of MultiPartSource.Stream: + mpr.stream.atEof() + proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] = + ## Get multipart part from MultiPartReader instance. + ## + ## This procedure will work only for MultiPartReader with buffer source. doAssert(mpr.kind == MultiPartSource.Buffer) if mpr.offset >= len(mpr.buffer): return err("End of multipart form encountered") @@ -320,6 +375,15 @@ proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] = err("Incorrect multipart form") func getMultipartBoundary*(ch: openarray[string]): HttpResult[string] = + ## Returns ``multipart/form-data`` boundary value from ``Content-Type`` + ## header. + ## + ## The procedure carries out all the necessary checks: + ## 1) There should be single `Content-Type` header value in headers. + ## 2) `Content-Type` must be ``multipart/form-data``. + ## 3) `boundary` value must be present + ## 4) `boundary` value must be less then 70 characters length and + ## all characters should be part of alphabet. if len(ch) > 1: err("Multiple Content-Type headers found") else: @@ -347,12 +411,3 @@ func getMultipartBoundary*(ch: openarray[string]): HttpResult[string] = '\'' .. ')', '+' .. '/', ':', '=', '?', '_'}: return err("Content-Type boundary alphabat incorrect") ok(candidate) - -when isMainModule: - var buf = "--------------------------5e7d0dd0ed6eb849\r\nContent-Disposition: form-data; =\"key1\"\r\n\r\nvalue1\r\n--------------------------5e7d0dd0ed6eb849\r\nContent-Disposition: form-data; name=\"key2\"\r\n\r\nvalue2\r\n--------------------------5e7d0dd0ed6eb849--" - var reader = MultiPartReader.init(buf, "------------------------5e7d0dd0ed6eb849") - echo getPart(reader) - echo "====" - echo getPart(reader) - echo "====" - echo getPart(reader) diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 1a63a4e..05dc16f 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -730,18 +730,18 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], if isNil(wstream.wsource): var res: int try: - res = await write(wstream.tsource, sbytes, msglen) + res = await write(wstream.tsource, sbytes, length) except CancelledError as exc: raise exc except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: raise newAsyncStreamIncompleteError() - wstream.bytesCount = wstream.bytesCount + uint64(msglen) + wstream.bytesCount = wstream.bytesCount + uint64(length) else: if isNil(wstream.writerLoop): - await write(wstream.wsource, sbytes, msglen) - wstream.bytesCount = wstream.bytesCount + uint64(msglen) + await write(wstream.wsource, sbytes, length) + wstream.bytesCount = wstream.bytesCount + uint64(length) else: var item = WriteItem(kind: Sequence) if not isLiteral(sbytes): @@ -778,18 +778,18 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, if isNil(wstream.wsource): var res: int try: - res = await write(wstream.tsource, sbytes, msglen) + res = await write(wstream.tsource, sbytes, length) except CancelledError as exc: raise exc except CatchableError as exc: raise newAsyncStreamWriteError(exc) if res != length: raise newAsyncStreamIncompleteError() - wstream.bytesCount = wstream.bytesCount + uint64(msglen) + wstream.bytesCount = wstream.bytesCount + uint64(length) else: if isNil(wstream.writerLoop): - await write(wstream.wsource, sbytes, msglen) - wstream.bytesCount = wstream.bytesCount + uint64(msglen) + await write(wstream.wsource, sbytes, length) + wstream.bytesCount = wstream.bytesCount + uint64(length) else: var item = WriteItem(kind: String) if not isLiteral(sbytes):