add accepts api (#51)

* add accepts api

* Fix partial frame handling and allow extensions to hijack the flow (#56)

* moving files around

* wip

* wip

* move tls example into server example

* add tls functionality

* rename

* rename

* fix tests

* move extension related files to own folder

* use trace instead of debug

* export extensions

* rework partial frame handling and closing

* rework status codes as distincts

* logging

* re-enable extensions processing for frames

* enable all test for non-tls server

* remove tlsserver

* remove offset to mask - don't think we need it

* pass sessions extensions when calling send/recv

* adding encode/decode extensions flow test

* move server/client setup to helpers

* proper frame order execution on decode

* fix tls tests

* fix merge

* add tls support for `accept` call

* fix tests to use accepts & cb
This commit is contained in:
Dmitriy Ryajov 2021-06-14 17:20:28 -06:00 committed by GitHub
parent f80278aeee
commit 5af418f850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 93 deletions

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import std/[strutils, random] import std/[strutils, random]
import pkg/[ import pkg/[
chronos, chronos,
@ -38,22 +40,39 @@ proc createServer*(
flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}, flags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr},
tlsFlags: set[TLSFlags] = {}, tlsFlags: set[TLSFlags] = {},
tlsMinVersion = TLSVersion.TLS12, tlsMinVersion = TLSVersion.TLS12,
tlsMaxVersion = TLSVersion.TLS12): HttpServer = tlsMaxVersion = TLSVersion.TLS12): HttpServer
when defined secure: {.raises: [Defect, HttpError].} =
TlsHttpServer.create( try:
address = address, let server = when defined secure:
tlsPrivateKey = tlsPrivateKey, TlsHttpServer.create(
tlsCertificate = tlsCertificate, address = address,
handler = handler, tlsPrivateKey = tlsPrivateKey,
flags = flags, tlsCertificate = tlsCertificate,
tlsFlags = tlsFlags, flags = flags,
tlsMinVersion = tlsMinVersion, tlsFlags = tlsFlags,
tlsMaxVersion = tlsMaxVersion) tlsMinVersion = tlsMinVersion,
else: tlsMaxVersion = tlsMaxVersion)
HttpServer.create( else:
address = address, HttpServer.create(
handler = handler, address = address,
flags = flags) flags = flags)
when defined accepts:
proc accepts() {.async, raises: [Defect].} =
try:
let req = await server.accept()
await req.handler()
except TransportOsError as exc:
error "Transport error", exc = exc.msg
asyncCheck accepts()
else:
server.handler = handler
server.start()
return server
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc connectClient*( proc connectClient*(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),

View File

@ -34,7 +34,6 @@ suite "Test handshake":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -55,7 +54,6 @@ suite "Test handshake":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
expect WSFailedUpgradeError: expect WSFailedUpgradeError:
let session = await connectClient( let session = await connectClient(
@ -80,26 +78,11 @@ suite "Test handshake":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
expect WSFailedUpgradeError: expect WSFailedUpgradeError:
discard await connectClient() discard await connectClient()
test "Test for incorrect scheme": test "Test for incorrect scheme":
proc handle(request: HttpRequest) {.async.} =
check request.uri.path == WSPath
let server = WSServer.new(protos = ["proto"])
expect WSProtoMismatchError:
let ws = await server.handleRequest(request)
check ws.readyState == ReadyState.Closed
server = createServer(
address = address,
handler = handle,
flags = {ReuseAddr})
server.start()
let uri = "wx://127.0.0.1:8888/ws" let uri = "wx://127.0.0.1:8888/ws"
expect WSWrongUriSchemeError: expect WSWrongUriSchemeError:
discard await WebSocket.connect( discard await WebSocket.connect(
@ -127,7 +110,6 @@ suite "Test transmission":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.send(testString) await session.send(testString)
@ -147,7 +129,6 @@ suite "Test transmission":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.send(testString) await session.send(testString)
@ -167,7 +148,6 @@ suite "Test transmission":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
var clientRes = await session.recv() var clientRes = await session.recv()
@ -197,7 +177,6 @@ suite "Test ping-pong":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -230,7 +209,6 @@ suite "Test ping-pong":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -259,7 +237,6 @@ suite "Test ping-pong":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -288,7 +265,6 @@ suite "Test ping-pong":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let str = rndStr(126) let str = rndStr(126)
let session = await connectClient() let session = await connectClient()
@ -325,7 +301,6 @@ suite "Test framing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -347,7 +322,6 @@ suite "Test framing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
@ -371,7 +345,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await waitForClose(session) await waitForClose(session)
@ -403,7 +376,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe, proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe,
raises: [Defect].} = raises: [Defect].} =
@ -431,7 +403,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.close() await session.close()
@ -459,7 +430,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe, proc clientClose(status: StatusCodes, reason: string): CloseResult {.gcsafe,
raises: [Defect].} = raises: [Defect].} =
@ -489,7 +459,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
proc closeClient(status: StatusCodes, reason: string): CloseResult proc closeClient(status: StatusCodes, reason: string): CloseResult
{.gcsafe, raises: [Defect].} = {.gcsafe, raises: [Defect].} =
@ -528,7 +497,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.close(code = StatusCodes(3999)) await session.close(code = StatusCodes(3999))
@ -547,7 +515,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await waitForClose(session) await waitForClose(session)
@ -565,7 +532,6 @@ suite "Test Closing":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
@ -597,7 +563,6 @@ suite "Test Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.send(emptyStr) await session.send(emptyStr)
@ -632,7 +597,6 @@ suite "Test Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
@ -670,7 +634,6 @@ suite "Test Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -728,7 +691,6 @@ suite "Test Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -787,7 +749,6 @@ suite "Test Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let ws = await connectClient( let ws = await connectClient(
address = address, address = address,
@ -826,7 +787,6 @@ suite "Test Binary message with Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
await session.send(emptyData, Opcode.Binary) await session.send(emptyData, Opcode.Binary)
@ -852,7 +812,6 @@ suite "Test Binary message with Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient() let session = await connectClient()
@ -885,7 +844,6 @@ suite "Test Binary message with Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -920,7 +878,6 @@ suite "Test Binary message with Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let session = await connectClient( let session = await connectClient(
address = initTAddress("127.0.0.1:8888"), address = initTAddress("127.0.0.1:8888"),
@ -952,7 +909,6 @@ suite "Test Binary message with Payload":
address = address, address = address,
handler = handle, handler = handle,
flags = {ReuseAddr}) flags = {ReuseAddr})
server.start()
let ws = await connectClient( let ws = await connectClient(
address = address, address = address,

View File

@ -23,3 +23,9 @@ task test, "run tests":
exec "nim --hints:off -d:secure c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim" exec "nim --hints:off -d:secure c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim"
rmFile "./tests/testwebsockets" rmFile "./tests/testwebsockets"
exec "nim --hints:off -d:accepts c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim"
rmFile "./tests/testwebsockets"
exec "nim --hints:off -d:secure -d:accepts c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info ./tests/testwebsockets.nim"
rmFile "./tests/testwebsockets"

View File

@ -56,10 +56,11 @@ proc mask*(
template remainder*(frame: Frame): uint64 = template remainder*(frame: Frame): uint64 =
frame.length - frame.consumed frame.length - frame.consumed
proc read*(frame: Frame, proc read*(
reader: AsyncStreamReader, frame: Frame,
pbytes: pointer, reader: AsyncStreamReader,
nbytes: int): Future[int] {.async.} = pbytes: pointer,
nbytes: int): Future[int] {.async.} =
# read data from buffered payload if available # read data from buffered payload if available
# e.g. data processed by extensions # e.g. data processed by extensions
@ -79,8 +80,7 @@ proc read*(frame: Frame,
mask( mask(
pbuf.toOpenArray(0, readLen - 1), pbuf.toOpenArray(0, readLen - 1),
frame.maskKey, frame.maskKey,
frame.consumed.int frame.consumed.int)
)
frame.consumed += readLen.uint64 frame.consumed += readLen.uint64
return readLen return readLen

View File

@ -7,7 +7,4 @@ import pkg/[
import ./http/client, ./http/server, ./http/common import ./http/client, ./http/server, ./http/common
export uri, httputils, client, server, httptable, tlsstream export uri, httputils, client, server, httptable, tlsstream, common
export TlsHttpClient, HttpClient, HttpServer,
HttpResponse, HttpRequest, closeWait, sendResponse,
sendError

View File

@ -42,9 +42,9 @@ proc validateRequest(
return ReqStatus.Success return ReqStatus.Success
proc handleRequest( proc parseRequest(
server: HttpServer, server: HttpServer,
stream: AsyncStream) {.async.} = stream: AsyncStream): Future[HttpRequest] {.async.} =
## Process transport data to the HTTP server ## Process transport data to the HTTP server
## ##
@ -82,14 +82,11 @@ proc handleRequest(
trace "Remote peer disconnected", address = $remoteAddr trace "Remote peer disconnected", address = $remoteAddr
return return
trace "Received valid HTTP request", address = $remoteAddr debug "Received valid HTTP request", address = $remoteAddr
# Call the user's handler. return HttpRequest(
if server.handler != nil: headers: hdrs,
await server.handler( stream: stream,
HttpRequest( uri: requestData.uri().parseUri())
headers: hdrs,
stream: stream,
uri: requestData.uri().parseUri()))
except TransportLimitError: except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize` # size of headers exceeds `MaxHttpHeadersSize`
trace "maximum size of headers limit reached", address = $remoteAddr trace "maximum size of headers limit reached", address = $remoteAddr
@ -100,27 +97,32 @@ proc handleRequest(
except TransportOsError as exc: except TransportOsError as exc:
trace "Problems with networking", address = $remoteAddr, error = exc.msg trace "Problems with networking", address = $remoteAddr, error = exc.msg
except CatchableError as exc: except CatchableError as exc:
trace "Unknown exception", address = $remoteAddr, error = exc.msg debug "Unknown exception", address = $remoteAddr, error = exc.msg
finally:
await stream.closeWait()
proc handleConnCb( proc handleConnCb(
server: StreamServer, server: StreamServer,
transp: StreamTransport) {.async.} = transp: StreamTransport) {.async.} =
var stream: AsyncStream
try:
stream = AsyncStream(
reader: newAsyncStreamReader(transp),
writer: newAsyncStreamWriter(transp))
let stream = AsyncStream( let httpServer = HttpServer(server)
reader: newAsyncStreamReader(transp), let request = await httpServer.parseRequest(stream)
writer: newAsyncStreamWriter(transp))
let httpServer = HttpServer(server) await httpServer.handler(request)
await httpServer.handleRequest(stream) except CatchableError as exc:
debug "Exception in HttpHandler", exc = exc.msg
finally:
await stream.closeWait()
proc handleTlsConnCb( proc handleTlsConnCb(
server: StreamServer, server: StreamServer,
transp: StreamTransport) {.async.} = transp: StreamTransport) {.async.} =
let tlsHttpServer = TlsHttpServer(server) let tlsHttpServer = TlsHttpServer(server)
let stream = newTLSServerAsyncStream( let tlsStream = newTLSServerAsyncStream(
newAsyncStreamReader(transp), newAsyncStreamReader(transp),
newAsyncStreamWriter(transp), newAsyncStreamWriter(transp),
tlsHttpServer.tlsPrivateKey, tlsHttpServer.tlsPrivateKey,
@ -129,10 +131,49 @@ proc handleTlsConnCb(
maxVersion = tlsHttpServer.maxVersion, maxVersion = tlsHttpServer.maxVersion,
flags = tlsHttpServer.tlsFlags) flags = tlsHttpServer.tlsFlags)
await HttpServer(tlsHttpServer) var stream: ASyncStream
.handleRequest(AsyncStream( try:
reader: stream.reader, stream = AsyncStream(
writer: stream.writer)) reader: tlsStream.reader,
writer: tlsStream.writer)
let httpServer = HttpServer(server)
let request = await httpServer.parseRequest(stream)
await httpServer.handler(request)
except CatchableError as exc:
debug "Exception in HttpHandler", exc = exc.msg
finally:
await stream.closeWait()
proc accept*(server: HttpServer | TlsHttpServer): Future[HttpRequest]
{.async, raises: [Defect, HttpError].} =
if not isNil(server.handler):
raise newException(HttpError,
"Callback already registered - cannot mix callback and accepts stypes!")
let transp = await StreamServer(server).accept()
var stream: AsyncStream
when server is TlsHttpServer:
let tlsStream = newTLSServerAsyncStream(
newAsyncStreamReader(transp),
newAsyncStreamWriter(transp),
server.tlsPrivateKey,
server.tlsCertificate,
minVersion = server.minVersion,
maxVersion = server.maxVersion,
flags = server.tlsFlags)
stream = AsyncStream(
reader: tlsStream.reader,
writer: tlsStream.writer)
else:
stream = AsyncStream(
reader: newAsyncStreamReader(transp),
writer: newAsyncStreamWriter(transp))
return await server.parseRequest(stream)
proc create*( proc create*(
_: typedesc[HttpServer], _: typedesc[HttpServer],