Add support for binary frame

This commit is contained in:
Arijit Das 2021-04-15 13:30:52 +05:30 committed by Dmitriy Ryajov
parent f37e4bb19a
commit 1b3e47d2f5
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
7 changed files with 178 additions and 25 deletions

View File

@ -17,7 +17,7 @@ proc main() {.async.} =
while true:
try:
await ws.send(reqData)
let buff = await ws.recv()
let (buff, _) = await ws.recv()
if buff.len <= 0:
break

View File

@ -19,7 +19,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
debug "Websocket handshake completed."
while true:
let recvData = await ws.recv()
let (recvData, opcode) = await ws.recv()
if ws.readyState == ReadyState.Closed:
debug "Websocket closed."
break

View File

@ -11,14 +11,14 @@ proc main() {.async.} =
Port(8888),
path = "/wss",
protocols = @["myfancyprotocol"],
flags = {NoVerifyHost,NoVerifyServerName})
flags = {NoVerifyHost, NoVerifyServerName})
debug "Websocket client: ", State = ws.readyState
let reqData = "Hello Server"
try:
echo "sending client "
await ws.send(reqData)
let buff = await ws.recv()
let (buff, _) = await ws.recv()
if buff.len <= 0:
break
let dataStr = string.fromBytes(buff)

View File

@ -25,7 +25,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
debug "Websocket handshake completed."
# Only reads header for data frame.
echo "receiving server "
let recvData = await ws.recv()
let (recvData, opcode) = await ws.recv()
if recvData.len <= 0:
debug "Empty messages"
break
@ -42,7 +42,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
when isMainModule:
let address = initTAddress("127.0.0.1:8888")
let serverFlags = {Secure, NotifyDisconnect}
let serverFlags = {Secure, NotifyDisconnect}
let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
let res = SecureHttpServerRef.new(
address, process,

View File

@ -136,7 +136,7 @@ suite "Test websocket TLS transmission":
let request = r.get()
check request.uri.path == "/wss"
let ws = await createServer(request, "proto")
let servRes = await ws.recv()
let (servRes, _) = await ws.recv()
check string.fromBytes(servRes) == testString
await waitForClose(ws)
return dumbResponse()
@ -191,6 +191,6 @@ suite "Test websocket TLS transmission":
protocols = @["proto"],
clientFlags)
var clientRes = await wsClient.recv()
let (clientRes, _) = await wsClient.recv()
check string.fromBytes(clientRes) == testString
await waitForClose(wsClient)

View File

