use binary flag

This commit is contained in:
Dmitriy Ryajov 2021-05-22 03:04:40 -06:00
parent 1b3e47d2f5
commit 352bb6b3a4
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
7 changed files with 62 additions and 55 deletions

View File

@ -17,7 +17,7 @@ proc main() {.async.} =
while true: while true:
try: try:
await ws.send(reqData) await ws.send(reqData)
let (buff, _) = await ws.recv() let buff = await ws.recv()
if buff.len <= 0: if buff.len <= 0:
break break

View File

@ -19,12 +19,14 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
debug "Websocket handshake completed." debug "Websocket handshake completed."
while true: while true:
let (recvData, opcode) = await ws.recv() let recvData = await ws.recv()
if ws.readyState == ReadyState.Closed: if ws.readyState == ReadyState.Closed:
debug "Websocket closed." debug "Websocket closed."
break break
debug "Client Response: ", size = recvData.len debug "Client Response: ", size = recvData.len
await ws.send(recvData, Opcode.Text) await ws.send(recvData,
if ws.binary: Opcode.Binary else: Opcode.Text)
except WebSocketError as exc: except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg error "WebSocket error:", exception = exc.msg

View File

@ -18,7 +18,7 @@ proc main() {.async.} =
try: try:
echo "sending client " echo "sending client "
await ws.send(reqData) await ws.send(reqData)
let (buff, _) = await ws.recv() let buff = await ws.recv()
if buff.len <= 0: if buff.len <= 0:
break break
let dataStr = string.fromBytes(buff) let dataStr = string.fromBytes(buff)

View File

@ -25,7 +25,7 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
debug "Websocket handshake completed." debug "Websocket handshake completed."
# Only reads header for data frame. # Only reads header for data frame.
echo "receiving server " echo "receiving server "
let (recvData, opcode) = await ws.recv() let recvData = await ws.recv()
if recvData.len <= 0: if recvData.len <= 0:
debug "Empty messages" debug "Empty messages"
break break
@ -33,7 +33,8 @@ proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if ws.readyState == ReadyState.Closed: if ws.readyState == ReadyState.Closed:
return return
debug "Response: ", data = string.fromBytes(recvData) debug "Response: ", data = string.fromBytes(recvData)
await ws.send(recvData, Opcode.Text) await ws.send(recvData,
if ws.binary: Opcode.Binary else: Opcode.Text)
except WebSocketError: except WebSocketError:
error "WebSocket error:", exception = getCurrentExceptionMsg() error "WebSocket error:", exception = getCurrentExceptionMsg()
discard await request.respond(Http200, "Hello World") discard await request.respond(Http200, "Hello World")

View File

@ -136,7 +136,7 @@ suite "Test websocket TLS transmission":
let request = r.get() let request = r.get()
check request.uri.path == "/wss" check request.uri.path == "/wss"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, _) = await ws.recv() let servRes = await ws.recv()
check string.fromBytes(servRes) == testString check string.fromBytes(servRes) == testString
await waitForClose(ws) await waitForClose(ws)
return dumbResponse() return dumbResponse()
@ -191,6 +191,6 @@ suite "Test websocket TLS transmission":
protocols = @["proto"], protocols = @["proto"],
clientFlags) clientFlags)
let (clientRes, _) = await wsClient.recv() let clientRes = await wsClient.recv()
check string.fromBytes(clientRes) == testString check string.fromBytes(clientRes) == testString
await waitForClose(wsClient) await waitForClose(wsClient)

View File

