Generalise low-level recv() for accepting partial frames

why:
  Function bailed out with op-code exception when reading partial frames.

details:
  Re-implemented the text/binary mode handling.
This commit is contained in:
Jordan Hrycaj 2021-07-29 17:35:36 +01:00
parent 00440b6eff
commit 24a3474ef9
3 changed files with 135 additions and 10 deletions

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[strutils, random] import std/[strutils]
import pkg/[ import pkg/[
httputils, httputils,
chronos, chronos,
@ -951,3 +951,58 @@ suite "Test Binary message with Payload":
check: check:
echoed == testData echoed == testData
ws.binary == true ws.binary == true
test "read partial frames from low-level recv()":
const
howMuchWood = "How much wood could a wood chuck chuck ..."
# Frame size to be used for this test
frameSize = 7
# Fetching data using buffers of this size
chunkLen = frameSize - 2
# FIXME: for some reason, the data must be a multiple of the `frameSize`
# otherwise the system crashes, most probably in the server
# === needs further investigation?
dataLen = frameSize * (howMuchWood.len div frameSize)
testData = howMuchWood[0 ..< datalen]
proc handle(request: HttpRequest) {.async.} =
check request.uri.path == WSPath
let
server = WSServer.new(protos = ["proto"])
ws = await server.handleRequest(request)
#check ws.binary == false
var
res = newSeq[byte](testData.len + 10)
pos = 0
try:
while ws.readyState != ReadyState.Closed and pos < res.len:
let read = await ws.recv(addr res[pos], min(res.len - pos, chunkLen))
pos += read
except:
discard
# If there was a problem reading partial frames, the `recv()` should have
# thrown an exception. This would result in an incomplete result detected
# here.
res.setlen(pos)
check string.fromBytes(res) == testData
await ws.waitForClose
server = createServer(
address = address,
handler = handle,
flags = {ReuseAddr})
let session = await connectClient(
address = address,
frameSize = frameSize)
await session.send(testData)
await session.close
# End

View File

@ -18,6 +18,38 @@ import pkg/chronos/streams/asyncstream
logScope: logScope:
topics = "websock ws-session" topics = "websock ws-session"
proc updateReadMode(ws: WSSession): bool =
## This helper function sets text/binary mode for the current frame.
##
## Processing frames might imply a read mode switch. This is typically so
## for the very first frame in binary mode when `ws.binary` has initial
## value.
##
## This function allows to switch from `binary` to `text` mode (aka utf8)
## but not the other way round unless explicitely enabled by setting
## `ws.textSwitchOk` to `true`. Here `text` mode is seen as the more
## restrictive one. Locking `text` mode allows to process the final read
## mode when all of the frame sequence was read after the last frame.
##
## Return value `false` indicates unconfirmed mode switch `text` => `binary`
if not ws.frame.isNil:
# Very first frame, take encoding at face value.
if not ws.seen:
ws.binary = ws.frame.opcode != Opcode.Text
ws.seen = true
# Accept mode switch from binary => text
elif ws.binary and ws.frame.opcode == Opcode.Text:
ws.binary = false
# Beware of a mode switch from text => binary
elif not ws.binary and ws.frame.opcode == Opcode.Binary:
trace "Read mode changed from text to binary"
if not ws.textSwitchOk:
return false
true
proc prepareCloseBody(code: StatusCodes, reason: string): seq[byte] = proc prepareCloseBody(code: StatusCodes, reason: string): seq[byte] =
result = reason.toBytes result = reason.toBytes
if ord(code) > 999: if ord(code) > 999:
@ -309,20 +341,31 @@ proc recv*(
## up to ``size`` bytes, note that we do break ## up to ``size`` bytes, note that we do break
## at message boundaries (``fin`` flag set). ## at message boundaries (``fin`` flag set).
## ##
## Processing frames, this function allows to switch from `binary` to `text`
## mode (aka utf8) but not the other way round. It allows to process the
## final read mode only when all of the frame sequence was read after the
## very last frame. The `text` mode is seen to be more restrictive than
## `binary` mode.
##
## If the frame data types change from `text` to `binary`, a
## `WSOpcodeMismatchError` exception is thrown unless the `ws` descriptor
## flag `ws.textSwitchOk` was set `true` in which case this data type change
## is accepted.
##
## Use this to stream data from frames ## Use this to stream data from frames
## ##
var consumed = 0 var consumed = 0
var pbuffer = cast[ptr UncheckedArray[byte]](data) var pbuffer = cast[ptr UncheckedArray[byte]](data)
try: try:
var first = true #var first = true
if not isNil(ws.frame): if not isNil(ws.frame):
if ws.frame.fin and ws.frame.remainder > 0: if ws.frame.fin and ws.frame.remainder > 0:
trace "Continue reading from the same frame" trace "Continue reading from the same frame"
first = true
elif not ws.frame.fin and ws.frame.remainder > 0: elif not ws.frame.fin and ws.frame.remainder > 0:
trace "Restarting reads in the middle of a frame in a multiframe message" trace "Restarting reads in the middle of a frame" &
first = false " in a multiframe message"
#first = false
elif ws.frame.fin and ws.frame.remainder <= 0: elif ws.frame.fin and ws.frame.remainder <= 0:
trace "Resetting an already consumed frame" trace "Resetting an already consumed frame"
ws.frame = nil ws.frame = nil
@ -332,6 +375,11 @@ proc recv*(
if isNil(ws.frame): if isNil(ws.frame):
ws.frame = await ws.readFrame(ws.extensions) ws.frame = await ws.readFrame(ws.extensions)
# Note: The `ws.updateReadMode` directive replaces the functionality
# of the `first` variable handling.
if not ws.updateReadMode:
raise newException(
WSOpcodeMismatchError, "Opcode switch text to binary")
while consumed < size: while consumed < size:
if isNil(ws.frame): if isNil(ws.frame):
@ -339,7 +387,6 @@ proc recv*(
break break
logScope: logScope:
first = first
fin = ws.frame.fin fin = ws.frame.fin
len = ws.frame.length len = ws.frame.length
consumed = ws.frame.consumed consumed = ws.frame.consumed
@ -347,6 +394,17 @@ proc recv*(
opcode = ws.frame.opcode opcode = ws.frame.opcode
masked = ws.frame.mask masked = ws.frame.mask
# The code below is left commented out for informational purposes, only.
#
# As it seems, the condition of the first `if` clause below is
# frequently violated when re-entering the `recv()` function when the
# read buffer data `size` is smaller than the frame size. And this
# leads to an unwanted exception.
#
# The functionality of the second `if` clause is re-implemented with
# the `readFrame()` directives rifht before the `while` loop and at
# the bottom of the while loop.
#[
if first == (ws.frame.opcode == Opcode.Cont): if first == (ws.frame.opcode == Opcode.Cont):
error "Opcode mismatch!" error "Opcode mismatch!"
raise newException(WSOpcodeMismatchError, raise newException(WSOpcodeMismatchError,
@ -355,11 +413,13 @@ proc recv*(
if first: if first:
ws.binary = ws.frame.opcode == Opcode.Binary # set binary flag ws.binary = ws.frame.opcode == Opcode.Binary # set binary flag
trace "Setting binary flag" trace "Setting binary flag"
]#
let len = min(ws.frame.remainder.int, size - consumed) let len = min(ws.frame.remainder.int, size - consumed)
if len > 0: if len > 0:
trace "Reading bytes from frame stream", len trace "Reading bytes from frame stream", len
let read = await ws.frame.read(ws.stream.reader, addr pbuffer[consumed], len) let read = await:
ws.frame.read(ws.stream.reader, addr pbuffer[consumed], len)
if read <= 0: if read <= 0:
trace "Didn't read any bytes, breaking" trace "Didn't read any bytes, breaking"
break break
@ -370,7 +430,7 @@ proc recv*(
# all has been consumed from the frame # all has been consumed from the frame
# read the next frame # read the next frame
if ws.frame.remainder <= 0: if ws.frame.remainder <= 0:
first = false #first = false
if ws.frame.fin: # we're at the end of the message, break if ws.frame.fin: # we're at the end of the message, break
trace "Read all frames, breaking" trace "Read all frames, breaking"
@ -378,7 +438,14 @@ proc recv*(
break break
ws.frame = await ws.readFrame(ws.extensions) ws.frame = await ws.readFrame(ws.extensions)
# Note: The `ws.updateReadMode` directive replaces the functionality
# of the `first` variable handling.
if not ws.updateReadMode:
raise newException(
WSOpcodeMismatchError, "Opcode switch text to binary")
# The next `if` clause is earmarked to go away. It is currently needed to
# make some unit tests work.
if not ws.binary and validateUTF8(pbuffer.toOpenArray(0, consumed - 1)) == false: if not ws.binary and validateUTF8(pbuffer.toOpenArray(0, consumed - 1)) == false:
raise newException(WSInvalidUTF8, "Invalid UTF8 sequence detected") raise newException(WSInvalidUTF8, "Invalid UTF8 sequence detected")
@ -386,12 +453,13 @@ proc recv*(
trace "Exception reading frames", exc = exc.msg trace "Exception reading frames", exc = exc.msg
ws.readyState = ReadyState.Closed ws.readyState = ReadyState.Closed
await ws.stream.closeWait() await ws.stream.closeWait()
raise exc raise exc
finally: finally:
if not isNil(ws.frame) and if not isNil(ws.frame) and
(ws.frame.fin and ws.frame.remainder <= 0): (ws.frame.fin and ws.frame.remainder <= 0):
trace "Last frame in message and no more bytes left to read, reseting current frame" trace "Last frame in message and no more bytes left to read," &
" reseting current frame"
ws.frame = nil ws.frame = nil
return consumed return consumed

View File

@ -81,6 +81,7 @@ type
readyState*: ReadyState readyState*: ReadyState
masked*: bool # send masked packets masked*: bool # send masked packets
binary*: bool # is payload binary? binary*: bool # is payload binary?
textSwitchOk*: bool # allow switch from text => bin between frames
flags*: set[TLSFlags] flags*: set[TLSFlags]
rng*: Rng rng*: Rng
frameSize*: int frameSize*: int
@ -91,6 +92,7 @@ type
WSSession* = ref object of WebSocket WSSession* = ref object of WebSocket
stream*: AsyncStream stream*: AsyncStream
frame*: Frame frame*: Frame
seen*: bool # true => at least one frame was seen
proto*: string proto*: string
Ext* = ref object of RootObj Ext* = ref object of RootObj