From 8755d03265987d2125c737259ac9c0eaf7d484a6 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 8 Dec 2020 01:17:45 +0530 Subject: [PATCH] Implement a HTTP server using httputils package --- src/ws.nim | 167 +++++++++++++++++++++++++++++++++++++----------- test/server.nim | 13 ++-- ws.nimble | 2 + 3 files changed, 138 insertions(+), 44 deletions(-) diff --git a/src/ws.nim b/src/ws.nim index cab9121..199810f 100644 --- a/src/ws.nim +++ b/src/ws.nim @@ -1,4 +1,11 @@ -import chronos, asynchttpserver, base64, nativesockets +import chronos, chronicles, httputils + +const + MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets + MaxHttpRequestSize = 128 * 1024 # maximum size of HTTP body in octets + HttpHeadersTimeout = 120.seconds # timeout for receiving headers (120 sec) + HttpBodyTimeout = 12.seconds # timeout for receiving body (12 sec) + HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] type HeaderVerificationError* {.pure.} = enum None @@ -34,54 +41,140 @@ type Closed = 3 # The connection is closed or couldn't be opened. WebSocket* = ref object - tcpSocket*: AsyncSocket version*: int key*: string protocol*: string readyState*: ReadyState masked*: bool # send masked packets + HttpServer* = ref object + server*: StreamServer + callback: AsyncCallback + maxBody: int + + ReqStatus = enum + Success, Error, ErrorFailure + WebSocketError* = object of IOError -proc handshake*(ws: WebSocket, headers: HttpHeaders): Future[error: HeaderVerificationError] {.async.} = - ws.version = parseInt(headers["Sec-WebSocket-Version"]) - ws.key = headers["Sec-WebSocket-Key"].strip() - if headers.hasKey("Sec-WebSocket-Protocol"): - let wantProtocol = headers["Sec-WebSocket-Protocol"].strip() - if ws.protocol != wantProtocol: - return NoProtocolsSupported +proc sendAnswer(transp: StreamTransport, version: HttpVersion, code: HttpCode, + data: string = ""): Future[bool] {.async.} = + var answer = $version + answer.add(" ") + answer.add($code) + answer.add("\r\n") + answer.add("Date: " & httpDate() & "\r\n") + if len(data) > 0: + answer.add("Content-Type: application/json\r\n") + answer.add("Content-Length: " & $len(data) & "\r\n") + answer.add("\r\n") + if len(data) > 0: + answer.add(data) + try: + let res = await transp.write(answer) + if res != len(answer): + result = false + result = true + except: + result = false - let - sh = secureHash(ws.key & "258EAFA5-E914-47DA-95CA-C5AB0DC85B11") - acceptKey = base64.encode(decodeBase16($sh)) +proc validateRequest(transp: StreamTransport, + header: HttpRequestHeader): Future[ReqStatus] {.async.} = + if header.meth notin {MethodGet}: + # Request method is either PUT or DELETE. + debug "GET method is only allowed", address = transp.remoteAddress() + if await transp.sendAnswer(header.version, Http405): + result = Error + else: + result = ErrorFailure + return - var response = "HTTP/1.1 101 Web Socket Protocol Handshake\c\L" - response.add("Sec-WebSocket-Accept: " & acceptKey & "\c\L") - response.add("Connection: Upgrade\c\L") - response.add("Upgrade: webSocket\c\L") + let length = header.contentLength() + if length <= 0: + # request length could not be calculated. + debug "Content-Length is missing or 0", address = transp.remoteAddress() + if await transp.sendAnswer(header.version, Http411): + result = Error + else: + result = ErrorFailure + return - if ws.protocol != "": - response.add("Sec-WebSocket-Protocol: " & ws.protocol & "\c\L") - response.add "\c\L" + if length > MaxHttpRequestSize: + # request length is more then `MaxHttpRequestSize`. + debug "Maximum size of request body reached", + address = transp.remoteAddress() + if await transp.sendAnswer(header.version, Http413): + result = Error + else: + result = ErrorFailure + return - await ws.tcpSocket.send(response) - ws.readyState = Open + result = Success -proc newWebSocket*(req: Request, protocol: string = ""): Future[tuple[ws: AsyncWebSocket, error: HeaderVerificationError]] {.async.} = - if not req.headers.hasKey("Sec-WebSocket-Version"): - return ("", UnsupportedVersion) +proc serveClient(server: StreamServer, transp: StreamTransport) {.async, gcsafe.} = + ## Process transport data to the RPC server + var buffer = newSeq[byte](MaxHttpHeadersSize) + var header: HttpRequestHeader + var connection: string - var ws = WebSocket() - ws.masked = false - # Todo: Change this to chronos AsyncFD - ws.tcpSocket = req.client - ws.protocol = protocol - let (ws, error) = await ws.handshake(req.headers) - return ws, error + info "Received connection", address = $transp.remoteAddress() + try: + let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize, HeadersMark) + let ores = await withTimeout(hlenfut, HttpHeadersTimeout) + if not ores: + # Timeout + debug "Timeout expired while receiving headers", + address = transp.remoteAddress() + let res = await transp.sendAnswer(HttpVersion11, Http408) + await transp.closeWait() + return + else: + let hlen = hlenfut.read() + buffer.setLen(hlen) + header = buffer.parseRequest() + if header.failed(): + # Header could not be parsed + debug "Malformed header received", + address = transp.remoteAddress() + let res = await transp.sendAnswer(HttpVersion11, Http400) + await transp.closeWait() + return + except TransportLimitError: + # size of headers exceeds `MaxHttpHeadersSize` + debug "Maximum size of headers limit reached", + address = transp.remoteAddress() + let res = await transp.sendAnswer(HttpVersion11, Http413) + await transp.closeWait() + return + except TransportIncompleteError: + # remote peer disconnected + debug "Remote peer disconnected", address = transp.remoteAddress() + await transp.closeWait() + return + except TransportOsError as exc: + debug "Problems with networking", address = transp.remoteAddress(), + error = exc.msg + await transp.closeWait() + return + except CatchableError as exc: + debug "Unknown exception", address = transp.remoteAddress(), + error = exc.msg + await transp.closeWait() + return -proc close*(ws: WebSocket) = - ws.readyState = Closed - proc close() {.async.} = - await ws.send("", Close) - ws.tcpSocket.close() - asyncCheck close() + let vres = await validateRequest(transp, header) + + if vres == Success: + trace "Received valid RPC request", address = $transp.remoteAddress() + info "Header: ", header + debug "Disconnecting client", address = transp.remoteAddress() + await transp.closeWait() + elif vres == ErrorFailure: + debug "Remote peer disconnected", address = transp.remoteAddress() + await transp.closeWait() + +proc newHttpServer*(address: string, + flags: set[ServerFlags] = {ReuseAddr}): HttpServer = + let address = initTAddress(address) + new result + result.server = createStreamServer(address, serveClient, {ReuseAddr}) diff --git a/test/server.nim b/test/server.nim index 7789682..9dba1ce 100644 --- a/test/server.nim +++ b/test/server.nim @@ -1,8 +1,7 @@ -import ws, chronos, asynchttpserver +import ws, chronos -proc cb(req: Request) {.async.} = - let (ws, error) = await newWebSocket(req) - ws.close() - -var server = newAsyncHttpServer() -waitFor server.serve(Port(9001), cb) +when isMainModule: + let address = "127.0.0.1:8888" + var httpServer = newHttpServer(address) + httpServer.server.start() + waitFor httpServer.server.join() diff --git a/ws.nimble b/ws.nimble index 1f47f4e..9364f15 100644 --- a/ws.nimble +++ b/ws.nimble @@ -6,6 +6,8 @@ license = "MIT" requires "nim >= 1.2.6" requires "chronos >= 2.5.2 & < 3.0.0" +requires "httputils >= 0.2.0" +requires "chronicles >= 0.10.0" task lint, "format source files according to the official style guide": exec "./lint.nims"