@ -145,7 +145,7 @@ suite "Test transmission":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, _) = await ws.recv() let servRes = await ws.recv()
check string.fromBytes(servRes) == testString check string.fromBytes(servRes) == testString
let res = HttpServerRef.new( let res = HttpServerRef.new(
@ -169,7 +169,7 @@ suite "Test transmission":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, _) = await ws.recv() let servRes = await ws.recv()
check string.fromBytes(servRes) == testString check string.fromBytes(servRes) == testString
await waitForClose(ws) await waitForClose(ws)
@ -212,7 +212,7 @@ suite "Test transmission":
path = "/ws", path = "/ws",
protocols = @["proto"]) protocols = @["proto"])
let (clientRes, _) = await wsClient.recv() var clientRes = await wsClient.recv()
check string.fromBytes(clientRes) == testString check string.fromBytes(clientRes) == testString
await waitForClose(wsClient) await waitForClose(wsClient)
@ -241,11 +241,11 @@ suite "Test ping-pong":
let ws = await createServer( let ws = await createServer(
request, request,
"proto", "proto",
onPing = proc(data: openArray[byte] = []) = onPing = proc(data: openArray[byte]) =
ping = true ping = true
) )
let (respData, _) = await ws.recv() let respData = await ws.recv()
check string.fromBytes(respData) == testString check string.fromBytes(respData) == testString
await waitForClose(ws) await waitForClose(ws)
@ -260,7 +260,7 @@ suite "Test ping-pong":
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
frameSize = maxFrameSize, frameSize = maxFrameSize,
onPong = proc(data: openArray[byte] = []) = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
@ -310,7 +310,7 @@ suite "Test ping-pong":
let ws = await createServer( let ws = await createServer(
request, request,
"proto", "proto",
onPong = proc(data: openArray[byte] = []) = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
@ -326,7 +326,7 @@ suite "Test ping-pong":
Port(8888), Port(8888),
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
onPing = proc(data: openArray[byte] = []) = onPing = proc(data: openArray[byte]) =
ping = true ping = true
) )
@ -346,7 +346,7 @@ suite "Test ping-pong":
let ws = await createServer( let ws = await createServer(
request, request,
"proto", "proto",
onPing = proc(data: openArray[byte] = []) = onPing = proc(data: openArray[byte]) =
ping = true ping = true
) )
await waitForClose(ws) await waitForClose(ws)
@ -363,7 +363,7 @@ suite "Test ping-pong":
Port(8888), Port(8888),
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
onPong = proc(data: openArray[byte] = []) = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
@ -731,7 +731,8 @@ suite "Test Closing":
getTracker("stream.server").isLeaked() == false getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false getTracker("stream.transport").isLeaked() == false
suite "Test text message with payload":
suite "Test Payload":
teardown: teardown:
await server.closeWait() await server.closeWait()
@ -762,7 +763,7 @@ suite "Test text message with payload":
path = "/ws", path = "/ws",
protocols = @["proto"]) protocols = @["proto"])
await wsClient.ping(toBytes(str)) await wsClient.ping(str.toBytes())
await wsClient.close() await wsClient.close()
test "Test single empty payload": test "Test single empty payload":
@ -773,7 +774,7 @@ suite "Test text message with payload":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, _) = await ws.recv() let servRes = await ws.recv()
check string.fromBytes(servRes) == emptyStr check string.fromBytes(servRes) == emptyStr
await waitForClose(ws) await waitForClose(ws)
@ -799,7 +800,7 @@ suite "Test text message with payload":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, _) = await ws.recv() let servRes = await ws.recv()
check string.fromBytes(servRes) == emptyStr check string.fromBytes(servRes) == emptyStr
await waitForClose(ws) await waitForClose(ws)
@ -845,7 +846,7 @@ suite "Test text message with payload":
Port(8888), Port(8888),
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
onPong = proc(data: openArray[byte] = []) = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
@ -874,10 +875,12 @@ suite "Test Binary message with Payload":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, opcode) = await ws.recv() let servRes = await ws.recv()
check: check:
servRes == emptyData servRes == emptyData
opcode == Opcode.Binary ws.binary == true
await waitForClose(ws) await waitForClose(ws)
let res = HttpServerRef.new( let res = HttpServerRef.new(
@ -902,10 +905,12 @@ suite "Test Binary message with Payload":
let request = r.get() let request = r.get()
check request.uri.path == "/ws" check request.uri.path == "/ws"
let ws = await createServer(request, "proto") let ws = await createServer(request, "proto")
let (servRes, opcode) = await ws.recv() let servRes = await ws.recv()
check: check:
servRes == emptyData servRes == emptyData
opcode == Opcode.Binary ws.binary == true
await waitForClose(ws) await waitForClose(ws)
let res = HttpServerRef.new( let res = HttpServerRef.new(
@ -935,13 +940,15 @@ suite "Test Binary message with Payload":
let ws = await createServer( let ws = await createServer(
request, request,
"proto", "proto",
onPing = proc() = onPing = proc(data: openArray[byte]) =
ping = true ping = true
) )
let (res, opcode) = await ws.recv()
let res = await ws.recv()
check: check:
res == testData res == testData
opcode == Opcode.Binary ws.binary == true
await waitForClose(ws) await waitForClose(ws)
let res = HttpServerRef.new( let res = HttpServerRef.new(
@ -954,8 +961,8 @@ suite "Test Binary message with Payload":
Port(8888), Port(8888),
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
onPong = proc() = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
await wsClient.send(testData, Opcode.Binary) await wsClient.send(testData, Opcode.Binary)
@ -972,13 +979,15 @@ suite "Test Binary message with Payload":
let ws = await createServer( let ws = await createServer(
request, request,
"proto", "proto",
onPing = proc() = onPing = proc(data: openArray[byte]) =
ping = true ping = true
) )
let (res, opcode) = await ws.recv()
let res = await ws.recv()
check: check:
res == testData res == testData
opcode == Opcode.Binary ws.binary == true
await waitForClose(ws) await waitForClose(ws)
let res = HttpServerRef.new( let res = HttpServerRef.new(
@ -991,8 +1000,8 @@ suite "Test Binary message with Payload":
Port(8888), Port(8888),
path = "/ws", path = "/ws",
protocols = @["proto"], protocols = @["proto"],
onPong = proc() = onPong = proc(data: openArray[byte]) =
pong = true pong = true
) )
await wsClient.send(testData, Opcode.Binary) await wsClient.send(testData, Opcode.Binary)

View File

@ -146,6 +146,7 @@ type
protocol*: string protocol*: string
readyState*: ReadyState readyState*: ReadyState
masked*: bool # send masked packets masked*: bool # send masked packets
binary*: bool # is payload binary?
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
frameSize: int frameSize: int
frame: Frame frame: Frame
@ -576,7 +577,7 @@ proc ping*(ws: WebSocket, data: seq[byte] = @[]): Future[void] =
proc recv*( proc recv*(
ws: WebSocket, ws: WebSocket,
data: pointer, data: pointer,
size: int): Future[(int, Opcode)] {.async.} = size: int): Future[int] {.async.} =
## Attempts to read up to `size` bytes ## Attempts to read up to `size` bytes
## ##
## Will read as many frames as necessary ## Will read as many frames as necessary
@ -587,10 +588,8 @@ proc recv*(
## one byte is available ## one byte is available
## ##
var var consumed = 0
consumed = 0 var pbuffer = cast[ptr UncheckedArray[byte]](data)
pbuffer = cast[ptr UncheckedArray[byte]](data)
opcode = Opcode.Text
try: try:
# read the first frame # read the first frame
@ -618,6 +617,7 @@ proc recv*(
raise newException(WSOpcodeMismatchError, raise newException(WSOpcodeMismatchError,
"expected continue frame") "expected continue frame")
ws.binary = ws.frame.opcode == Opcode.Binary
if ws.frame.fin and ws.frame.remainder().int <= 0: if ws.frame.fin and ws.frame.remainder().int <= 0:
ws.frame = nil ws.frame = nil
break break
@ -640,7 +640,7 @@ proc recv*(
consumed += read consumed += read
ws.frame.consumed += read.uint64 ws.frame.consumed += read.uint64
return (consumed.int, opcode) return consumed.int
except WebSocketError as exc: except WebSocketError as exc:
debug "Websocket error", exc = exc.msg debug "Websocket error", exc = exc.msg
@ -655,7 +655,7 @@ proc recv*(
proc recv*( proc recv*(
ws: WebSocket, ws: WebSocket,
size = WSMaxMessageSize): Future[(seq[byte], Opcode)] {.async.} = size = WSMaxMessageSize): Future[seq[byte]] {.async.} =
## Attempt to read a full message up to max `size` ## Attempt to read a full message up to max `size`
## bytes in `frameSize` chunks. ## bytes in `frameSize` chunks.
## ##
@ -667,16 +667,11 @@ proc recv*(
## ##
## In all other cases it awaits a full message. ## In all other cases it awaits a full message.
## ##
var var res: seq[byte]
res: seq[byte]
opcode = Opcode.Text
try: try:
while ws.readyState != ReadyState.Closed: while ws.readyState != ReadyState.Closed:
var var buf = newSeq[byte](ws.frameSize)
buf = newSeq[byte](ws.frameSize) let read = await ws.recv(addr buf[0], buf.len)
read: int
(read, opcode) = await ws.recv(addr buf[0], buf.len)
if read <= 0: if read <= 0:
break break
@ -702,7 +697,7 @@ proc recv*(
except CatchableError as exc: except CatchableError as exc:
debug "Exception reading frames", exc = exc.msg debug "Exception reading frames", exc = exc.msg
return (res, opcode) return res
proc close*( proc close*(
ws: WebSocket, ws: WebSocket,