mirror of
https://github.com/status-im/news.git
synced 2025-02-19 15:24:25 +00:00
Fix assorted bugs and return packet kind (#8)
* add connection state * improve error handling * fix segmentation fault while attempting to close closed socket * implement auto-close * remove gcsafe requirement from onAutoClose callback * remove auto-close * return packet kind * remove auto-close unit test * make interface more consistent * omit double copy
This commit is contained in:
parent
8ea2ee2602
commit
c5e7527601
@ -33,7 +33,7 @@ proc sendMsg() {.async.} =
|
||||
var ws = await newWebSocket("ws://localhost:9001/ws")
|
||||
await ws.send("hi")
|
||||
while ws.readyState == Open:
|
||||
let packet = await ws.receivePacket()
|
||||
let packet = await ws.receiveString()
|
||||
echo "received ", packet
|
||||
|
||||
waitFor sendMsg()
|
||||
@ -50,7 +50,7 @@ proc sendMsg() {.async.} =
|
||||
var ws = await newWebSocket("ws://localhost:9001/ws")
|
||||
await ws.send("hi")
|
||||
while ws.readyState == Open:
|
||||
let packet = await ws.receivePacket()
|
||||
let packet = await ws.receiveString()
|
||||
echo "received ", packet
|
||||
|
||||
waitFor sendMsg()
|
||||
|
@ -1,6 +1,6 @@
|
||||
# Package
|
||||
|
||||
version = "0.2"
|
||||
version = "0.3"
|
||||
author = "Andre von Houck, Volodymyr Melnychuk"
|
||||
description = "Simple WebSocket library for nim."
|
||||
license = "MIT"
|
||||
|
119
src/news.nim
119
src/news.nim
@ -243,6 +243,12 @@ type
|
||||
mask: bool ## Defines whether the "Payload data" is masked.
|
||||
data: string ## Payload data
|
||||
|
||||
Packet* = object
|
||||
case kind*: Opcode
|
||||
of Text, Binary:
|
||||
data*: string
|
||||
else:
|
||||
discard
|
||||
|
||||
proc encodeFrame*(f: Frame): string =
|
||||
## Encodes a frame into a string buffer
|
||||
@ -267,7 +273,6 @@ proc encodeFrame*(f: Frame): string =
|
||||
else:
|
||||
b1 = 127u8
|
||||
|
||||
let b1unmasked = b1
|
||||
if f.mask:
|
||||
b1 = b1 or (1 shl 7)
|
||||
|
||||
@ -306,26 +311,39 @@ proc encodeFrame*(f: Frame): string =
|
||||
|
||||
|
||||
proc send*(ws: WebSocket, text: string, opcode = Opcode.Text): Future[void] {.async.} =
|
||||
## write data to WebSocket
|
||||
var frame = encodeFrame((
|
||||
fin: true,
|
||||
rsv1: false,
|
||||
rsv2: false,
|
||||
rsv3: false,
|
||||
opcode: opcode,
|
||||
mask: ws.maskFrames,
|
||||
data: text
|
||||
))
|
||||
const maxSize = 1024*1024
|
||||
# send stuff in 1 megabyte chunks to prevent IOErrors
|
||||
# with really large packets
|
||||
var i = 0
|
||||
while i < frame.len:
|
||||
let data = frame[i ..< min(frame.len, i + maxSize)]
|
||||
await ws.transp.send(data)
|
||||
i += maxSize
|
||||
await sleepAsync(1)
|
||||
try:
|
||||
## write data to WebSocket
|
||||
var frame = encodeFrame((
|
||||
fin: true,
|
||||
rsv1: false,
|
||||
rsv2: false,
|
||||
rsv3: false,
|
||||
opcode: opcode,
|
||||
mask: ws.maskFrames,
|
||||
data: text
|
||||
))
|
||||
const maxSize = 1024*1024
|
||||
# send stuff in 1 megabyte chunks to prevent IOErrors
|
||||
# with really large packets
|
||||
var i = 0
|
||||
while i < frame.len:
|
||||
let data = frame[i ..< min(frame.len, i + maxSize)]
|
||||
await ws.transp.send(data)
|
||||
i += maxSize
|
||||
await sleepAsync(1)
|
||||
except Exception as e:
|
||||
if ws.transp.isClosed:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
else:
|
||||
raise newException(WebSocketError,
|
||||
&"Could not send packet because of [{e.name}]: {e.msg}")
|
||||
|
||||
proc send*(ws: WebSocket, packet: Packet): Future[void] {.async.} =
|
||||
if packet.kind == Text or packet.kind == Binary:
|
||||
result = ws.send(packet.data, packet.kind)
|
||||
else:
|
||||
result = ws.send("", packet.kind)
|
||||
|
||||
proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
## Gets a frame from the WebSocket
|
||||
@ -350,7 +368,10 @@ proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
result.rsv1 = b0[1]
|
||||
result.rsv2 = b0[2]
|
||||
result.rsv3 = b0[3]
|
||||
result.opcode = (b0 and 0x0f).Opcode
|
||||
try:
|
||||
result.opcode = (b0 and 0x0f).Opcode
|
||||
except RangeError:
|
||||
raise newException(WebSocketError, "Server did not respond with a valid WebSocket frame")
|
||||
|
||||
# if any of the rsv are set close the socket
|
||||
if result.rsv1 or result.rsv2 or result.rsv3:
|
||||
@ -405,30 +426,48 @@ proc sendPing*(ws: WebSocket): Future[void] {.async.} =
|
||||
proc sendPong(ws: WebSocket): Future[void] {.async.} =
|
||||
await ws.send("", Opcode.Pong)
|
||||
|
||||
proc receivePacket*(ws: WebSocket): Future[string] {.async.} =
|
||||
## wait for a string packet to come
|
||||
var frame = await ws.recvFrame()
|
||||
if frame.opcode == Text or frame.opcode == Binary:
|
||||
result = frame.data
|
||||
# If there are more parits read and wait for them
|
||||
while frame.fin != true:
|
||||
frame = await ws.recvFrame()
|
||||
if frame.opcode != Cont:
|
||||
raise newException(WebSocketError, "Socket did not get continue frame")
|
||||
result.add frame.data
|
||||
return
|
||||
proc receivePacket*(ws: WebSocket): Future[Packet] {.async.} =
|
||||
try:
|
||||
## wait for a string packet to come
|
||||
var frame = await ws.recvFrame()
|
||||
result = Packet(kind: frame.opcode)
|
||||
if frame.opcode == Text or frame.opcode == Binary:
|
||||
result.data = frame.data
|
||||
# If there are more parts read and wait for them
|
||||
while frame.fin != true:
|
||||
frame = await ws.recvFrame()
|
||||
if frame.opcode != Cont:
|
||||
raise newException(WebSocketError, "Socket did not get continue frame")
|
||||
result.data.add frame.data
|
||||
return
|
||||
|
||||
if frame.opcode == Ping:
|
||||
await ws.sendPong()
|
||||
if frame.opcode == Ping:
|
||||
await ws.sendPong()
|
||||
|
||||
elif frame.opcode == Pong:
|
||||
discard
|
||||
elif frame.opcode == Pong:
|
||||
return
|
||||
|
||||
elif frame.opcode == Close:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
elif frame.opcode == Close:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
except WebSocketError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
if ws.transp.isClosed:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
else:
|
||||
raise newException(WebSocketError,
|
||||
&"Could not receive packet because of [{e.name}]: {e.msg}")
|
||||
|
||||
proc receiveString*(ws: WebSocket): Future[string] {.async.} =
|
||||
let packet = await ws.receivePacket()
|
||||
if packet.kind == Text or packet.kind == Binary:
|
||||
result = packet.data
|
||||
else:
|
||||
raise newException(WebSocketError, &"Expected string, but got {packet.kind}")
|
||||
|
||||
proc close*(ws: WebSocket) =
|
||||
## close the socket
|
||||
ws.readyState = Closed
|
||||
ws.transp.close()
|
||||
if not ws.transp.isClosed:
|
||||
ws.transp.close()
|
||||
|
@ -17,8 +17,8 @@ proc sendMsg() {.async.} =
|
||||
var ws = await newWebSocket("ws://localhost:9001/ws")
|
||||
await ws.send("hi")
|
||||
while ws.readyState == Open:
|
||||
let packet = await ws.receivePacket()
|
||||
echo "received ", packet
|
||||
let str = await ws.receiveString()
|
||||
echo "received ", str
|
||||
|
||||
asyncCheck sendMsg()
|
||||
runForever()
|
||||
|
17
tests/test_ping.nim
Normal file
17
tests/test_ping.nim
Normal file
@ -0,0 +1,17 @@
|
||||
import news, asyncdispatch, asynchttpserver
|
||||
|
||||
var continueTest = true
|
||||
|
||||
proc establishConnectionAndListen() {.async.} =
|
||||
var ws = await newWebSocket("ws://echo.websocket.org")
|
||||
await ws.sendPing()
|
||||
let pong = await ws.receivePacket()
|
||||
assert(pong.kind == Pong)
|
||||
echo "Got pong"
|
||||
continueTest = false
|
||||
|
||||
asyncCheck sleepAsync(100000) # just to keep dispatcher's queue non-empty
|
||||
asyncCheck establishConnectionAndListen()
|
||||
while continueTest:
|
||||
poll()
|
||||
echo "Finished"
|
Loading…
x
Reference in New Issue
Block a user