nim-websock/ws/session.nim

362 lines
9.7 KiB
Nim

## Nim-Libp2p
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/[chronos, chronicles, stew/byteutils, stew/endians2]
import ./types, ./frame, ./utils, ./utf8_dfa, ./http
import pkg/chronos/[streams/asyncstream]
type
WSSession* = ref object of WebSocket
stream*: AsyncStream
frame*: Frame
proto*: string
proc prepareCloseBody(code: Status, reason: string): seq[byte] =
result = reason.toBytes
if ord(code) > 999:
result = @(ord(code).uint16.toBytesBE()) & result
proc send*(
ws: WSSession,
data: seq[byte] = @[],
opcode: Opcode) {.async.} =
## Send a frame
##
if ws.readyState == ReadyState.Closed:
raise newException(WSClosedError, "Socket is closed!")
logScope:
opcode = opcode
dataSize = data.len
masked = ws.masked
debug "Sending data to remote"
var maskKey: array[4, char]
if ws.masked:
maskKey = genMaskKey(ws.rng)
if opcode notin {Opcode.Text, Opcode.Cont, Opcode.Binary}:
if ws.readyState in {ReadyState.Closing} and opcode notin {Opcode.Close}:
return
await ws.stream.writer.write(
(await Frame(
fin: true,
rsv1: false,
rsv2: false,
rsv3: false,
opcode: opcode,
mask: ws.masked,
data: data, # allow sending data with close messages
maskKey: maskKey)
.encode()))
return
let maxSize = ws.frameSize
var i = 0
while ws.readyState notin {ReadyState.Closing}:
let len = min(data.len, (maxSize + i))
await ws.stream.writer.write(
(await Frame(
fin: if (i + len >= data.len): true else: false,
rsv1: false,
rsv2: false,
rsv3: false,
opcode: if i > 0: Opcode.Cont else: opcode, # fragments have to be `Continuation` frames
mask: ws.masked,
data: data[i ..< len],
maskKey: maskKey)
.encode()))
i += len
if i >= data.len:
break
proc send*(ws: WSSession, data: string): Future[void] =
send(ws, data.toBytes(), Opcode.Text)
proc handleClose*(
ws: WSSession,
frame: Frame,
payLoad: seq[byte] = @[]) {.async.} =
## Handle close sequence
##
logScope:
fin = frame.fin
masked = frame.mask
opcode = frame.opcode
readyState = ws.readyState
debug "Handling close"
if ws.readyState notin {ReadyState.Open}:
debug "Connection isn't open, abortig close sequence!"
return
var
code = Status.Fulfilled
reason = ""
if payLoad.len == 1:
raise newException(WSPayloadLengthError,
"Invalid close frame with payload length 1!")
if payLoad.len > 1:
# first two bytes are the status
let ccode = uint16.fromBytesBE(payLoad[0..<2])
if ccode <= 999 or ccode > 1015:
raise newException(WSInvalidCloseCodeError,
"Invalid code in close message!")
try:
code = Status(ccode)
except RangeError:
raise newException(WSInvalidCloseCodeError,
"Status code out of range!")
# remining payload bytes are reason for closing
reason = string.fromBytes(payLoad[2..payLoad.high])
if not ws.binary and validateUTF8(reason) == false:
raise newException(WSInvalidUTF8,
"Invalid UTF8 sequence detected in close reason")
var rcode: Status
if code in {Status.Fulfilled}:
rcode = Status.Fulfilled
if not isNil(ws.onClose):
try:
(rcode, reason) = ws.onClose(code, reason)
except CatchableError as exc:
debug "Exception in Close callback, this is most likely a bug", exc = exc.msg
# don't respond to a terminated connection
if ws.readyState != ReadyState.Closing:
ws.readyState = ReadyState.Closing
await ws.send(prepareCloseBody(rcode, reason), Opcode.Close)
ws.readyState = ReadyState.Closed
await ws.stream.closeWait()
proc handleControl*(ws: WSSession, frame: Frame) {.async.} =
## Handle control frames
##
logScope:
fin = frame.fin
masked = frame.mask
opcode = frame.opcode
readyState = ws.readyState
len = frame.length
debug "Handling control frame"
if not frame.fin:
raise newException(WSFragmentedControlFrameError,
"Control frame cannot be fragmented!")
if frame.length > 125:
raise newException(WSPayloadTooLarge,
"Control message payload is greater than 125 bytes!")
var payLoad = newSeq[byte](frame.length.int)
if frame.length > 0:
payLoad.setLen(frame.length.int)
# Read control frame payload.
await ws.stream.reader.readExactly(addr payLoad[0], frame.length.int)
if frame.mask:
mask(
payLoad.toOpenArray(0, payLoad.high),
frame.maskKey)
# Process control frame payload.
case frame.opcode:
of Opcode.Ping:
if not isNil(ws.onPing):
try:
ws.onPing(payLoad)
except CatchableError as exc:
debug "Exception in Ping callback, this is most likelly a bug", exc = exc.msg
# send pong to remote
await ws.send(payLoad, Opcode.Pong)
of Opcode.Pong:
if not isNil(ws.onPong):
try:
ws.onPong(payLoad)
except CatchableError as exc:
debug "Exception in Pong callback, this is most likelly a bug", exc = exc.msg
of Opcode.Close:
await ws.handleClose(frame, payLoad)
else:
raise newException(WSInvalidOpcodeError, "Invalid control opcode!")
proc readFrame*(ws: WSSession): Future[Frame] {.async.} =
## Gets a frame from the WebSocket.
## See https://tools.ietf.org/html/rfc6455#section-5.2
##
while ws.readyState != ReadyState.Closed:
let frame = await Frame.decode(ws.stream.reader, ws.masked)
debug "Decoded new frame", opcode = frame.opcode, len = frame.length, mask = frame.mask
# return the current frame if it's not one of the control frames
if frame.opcode notin {Opcode.Text, Opcode.Cont, Opcode.Binary}:
await ws.handleControl(frame) # process control frames# process control frames
continue
return frame
proc ping*(ws: WSSession, data: seq[byte] = @[]): Future[void] =
ws.send(data, opcode = Opcode.Ping)
proc recv*(
ws: WSSession,
data: pointer,
size: int): Future[int] {.async.} =
## Attempts to read up to `size` bytes
##
## Will read as many frames as necessary
## to fill the buffer until either
## the message ends (frame.fin) or
## the buffer is full. If no data is on
## the pipe will await until at least
## one byte is available
##
var consumed = 0
var pbuffer = cast[ptr UncheckedArray[byte]](data)
try:
while consumed < size:
# we might have to read more than
# one frame to fill the buffer
# TODO: Figure out a cleaner way to handle
# retrieving new frames
if isNil(ws.frame):
ws.frame = await ws.readFrame()
if isNil(ws.frame):
return consumed
if ws.frame.opcode == Opcode.Cont:
raise newException(WSOpcodeMismatchError,
"Expected Text or Binary frame")
elif (not ws.frame.fin and ws.frame.remainder() <= 0):
ws.frame = await ws.readFrame()
# This could happen if the connection is closed.
if isNil(ws.frame):
return consumed
if ws.frame.opcode != Opcode.Cont:
raise newException(WSOpcodeMismatchError,
"Expected Continuation frame")
ws.binary = ws.frame.opcode == Opcode.Binary # set binary flag
if ws.frame.fin and ws.frame.remainder() <= 0:
ws.frame = nil
break
let len = min(ws.frame.remainder().int, size - consumed)
if len == 0:
continue
let read = await ws.stream.reader.readOnce(addr pbuffer[consumed], len)
if read <= 0:
continue
if ws.frame.mask:
# unmask data using offset
mask(
pbuffer.toOpenArray(consumed, (consumed + read) - 1),
ws.frame.maskKey,
ws.frame.consumed.int)
consumed += read
ws.frame.consumed += read.uint64
if not ws.binary and validateUTF8(pbuffer.toOpenArray(0, consumed - 1)) == false:
raise newException(WSInvalidUTF8, "Invalid UTF8 sequence detected")
return consumed.int
except CatchableError as exc:
ws.readyState = ReadyState.Closed
await ws.stream.closeWait()
debug "Exception reading frames", exc = exc.msg
raise exc
proc recv*(
ws: WSSession,
size = WSMaxMessageSize): Future[seq[byte]] {.async.} =
## Attempt to read a full message up to max `size`
## bytes in `frameSize` chunks.
##
## If no `fin` flag arrives await until either
## cancelled or the `fin` flag arrives.
##
## If message is larger than `size` a `WSMaxMessageSizeError`
## exception is thrown.
##
## In all other cases it awaits a full message.
##
var res: seq[byte]
while ws.readyState != ReadyState.Closed:
var buf = newSeq[byte](ws.frameSize)
let read = await ws.recv(addr buf[0], buf.len)
if read <= 0:
break
buf.setLen(read)
if res.len + buf.len > size:
raise newException(WSMaxMessageSizeError, "Max message size exceeded")
res.add(buf)
# no more frames
if isNil(ws.frame):
break
# read the entire message, exit
if ws.frame.fin and ws.frame.remainder().int <= 0:
break
return res
proc close*(
ws: WSSession,
code: Status = Status.Fulfilled,
reason: string = "") {.async.} =
## Close the Socket, sends close packet.
##
if ws.readyState != ReadyState.Open:
return
try:
ws.readyState = ReadyState.Closing
await ws.send(
prepareCloseBody(code, reason),
opcode = Opcode.Close)
# read frames until closed
while ws.readyState != ReadyState.Closed:
discard await ws.recv()
except CatchableError as exc:
debug "Exception closing", exc = exc.msg