From 5af418f85079f23895da8ba31f9ef1447c62726c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 14 Jun 2021 17:20:28 -0600 Subject: [PATCH] add accepts api (#51) * add accepts api * Fix partial frame handling and allow extensions to hijack the flow (#56) * moving files around * wip * wip * move tls example into server example * add tls functionality * rename * rename * fix tests * move extension related files to own folder * use trace instead of debug * export extensions * rework partial frame handling and closing * rework status codes as distincts * logging * re-enable extensions processing for frames * enable all test for non-tls server * remove tlsserver * remove offset to mask - don't think we need it * pass sessions extensions when calling send/recv * adding encode/decode extensions flow test * move server/client setup to helpers * proper frame order execution on decode * fix tls tests * fix merge * add tls support for `accept` call * fix tests to use accepts & cb --- tests/helpers.nim | 51 +++++++++++++++-------- tests/testwebsockets.nim | 44 -------------------- ws.nimble | 6 +++ ws/frame.nim | 12 +++--- ws/http.nim | 5 +-- ws/http/server.nim | 87 +++++++++++++++++++++++++++++----------- 6 files changed, 112 insertions(+), 93 deletions(-) diff --git a/tests/helpers.nim b/tests/helpers.nim index 13c3753..596afb5 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -1,3 +1,5 @@ +{.push raises: [Defect].} + import std/[strutils, random] import pkg/[ chronos, @@ -38,22 +40,39 @@ proc createServer*( flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, tlsFlags: set[TLSFlags] = {}, tlsMinVersion = TLSVersion.TLS12, - tlsMaxVersion = TLSVersion.TLS12): HttpServer = - when defined secure: - TlsHttpServer.create( - address = address, - tlsPrivateKey = tlsPrivateKey, - tlsCertificate = tlsCertificate, - handler = handler, - flags = flags, - tlsFlags = tlsFlags, - tlsMinVersion = tlsMinVersion, - tlsMaxVersion = tlsMaxVersion) - else: - HttpServer.create( - address = address, - handler = handler, - flags = flags) + tlsMaxVersion = TLSVersion.TLS12): HttpServer + {.raises: [Defect, HttpError].} = + try: + let server = when defined secure: + TlsHttpServer.create( + address = address, + tlsPrivateKey = tlsPrivateKey, + tlsCertificate = tlsCertificate, + flags = flags, + tlsFlags = tlsFlags, + tlsMinVersion = tlsMinVersion, + tlsMaxVersion = tlsMaxVersion) + else: + HttpServer.create( + address = address, + flags = flags) + + when defined accepts: + proc accepts() {.async, raises: [Defect].} = + try: + let req = await server.accept() + await req.handler() + except TransportOsError as exc: + error "Transport error", exc = exc.msg + + asyncCheck accepts() + else: + server.handler = handler + server.start() + + return server + except CatchableError as exc: + raise newException(Defect, exc.msg) proc connectClient*( address = initTAddress("127.0.0.1:8888"), diff --git a/tests/testwebsockets.nim b/tests/testwebsockets.nim index 56380d7..b69c410 100644 --- a/tests/testwebsockets.nim +++ b/tests/testwebsockets.nim @@ -34,7 +34,6 @@ suite "Test handshake": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -55,7 +54,6 @@ suite "Test handshake": address = address, handler = handle, flags = {ReuseAddr}) - server.start() expect WSFailedUpgradeError: let session = await connectClient( @@ -80,26 +78,11 @@ suite "Test handshake": address = address, handler = handle, flags = {ReuseAddr}) - server.start() expect WSFailedUpgradeError: discard await connectClient() test "Test for incorrect scheme": - proc handle(request: HttpRequest) {.async.} = - check request.uri.path == WSPath - - let server = WSServer.new(protos = ["proto"]) - expect WSProtoMismatchError: - let ws = await server.handleRequest(request) - check ws.readyState == ReadyState.Closed - - server = createServer( - address = address, - handler = handle, - flags = {ReuseAddr}) - server.start() - let uri = "wx://127.0.0.1:8888/ws" expect WSWrongUriSchemeError: discard await WebSocket.connect( @@ -127,7 +110,6 @@ suite "Test transmission": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.send(testString) @@ -147,7 +129,6 @@ suite "Test transmission": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.send(testString) @@ -167,7 +148,6 @@ suite "Test transmission": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() var clientRes = await session.recv() @@ -197,7 +177,6 @@ suite "Test ping-pong": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -230,7 +209,6 @@ suite "Test ping-pong": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -259,7 +237,6 @@ suite "Test ping-pong": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -288,7 +265,6 @@ suite "Test ping-pong": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let str = rndStr(126) let session = await connectClient() @@ -325,7 +301,6 @@ suite "Test framing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -347,7 +322,6 @@ suite "Test framing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() @@ -371,7 +345,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await waitForClose(session) @@ -403,7 +376,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe, raises: [Defect].} = @@ -431,7 +403,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.close() @@ -459,7 +430,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe, raises: [Defect].} = @@ -489,7 +459,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() proc closeClient(status: StatusCodes, reason: string): CloseResult {.gcsafe, raises: [Defect].} = @@ -528,7 +497,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.close(code = StatusCodes(3999)) @@ -547,7 +515,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await waitForClose(session) @@ -565,7 +532,6 @@ suite "Test Closing": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() @@ -597,7 +563,6 @@ suite "Test Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.send(emptyStr) @@ -632,7 +597,6 @@ suite "Test Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() @@ -670,7 +634,6 @@ suite "Test Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -728,7 +691,6 @@ suite "Test Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -787,7 +749,6 @@ suite "Test Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let ws = await connectClient( address = address, @@ -826,7 +787,6 @@ suite "Test Binary message with Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() await session.send(emptyData, Opcode.Binary) @@ -852,7 +812,6 @@ suite "Test Binary message with Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient() @@ -885,7 +844,6 @@ suite "Test Binary message with Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -920,7 +878,6 @@ suite "Test Binary message with Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let session = await connectClient( address = initTAddress("127.0.0.1:8888"), @@ -952,7 +909,6 @@ suite "Test Binary message with Payload": address = address, handler = handle, flags = {ReuseAddr}) - server.start() let ws = await connectClient( address = address, diff --git a/ws.nimble b/ws.nimble index 672c16f..373990f 100644 --- a/ws.nimble +++ b/ws.nimble @@ -23,3 +23,9 @@ task test, "run tests": exec "nim --hints:off -d:secure c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim" rmFile "./tests/testwebsockets" + + exec "nim --hints:off -d:accepts c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim" + rmFile "./tests/testwebsockets" + + exec "nim --hints:off -d:secure -d:accepts c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim" + rmFile "./tests/testwebsockets" diff --git a/ws/frame.nim b/ws/frame.nim index afe9fde..1aeff78 100644 --- a/ws/frame.nim +++ b/ws/frame.nim @@ -56,10 +56,11 @@ proc mask*( template remainder*(frame: Frame): uint64 = frame.length - frame.consumed -proc read*(frame: Frame, - reader: AsyncStreamReader, - pbytes: pointer, - nbytes: int): Future[int] {.async.} = +proc read*( + frame: Frame, + reader: AsyncStreamReader, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = # read data from buffered payload if available # e.g. data processed by extensions @@ -79,8 +80,7 @@ proc read*(frame: Frame, mask( pbuf.toOpenArray(0, readLen - 1), frame.maskKey, - frame.consumed.int - ) + frame.consumed.int) frame.consumed += readLen.uint64 return readLen diff --git a/ws/http.nim b/ws/http.nim index c7a4b9c..479674e 100644 --- a/ws/http.nim +++ b/ws/http.nim @@ -7,7 +7,4 @@ import pkg/[ import ./http/client, ./http/server, ./http/common -export uri, httputils, client, server, httptable, tlsstream -export TlsHttpClient, HttpClient, HttpServer, - HttpResponse, HttpRequest, closeWait, sendResponse, - sendError +export uri, httputils, client, server, httptable, tlsstream, common diff --git a/ws/http/server.nim b/ws/http/server.nim index 2173653..827d2da 100644 --- a/ws/http/server.nim +++ b/ws/http/server.nim @@ -42,9 +42,9 @@ proc validateRequest( return ReqStatus.Success -proc handleRequest( +proc parseRequest( server: HttpServer, - stream: AsyncStream) {.async.} = + stream: AsyncStream): Future[HttpRequest] {.async.} = ## Process transport data to the HTTP server ## @@ -82,14 +82,11 @@ proc handleRequest( trace "Remote peer disconnected", address = $remoteAddr return - trace "Received valid HTTP request", address = $remoteAddr - # Call the user's handler. - if server.handler != nil: - await server.handler( - HttpRequest( - headers: hdrs, - stream: stream, - uri: requestData.uri().parseUri())) + debug "Received valid HTTP request", address = $remoteAddr + return HttpRequest( + headers: hdrs, + stream: stream, + uri: requestData.uri().parseUri()) except TransportLimitError: # size of headers exceeds `MaxHttpHeadersSize` trace "maximum size of headers limit reached", address = $remoteAddr @@ -100,27 +97,32 @@ proc handleRequest( except TransportOsError as exc: trace "Problems with networking", address = $remoteAddr, error = exc.msg except CatchableError as exc: - trace "Unknown exception", address = $remoteAddr, error = exc.msg - finally: - await stream.closeWait() + debug "Unknown exception", address = $remoteAddr, error = exc.msg proc handleConnCb( server: StreamServer, transp: StreamTransport) {.async.} = + var stream: AsyncStream + try: + stream = AsyncStream( + reader: newAsyncStreamReader(transp), + writer: newAsyncStreamWriter(transp)) - let stream = AsyncStream( - reader: newAsyncStreamReader(transp), - writer: newAsyncStreamWriter(transp)) + let httpServer = HttpServer(server) + let request = await httpServer.parseRequest(stream) - let httpServer = HttpServer(server) - await httpServer.handleRequest(stream) + await httpServer.handler(request) + except CatchableError as exc: + debug "Exception in HttpHandler", exc = exc.msg + finally: + await stream.closeWait() proc handleTlsConnCb( server: StreamServer, transp: StreamTransport) {.async.} = let tlsHttpServer = TlsHttpServer(server) - let stream = newTLSServerAsyncStream( + let tlsStream = newTLSServerAsyncStream( newAsyncStreamReader(transp), newAsyncStreamWriter(transp), tlsHttpServer.tlsPrivateKey, @@ -129,10 +131,49 @@ proc handleTlsConnCb( maxVersion = tlsHttpServer.maxVersion, flags = tlsHttpServer.tlsFlags) - await HttpServer(tlsHttpServer) - .handleRequest(AsyncStream( - reader: stream.reader, - writer: stream.writer)) + var stream: ASyncStream + try: + stream = AsyncStream( + reader: tlsStream.reader, + writer: tlsStream.writer) + + let httpServer = HttpServer(server) + let request = await httpServer.parseRequest(stream) + + await httpServer.handler(request) + except CatchableError as exc: + debug "Exception in HttpHandler", exc = exc.msg + finally: + await stream.closeWait() + +proc accept*(server: HttpServer | TlsHttpServer): Future[HttpRequest] + {.async, raises: [Defect, HttpError].} = + + if not isNil(server.handler): + raise newException(HttpError, + "Callback already registered - cannot mix callback and accepts stypes!") + + let transp = await StreamServer(server).accept() + var stream: AsyncStream + when server is TlsHttpServer: + let tlsStream = newTLSServerAsyncStream( + newAsyncStreamReader(transp), + newAsyncStreamWriter(transp), + server.tlsPrivateKey, + server.tlsCertificate, + minVersion = server.minVersion, + maxVersion = server.maxVersion, + flags = server.tlsFlags) + + stream = AsyncStream( + reader: tlsStream.reader, + writer: tlsStream.writer) + else: + stream = AsyncStream( + reader: newAsyncStreamReader(transp), + writer: newAsyncStreamWriter(transp)) + + return await server.parseRequest(stream) proc create*( _: typedesc[HttpServer],