@ -15,6 +15,10 @@ proc rndStr*(size: int): string =
for _ in .. size:
add(result, char(rand(int('A') .. int('z'))))
proc rndBin*(size: int): seq[byte] =
for _ in .. size:
add(result, byte(rand(0 .. 255)))
proc waitForClose(ws: WebSocket) {.async.} =
try:
while ws.readystate != ReadyState.Closed:
@ -141,7 +145,7 @@ suite "Test transmission":
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let servRes = await ws.recv()
let (servRes, _) = await ws.recv()
check string.fromBytes(servRes) == testString
let res = HttpServerRef.new(
@ -165,7 +169,7 @@ suite "Test transmission":
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let servRes = await ws.recv()
let (servRes, _) = await ws.recv()
check string.fromBytes(servRes) == testString
await waitForClose(ws)
@ -208,7 +212,7 @@ suite "Test transmission":
path = "/ws",
protocols = @["proto"])
var clientRes = await wsClient.recv()
let (clientRes, _) = await wsClient.recv()
check string.fromBytes(clientRes) == testString
await waitForClose(wsClient)
@ -241,7 +245,7 @@ suite "Test ping-pong":
ping = true
)
let respData = await ws.recv()
let (respData, _) = await ws.recv()
check string.fromBytes(respData) == testString
await waitForClose(ws)
@ -727,8 +731,7 @@ suite "Test Closing":
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false
suite "Test Payload":
suite "Test text message with payload":
teardown:
await server.closeWait()
@ -770,7 +773,7 @@ suite "Test Payload":
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let servRes = await ws.recv()
let (servRes, _) = await ws.recv()
check string.fromBytes(servRes) == emptyStr
await waitForClose(ws)
@ -796,7 +799,7 @@ suite "Test Payload":
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let servRes = await ws.recv()
let (servRes, _) = await ws.recv()
check string.fromBytes(servRes) == emptyStr
await waitForClose(ws)
@ -858,3 +861,146 @@ suite "Test Payload":
getTracker("async.stream.writer").isLeaked() == false
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false
suite "Test Binary message with Payload":
teardown:
await server.closeWait()
test "Test binary message with single empty payload message":
let emptyData = newSeq[byte](0)
proc cb(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isErr():
return dumbResponse()
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let (servRes, opcode) = await ws.recv()
check:
servRes == emptyData
opcode == Opcode.Binary
await waitForClose(ws)
let res = HttpServerRef.new(
address, cb)
server = res.get()
server.start()
let wsClient = await WebSocket.connect(
"127.0.0.1",
Port(8888),
path = "/ws",
protocols = @["proto"])
await wsClient.send(emptyData, Opcode.Binary)
await wsClient.close()
test "Test binary message with multiple empty payload":
let emptyData = newSeq[byte](0)
proc cb(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isErr():
return dumbResponse()
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(request, "proto")
let (servRes, opcode) = await ws.recv()
check:
servRes == emptyData
opcode == Opcode.Binary
await waitForClose(ws)
let res = HttpServerRef.new(
address, cb)
server = res.get()
server.start()
let wsClient = await WebSocket.connect(
"127.0.0.1",
Port(8888),
path = "/ws",
protocols = @["proto"])
for i in 0..3:
await wsClient.send(emptyData, Opcode.Binary)
await wsClient.close()
test "Send binary data with small text payload":
let testData = rndBin(10)
debug "testData", testData = testData
var ping, pong = false
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isErr():
return dumbResponse()
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(
request,
"proto",
onPing = proc() =
ping = true
)
let (res, opcode) = await ws.recv()
check:
res == testData
opcode == Opcode.Binary
await waitForClose(ws)
let res = HttpServerRef.new(
address, process)
server = res.get()
server.start()
let wsClient = await WebSocket.connect(
"127.0.0.1",
Port(8888),
path = "/ws",
protocols = @["proto"],
onPong = proc() =
pong = true
)
await wsClient.send(testData, Opcode.Binary)
await wsClient.close()
test "Send binary message message with payload of length 125":
let testData = rndBin(125)
var ping, pong = false
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isErr():
return dumbResponse()
let request = r.get()
check request.uri.path == "/ws"
let ws = await createServer(
request,
"proto",
onPing = proc() =
ping = true
)
let (res, opcode) = await ws.recv()
check:
res == testData
opcode == Opcode.Binary
await waitForClose(ws)
let res = HttpServerRef.new(
address, process)
server = res.get()
server.start()
let wsClient = await WebSocket.connect(
"127.0.0.1",
Port(8888),
path = "/ws",
protocols = @["proto"],
onPong = proc() =
pong = true
)
await wsClient.send(testData, Opcode.Binary)
await wsClient.close()
test "AsyncStream leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
getTracker("async.stream.writer").isLeaked() == false
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false

View File

@ -576,7 +576,7 @@ proc ping*(ws: WebSocket, data: seq[byte] = @[]): Future[void] =
proc recv*(
ws: WebSocket,
data: pointer,
size: int): Future[int] {.async.} =
size: int): Future[(int, Opcode)] {.async.} =
## Attempts to read up to `size` bytes
##
## Will read as many frames as necessary
@ -587,8 +587,10 @@ proc recv*(
## one byte is available
##
var consumed = 0
var pbuffer = cast[ptr UncheckedArray[byte]](data)
var
consumed = 0
pbuffer = cast[ptr UncheckedArray[byte]](data)
opcode = Opcode.Text
try:
# read the first frame
@ -638,7 +640,7 @@ proc recv*(
consumed += read
ws.frame.consumed += read.uint64
return consumed.int
return (consumed.int, opcode)
except WebSocketError as exc:
debug "Websocket error", exc = exc.msg
@ -653,7 +655,7 @@ proc recv*(
proc recv*(
ws: WebSocket,
size = WSMaxMessageSize): Future[seq[byte]] {.async.} =
size = WSMaxMessageSize): Future[(seq[byte], Opcode)] {.async.} =
## Attempt to read a full message up to max `size`
## bytes in `frameSize` chunks.
##
@ -665,11 +667,16 @@ proc recv*(
##
## In all other cases it awaits a full message.
##
var res: seq[byte]
var
res: seq[byte]
opcode = Opcode.Text
try:
while ws.readyState != ReadyState.Closed:
var buf = newSeq[byte](ws.frameSize)
let read = await ws.recv(addr buf[0], buf.len)
var
buf = newSeq[byte](ws.frameSize)
read: int
(read, opcode) = await ws.recv(addr buf[0], buf.len)
if read <= 0:
break
@ -695,7 +702,7 @@ proc recv*(
except CatchableError as exc:
debug "Exception reading frames", exc = exc.msg
return res
return (res, opcode)
proc close*(
ws: WebSocket,