From 1b3e47d2f53accae570969dbdddc9ec33887505b Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 15 Apr 2021 13:30:52 +0530 Subject: [PATCH] Add support for binary frame --- examples/client.nim | 2 +- examples/server.nim | 2 +- examples/tlsclient.nim | 4 +- examples/tlsserver.nim | 4 +- tests/testtlswebsockets.nim | 4 +- tests/testwebsockets.nim | 162 ++++++++++++++++++++++++++++++++++-- ws/ws.nim | 25 ++++-- 7 files changed, 178 insertions(+), 25 deletions(-) diff --git a/examples/client.nim b/examples/client.nim index ea61b99..2a9baee 100644 --- a/examples/client.nim +++ b/examples/client.nim @@ -17,7 +17,7 @@ proc main() {.async.} = while true: try: await ws.send(reqData) - let buff = await ws.recv() + let (buff, _) = await ws.recv() if buff.len <= 0: break diff --git a/examples/server.nim b/examples/server.nim index aacf78d..e007d29 100644 --- a/examples/server.nim +++ b/examples/server.nim @@ -19,7 +19,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = debug "Websocket handshake completed." while true: - let recvData = await ws.recv() + let (recvData, opcode) = await ws.recv() if ws.readyState == ReadyState.Closed: debug "Websocket closed." break diff --git a/examples/tlsclient.nim b/examples/tlsclient.nim index 7b3dbab..5cb117e 100644 --- a/examples/tlsclient.nim +++ b/examples/tlsclient.nim @@ -11,14 +11,14 @@ proc main() {.async.} = Port(8888), path = "/wss", protocols = @["myfancyprotocol"], - flags = {NoVerifyHost,NoVerifyServerName}) + flags = {NoVerifyHost, NoVerifyServerName}) debug "Websocket client: ", State = ws.readyState let reqData = "Hello Server" try: echo "sending client " await ws.send(reqData) - let buff = await ws.recv() + let (buff, _) = await ws.recv() if buff.len <= 0: break let dataStr = string.fromBytes(buff) diff --git a/examples/tlsserver.nim b/examples/tlsserver.nim index 9dc7631..e87539a 100644 --- a/examples/tlsserver.nim +++ b/examples/tlsserver.nim @@ -25,7 +25,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = debug "Websocket handshake completed." # Only reads header for data frame. echo "receiving server " - let recvData = await ws.recv() + let (recvData, opcode) = await ws.recv() if recvData.len <= 0: debug "Empty messages" break @@ -42,7 +42,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = when isMainModule: let address = initTAddress("127.0.0.1:8888") - let serverFlags = {Secure, NotifyDisconnect} + let serverFlags = {Secure, NotifyDisconnect} let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr} let res = SecureHttpServerRef.new( address, process, diff --git a/tests/testtlswebsockets.nim b/tests/testtlswebsockets.nim index 074a1c1..0a4c9e1 100644 --- a/tests/testtlswebsockets.nim +++ b/tests/testtlswebsockets.nim @@ -136,7 +136,7 @@ suite "Test websocket TLS transmission": let request = r.get() check request.uri.path == "/wss" let ws = await createServer(request, "proto") - let servRes = await ws.recv() + let (servRes, _) = await ws.recv() check string.fromBytes(servRes) == testString await waitForClose(ws) return dumbResponse() @@ -191,6 +191,6 @@ suite "Test websocket TLS transmission": protocols = @["proto"], clientFlags) - var clientRes = await wsClient.recv() + let (clientRes, _) = await wsClient.recv() check string.fromBytes(clientRes) == testString await waitForClose(wsClient) diff --git a/tests/testwebsockets.nim b/tests/testwebsockets.nim index d0763f0..a3139be 100644 --- a/tests/testwebsockets.nim +++ b/tests/testwebsockets.nim @@ -15,6 +15,10 @@ proc rndStr*(size: int): string = for _ in .. size: add(result, char(rand(int('A') .. int('z')))) +proc rndBin*(size: int): seq[byte] = + for _ in .. size: + add(result, byte(rand(0 .. 255))) + proc waitForClose(ws: WebSocket) {.async.} = try: while ws.readystate != ReadyState.Closed: @@ -141,7 +145,7 @@ suite "Test transmission": let request = r.get() check request.uri.path == "/ws" let ws = await createServer(request, "proto") - let servRes = await ws.recv() + let (servRes, _) = await ws.recv() check string.fromBytes(servRes) == testString let res = HttpServerRef.new( @@ -165,7 +169,7 @@ suite "Test transmission": let request = r.get() check request.uri.path == "/ws" let ws = await createServer(request, "proto") - let servRes = await ws.recv() + let (servRes, _) = await ws.recv() check string.fromBytes(servRes) == testString await waitForClose(ws) @@ -208,7 +212,7 @@ suite "Test transmission": path = "/ws", protocols = @["proto"]) - var clientRes = await wsClient.recv() + let (clientRes, _) = await wsClient.recv() check string.fromBytes(clientRes) == testString await waitForClose(wsClient) @@ -241,7 +245,7 @@ suite "Test ping-pong": ping = true ) - let respData = await ws.recv() + let (respData, _) = await ws.recv() check string.fromBytes(respData) == testString await waitForClose(ws) @@ -727,8 +731,7 @@ suite "Test Closing": getTracker("stream.server").isLeaked() == false getTracker("stream.transport").isLeaked() == false - -suite "Test Payload": +suite "Test text message with payload": teardown: await server.closeWait() @@ -770,7 +773,7 @@ suite "Test Payload": let request = r.get() check request.uri.path == "/ws" let ws = await createServer(request, "proto") - let servRes = await ws.recv() + let (servRes, _) = await ws.recv() check string.fromBytes(servRes) == emptyStr await waitForClose(ws) @@ -796,7 +799,7 @@ suite "Test Payload": let request = r.get() check request.uri.path == "/ws" let ws = await createServer(request, "proto") - let servRes = await ws.recv() + let (servRes, _) = await ws.recv() check string.fromBytes(servRes) == emptyStr await waitForClose(ws) @@ -858,3 +861,146 @@ suite "Test Payload": getTracker("async.stream.writer").isLeaked() == false getTracker("stream.server").isLeaked() == false getTracker("stream.transport").isLeaked() == false + +suite "Test Binary message with Payload": + teardown: + await server.closeWait() + + test "Test binary message with single empty payload message": + let emptyData = newSeq[byte](0) + proc cb(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isErr(): + return dumbResponse() + let request = r.get() + check request.uri.path == "/ws" + let ws = await createServer(request, "proto") + let (servRes, opcode) = await ws.recv() + check: + servRes == emptyData + opcode == Opcode.Binary + await waitForClose(ws) + + let res = HttpServerRef.new( + address, cb) + server = res.get() + server.start() + + let wsClient = await WebSocket.connect( + "127.0.0.1", + Port(8888), + path = "/ws", + protocols = @["proto"]) + + await wsClient.send(emptyData, Opcode.Binary) + await wsClient.close() + + test "Test binary message with multiple empty payload": + let emptyData = newSeq[byte](0) + proc cb(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isErr(): + return dumbResponse() + let request = r.get() + check request.uri.path == "/ws" + let ws = await createServer(request, "proto") + let (servRes, opcode) = await ws.recv() + check: + servRes == emptyData + opcode == Opcode.Binary + await waitForClose(ws) + + let res = HttpServerRef.new( + address, cb) + server = res.get() + server.start() + + let wsClient = await WebSocket.connect( + "127.0.0.1", + Port(8888), + path = "/ws", + protocols = @["proto"]) + + for i in 0..3: + await wsClient.send(emptyData, Opcode.Binary) + await wsClient.close() + + test "Send binary data with small text payload": + let testData = rndBin(10) + debug "testData", testData = testData + var ping, pong = false + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isErr(): + return dumbResponse() + let request = r.get() + check request.uri.path == "/ws" + let ws = await createServer( + request, + "proto", + onPing = proc() = + ping = true + ) + let (res, opcode) = await ws.recv() + check: + res == testData + opcode == Opcode.Binary + await waitForClose(ws) + + let res = HttpServerRef.new( + address, process) + server = res.get() + server.start() + + let wsClient = await WebSocket.connect( + "127.0.0.1", + Port(8888), + path = "/ws", + protocols = @["proto"], + onPong = proc() = + pong = true + ) + + await wsClient.send(testData, Opcode.Binary) + await wsClient.close() + + test "Send binary message message with payload of length 125": + let testData = rndBin(125) + var ping, pong = false + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isErr(): + return dumbResponse() + let request = r.get() + check request.uri.path == "/ws" + let ws = await createServer( + request, + "proto", + onPing = proc() = + ping = true + ) + let (res, opcode) = await ws.recv() + check: + res == testData + opcode == Opcode.Binary + await waitForClose(ws) + + let res = HttpServerRef.new( + address, process) + server = res.get() + server.start() + + let wsClient = await WebSocket.connect( + "127.0.0.1", + Port(8888), + path = "/ws", + protocols = @["proto"], + onPong = proc() = + pong = true + ) + + await wsClient.send(testData, Opcode.Binary) + await wsClient.close() + + test "AsyncStream leaks test": + check: + getTracker("async.stream.reader").isLeaked() == false + getTracker("async.stream.writer").isLeaked() == false + getTracker("stream.server").isLeaked() == false + getTracker("stream.transport").isLeaked() == false diff --git a/ws/ws.nim b/ws/ws.nim index 7d08c1f..1ddf4a6 100644 --- a/ws/ws.nim +++ b/ws/ws.nim @@ -576,7 +576,7 @@ proc ping*(ws: WebSocket, data: seq[byte] = @[]): Future[void] = proc recv*( ws: WebSocket, data: pointer, - size: int): Future[int] {.async.} = + size: int): Future[(int, Opcode)] {.async.} = ## Attempts to read up to `size` bytes ## ## Will read as many frames as necessary @@ -587,8 +587,10 @@ proc recv*( ## one byte is available ## - var consumed = 0 - var pbuffer = cast[ptr UncheckedArray[byte]](data) + var + consumed = 0 + pbuffer = cast[ptr UncheckedArray[byte]](data) + opcode = Opcode.Text try: # read the first frame @@ -638,7 +640,7 @@ proc recv*( consumed += read ws.frame.consumed += read.uint64 - return consumed.int + return (consumed.int, opcode) except WebSocketError as exc: debug "Websocket error", exc = exc.msg @@ -653,7 +655,7 @@ proc recv*( proc recv*( ws: WebSocket, - size = WSMaxMessageSize): Future[seq[byte]] {.async.} = + size = WSMaxMessageSize): Future[(seq[byte], Opcode)] {.async.} = ## Attempt to read a full message up to max `size` ## bytes in `frameSize` chunks. ## @@ -665,11 +667,16 @@ proc recv*( ## ## In all other cases it awaits a full message. ## - var res: seq[byte] + var + res: seq[byte] + opcode = Opcode.Text try: while ws.readyState != ReadyState.Closed: - var buf = newSeq[byte](ws.frameSize) - let read = await ws.recv(addr buf[0], buf.len) + var + buf = newSeq[byte](ws.frameSize) + read: int + + (read, opcode) = await ws.recv(addr buf[0], buf.len) if read <= 0: break @@ -695,7 +702,7 @@ proc recv*( except CatchableError as exc: debug "Exception reading frames", exc = exc.msg - return res + return (res, opcode) proc close*( ws: WebSocket,