Parse Frame and handle different message type.
This commit is contained in:
parent
8fb4e78353
commit
fa37c87dd5
264
src/ws.nim
264
src/ws.nim
|
@ -1,4 +1,5 @@
|
|||
import chronos, chronicles, httputils, strutils, base64, std/sha1, random
|
||||
import chronos, chronicles, httputils, strutils, base64, std/sha1, random,
|
||||
streams, nativesockets
|
||||
|
||||
const
|
||||
MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets
|
||||
|
@ -78,7 +79,8 @@ proc handshake*(ws: WebSocket, header: HttpRequestHeader) {.async.} =
|
|||
let wantProtocol = header["Sec-WebSocket-Protocol"].strip()
|
||||
if ws.protocol != wantProtocol:
|
||||
raise newException(WebSocketError,
|
||||
"Protocol mismatch (expected: " & ws.protocol & ", got: " & wantProtocol & ")")
|
||||
"Protocol mismatch (expected: " & ws.protocol & ", got: " &
|
||||
wantProtocol & ")")
|
||||
|
||||
let
|
||||
sh = secureHash(ws.key & "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
||||
|
@ -96,8 +98,8 @@ proc handshake*(ws: WebSocket, header: HttpRequestHeader) {.async.} =
|
|||
discard await ws.tcpSocket.write(response)
|
||||
ws.readyState = Open
|
||||
|
||||
proc newWebSocket*(header: HttpRequestHeader, transp: StreamTransport, protocol: string = ""): Future[
|
||||
WebSocket] {.async.} =
|
||||
proc newWebSocket*(header: HttpRequestHeader, transp: StreamTransport,
|
||||
protocol: string = ""): Future[WebSocket] {.async.} =
|
||||
## Creates a new socket from a request.
|
||||
try:
|
||||
if not header.contains("Sec-WebSocket-Version"):
|
||||
|
@ -115,6 +117,260 @@ proc newWebSocket*(header: HttpRequestHeader, transp: StreamTransport, protocol:
|
|||
"Failed to create WebSocket from request: " & getCurrentExceptionMsg()
|
||||
)
|
||||
|
||||
type
|
||||
Opcode* = enum
|
||||
## 4 bits. Defines the interpretation of the "Payload data".
|
||||
Cont = 0x0 ## Denotes a continuation frame.
|
||||
Text = 0x1 ## Denotes a text frame.
|
||||
Binary = 0x2 ## Denotes a binary frame.
|
||||
# 3-7 are reserved for further non-control frames.
|
||||
Close = 0x8 ## Denotes a connection close.
|
||||
Ping = 0x9 ## Denotes a ping.
|
||||
Pong = 0xa ## Denotes a pong.
|
||||
# B-F are reserved for further control frames.
|
||||
|
||||
#[
|
||||
+---------------------------------------------------------------+
|
||||
|0 1 2 3 |
|
||||
|0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1|
|
||||
+-+-+-+-+-------+-+-------------+-------------------------------+
|
||||
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|
||||
|I|S|S|S| (4) |A| (7) | (16/64) |
|
||||
|N|V|V|V| |S| | (if payload len==126/127) |
|
||||
| |1|2|3| |K| | |
|
||||
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
||||
| Extended payload length continued, if payload len == 127 |
|
||||
+ - - - - - - - - - - - - - - - +-------------------------------+
|
||||
| |Masking-key, if MASK set to 1 |
|
||||
+-------------------------------+-------------------------------+
|
||||
| Masking-key (continued) | Payload Data |
|
||||
+-------------------------------- - - - - - - - - - - - - - - - +
|
||||
: Payload Data continued ... :
|
||||
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
||||
| Payload Data continued ... |
|
||||
+---------------------------------------------------------------+
|
||||
]#
|
||||
Frame = tuple
|
||||
fin: bool ## Indicates that this is the final fragment in a message.
|
||||
rsv1: bool ## MUST be 0 unless negotiated that defines meanings
|
||||
rsv2: bool ## MUST be 0
|
||||
rsv3: bool ## MUST be 0
|
||||
opcode: Opcode ## Defines the interpretation of the "Payload data".
|
||||
mask: bool ## Defines whether the "Payload data" is masked.
|
||||
data: string ## Payload data
|
||||
|
||||
proc encodeFrame(f: Frame): string =
|
||||
## Encodes a frame into a string buffer.
|
||||
## See https://tools.ietf.org/html/rfc6455#section-5.2
|
||||
|
||||
var ret = newStringStream()
|
||||
|
||||
var b0 = (f.opcode.uint8 and 0x0f) # 0th byte: opcodes and flags.
|
||||
if f.fin:
|
||||
b0 = b0 or 128u8
|
||||
|
||||
ret.write(b0)
|
||||
|
||||
# Payload length can be 7 bits, 7+16 bits, or 7+64 bits.
|
||||
# 1st byte: payload len start and mask bit.
|
||||
var b1 = 0u8
|
||||
|
||||
if f.data.len <= 125:
|
||||
b1 = f.data.len.uint8
|
||||
elif f.data.len > 125 and f.data.len <= 0xffff:
|
||||
b1 = 126u8
|
||||
else:
|
||||
b1 = 127u8
|
||||
|
||||
if f.mask:
|
||||
b1 = b1 or (1 shl 7)
|
||||
|
||||
ret.write(uint8 b1)
|
||||
|
||||
# Only need more bytes if data len is 7+16 bits, or 7+64 bits.
|
||||
if f.data.len > 125 and f.data.len <= 0xffff:
|
||||
# Data len is 7+16 bits.
|
||||
ret.write(htons(f.data.len.uint16))
|
||||
elif f.data.len > 0xffff:
|
||||
# Data len is 7+64 bits.
|
||||
var len = f.data.len
|
||||
ret.write char((len shr 56) and 255)
|
||||
ret.write char((len shr 48) and 255)
|
||||
ret.write char((len shr 40) and 255)
|
||||
ret.write char((len shr 32) and 255)
|
||||
ret.write char((len shr 24) and 255)
|
||||
ret.write char((len shr 16) and 255)
|
||||
ret.write char((len shr 8) and 255)
|
||||
ret.write char(len and 255)
|
||||
|
||||
var data = f.data
|
||||
|
||||
if f.mask:
|
||||
# If we need to mask it generate random mask key and mask the data.
|
||||
let maskKey = genMaskKey()
|
||||
for i in 0..<data.len:
|
||||
data[i] = (data[i].uint8 xor maskKey[i mod 4].uint8).char
|
||||
# Write mask key next.
|
||||
ret.write(maskKey)
|
||||
|
||||
# Write the data.
|
||||
ret.write(data)
|
||||
ret.setPosition(0)
|
||||
return ret.readAll()
|
||||
|
||||
proc send*(ws: WebSocket, text: string, opcode = Opcode.Text): Future[void] {.async.} =
|
||||
try:
|
||||
var frame = encodeFrame((
|
||||
fin: true,
|
||||
rsv1: false,
|
||||
rsv2: false,
|
||||
rsv3: false,
|
||||
opcode: opcode,
|
||||
mask: ws.masked,
|
||||
data: text
|
||||
))
|
||||
const maxSize = 1024*1024
|
||||
# Send stuff in 1 megabyte chunks to prevent IOErrors.
|
||||
# This really large packets.
|
||||
var i = 0
|
||||
while i < frame.len:
|
||||
let data = frame[i ..< min(frame.len, i + maxSize)]
|
||||
discard await ws.tcpSocket.write(data)
|
||||
i += maxSize
|
||||
await sleepAsync(1)
|
||||
except Defect, IOError, OSError, ValueError:
|
||||
# Wrap all exceptions in a WebSocketError so its easy to catch
|
||||
raise newException(WebSocketError, "Failed to send data: " &
|
||||
getCurrentExceptionMsg())
|
||||
|
||||
proc close*(ws: WebSocket) =
|
||||
## Close the Socket, sends close packet.
|
||||
ws.readyState = Closed
|
||||
proc close() {.async.} =
|
||||
await ws.send("", Close)
|
||||
ws.tcpSocket.close()
|
||||
asyncCheck close()
|
||||
|
||||
proc toString(bytes: seq[byte]): string =
|
||||
result = ""
|
||||
for byte in bytes: result.add(char(byte))
|
||||
|
||||
proc receiveFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
## Gets a frame from the WebSocket.
|
||||
## See https://tools.ietf.org/html/rfc6455#section-5.2
|
||||
|
||||
if cast[int](ws.tcpSocket.fd) == -1:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
# Grab the header.
|
||||
var header: seq[byte]
|
||||
try:
|
||||
header = await ws.tcpSocket.read(2)
|
||||
except:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
if header.len != 2:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
let b0 = header[0].uint8
|
||||
let b1 = header[1].uint8
|
||||
|
||||
# Read the flags and fin from the header.
|
||||
result.fin = b0[0]
|
||||
result.rsv1 = b0[1]
|
||||
result.rsv2 = b0[2]
|
||||
result.rsv3 = b0[3]
|
||||
result.opcode = (b0 and 0x0f).Opcode
|
||||
|
||||
# If any of the rsv are set close the socket.
|
||||
if result.rsv1 or result.rsv2 or result.rsv3:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketError, "WebSocket rsv mismatch")
|
||||
|
||||
# Payload length can be 7 bits, 7+16 bits, or 7+64 bits.
|
||||
var finalLen: uint = 0
|
||||
|
||||
let headerLen = uint(b1 and 0x7f)
|
||||
if headerLen == 0x7e:
|
||||
# Length must be 7+16 bits.
|
||||
var length = await ws.tcpSocket.read(2)
|
||||
if length.len != 2:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
finalLen = cast[ptr uint16](length[0].addr)[].htons
|
||||
|
||||
elif headerLen == 0x7f:
|
||||
# Length must be 7+64 bits.
|
||||
var length = await ws.tcpSocket.read(8)
|
||||
if length.len != 8:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
finalLen = cast[ptr uint32](length[4].addr)[].htonl
|
||||
|
||||
else:
|
||||
# Length must be 7 bits.
|
||||
finalLen = headerLen
|
||||
|
||||
# Do we need to apply mask?
|
||||
result.mask = (b1 and 0x80) == 0x80
|
||||
|
||||
if ws.masked == result.mask:
|
||||
# Server sends unmasked but accepts only masked.
|
||||
# Client sends masked but accepts only unmasked.
|
||||
raise newException(WebSocketError, "Socket mask mismatch")
|
||||
|
||||
var maskKey: seq[byte]
|
||||
if result.mask:
|
||||
# Read the mask.
|
||||
maskKey = await ws.tcpSocket.read(4)
|
||||
if maskKey.len != 4:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
# Read the data.
|
||||
var data: seq[byte]
|
||||
data = await ws.tcpSocket.read(int finalLen)
|
||||
result.data = toString(data)
|
||||
if result.data.len != int finalLen:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
if result.mask:
|
||||
# Apply mask, if we need too.
|
||||
for i in 0 ..< result.data.len:
|
||||
result.data[i] = (result.data[i].uint8 xor maskKey[i mod 4].uint8).char
|
||||
|
||||
|
||||
proc receivePacket*(ws: WebSocket): Future[(Opcode, string)] {.async.} =
|
||||
## Wait for a string or binary packet to come in.
|
||||
var frame = await ws.receiveFrame()
|
||||
result[0] = frame.opcode
|
||||
result[1] = frame.data
|
||||
# If there are more parts read and wait for them
|
||||
while frame.fin != true:
|
||||
frame = await ws.receiveFrame()
|
||||
if frame.opcode != Cont:
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
result[1].add frame.data
|
||||
return
|
||||
|
||||
proc receiveStrPacket*(ws: WebSocket): Future[string] {.async.} =
|
||||
## Wait only for a string packet to come. Errors out on Binary packets.
|
||||
let (opcode, data) = await ws.receivePacket()
|
||||
case opcode:
|
||||
of Text:
|
||||
return data
|
||||
of Binary:
|
||||
raise newException(WebSocketError,
|
||||
"Expected string packet, received binary packet")
|
||||
of Ping:
|
||||
await ws.send(data, Pong)
|
||||
of Pong:
|
||||
discard
|
||||
of Cont:
|
||||
discard
|
||||
of Close:
|
||||
ws.readyState = Closed
|
||||
raise newException(WebSocketError, "Socket closed")
|
||||
|
||||
proc sendHTTPResponse*(transp: StreamTransport, version: HttpVersion, code: HttpCode,
|
||||
data: string = ""): Future[bool] {.async.} =
|
||||
var answer = $version
|
||||
|
|
|
@ -6,6 +6,7 @@ proc cb(transp: StreamTransport, header: HttpRequestHeader) {.async.} =
|
|||
info "Initiating web socket connection."
|
||||
try:
|
||||
var ws = await newWebSocket(header, transp)
|
||||
echo await ws.receivePacket()
|
||||
info "Websocket handshake completed."
|
||||
except WebSocketError:
|
||||
echo "socket closed:", getCurrentExceptionMsg()
|
||||
|
|
Loading…
Reference in New Issue