From 24a3474ef984310ae8a72b026620ab5b5f7028e1 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Thu, 29 Jul 2021 17:35:36 +0100 Subject: [PATCH] Generalise low-level recv() for accepting partial frames why: Function bailed out with op-code exception when reading partial frames. details: Re-implemented the text/binary mode handling. --- tests/testwebsockets.nim | 57 +++++++++++++++++++++++++- websock/session.nim | 86 +++++++++++++++++++++++++++++++++++----- websock/types.nim | 2 + 3 files changed, 135 insertions(+), 10 deletions(-) diff --git a/tests/testwebsockets.nim b/tests/testwebsockets.nim index 9b3a241c..7915095e 100644 --- a/tests/testwebsockets.nim +++ b/tests/testwebsockets.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[strutils, random] +import std/[strutils] import pkg/[ httputils, chronos, @@ -951,3 +951,58 @@ suite "Test Binary message with Payload": check: echoed == testData ws.binary == true + + test "read partial frames from low-level recv()": + const + howMuchWood = "How much wood could a wood chuck chuck ..." + + # Frame size to be used for this test + frameSize = 7 + + # Fetching data using buffers of this size + chunkLen = frameSize - 2 + + # FIXME: for some reason, the data must be a multiple of the `frameSize` + # otherwise the system crashes, most probably in the server + # === needs further investigation? 
+ dataLen = frameSize * (howMuchWood.len div frameSize) + testData = howMuchWood[0 ..< datalen] + + proc handle(request: HttpRequest) {.async.} = + check request.uri.path == WSPath + + let + server = WSServer.new(protos = ["proto"]) + ws = await server.handleRequest(request) + #check ws.binary == false + + var + res = newSeq[byte](testData.len + 10) + pos = 0 + try: + while ws.readyState != ReadyState.Closed and pos < res.len: + let read = await ws.recv(addr res[pos], min(res.len - pos, chunkLen)) + pos += read + except: + discard + + # If there was a problem reading partial frames, the `recv()` should have + # thrown an exception. This would result in an incomplete result detected + # here. + res.setlen(pos) + check string.fromBytes(res) == testData + await ws.waitForClose + + server = createServer( + address = address, + handler = handle, + flags = {ReuseAddr}) + + let session = await connectClient( + address = address, + frameSize = frameSize) + + await session.send(testData) + await session.close + +# End diff --git a/websock/session.nim b/websock/session.nim index a79e660f..52c39931 100644 --- a/websock/session.nim +++ b/websock/session.nim @@ -18,6 +18,38 @@ import pkg/chronos/streams/asyncstream logScope: topics = "websock ws-session" +proc updateReadMode(ws: WSSession): bool = + ## This helper function sets text/binary mode for the current frame. + ## + ## Processing frames might imply a read mode switch. This is typically so + ## for the very first frame in binary mode when `ws.binary` has initial + ## value. + ## + ## This function allows to switch from `binary` to `text` mode (aka utf8) + ## but not the other way round unless explicitly enabled by setting + ## `ws.textSwitchOk` to `true`. Here `text` mode is seen as the more + ## restrictive one. Locking `text` mode allows to process the final read + ## mode when all of the frame sequence was read after the last frame. 
+ ## + ## Return value `false` indicates unconfirmed mode switch `text` => `binary` + + if not ws.frame.isNil: + # Very first frame, take encoding at face value. + if not ws.seen: + ws.binary = ws.frame.opcode != Opcode.Text + ws.seen = true + + # Accept mode switch from binary => text + elif ws.binary and ws.frame.opcode == Opcode.Text: + ws.binary = false + + # Beware of a mode switch from text => binary + elif not ws.binary and ws.frame.opcode == Opcode.Binary: + trace "Read mode changed from text to binary" + if not ws.textSwitchOk: + return false + true + proc prepareCloseBody(code: StatusCodes, reason: string): seq[byte] = result = reason.toBytes if ord(code) > 999: @@ -309,20 +341,31 @@ proc recv*( ## up to ``size`` bytes, note that we do break ## at message boundaries (``fin`` flag set). ## + ## Processing frames, this function allows to switch from `binary` to `text` + ## mode (aka utf8) but not the other way round. It allows to process the + ## final read mode only when all of the frame sequence was read after the + ## very last frame. The `text` mode is seen to be more restrictive than + ## `binary` mode. + ## + ## If the frame data types change from `text` to `binary`, a + ## `WSOpcodeMismatchError` exception is thrown unless the `ws` descriptor + ## flag `ws.textSwitchOk` was set `true` in which case this data type change + ## is accepted. 
+ ## ## Use this to stream data from frames ## var consumed = 0 var pbuffer = cast[ptr UncheckedArray[byte]](data) try: - var first = true + #var first = true if not isNil(ws.frame): if ws.frame.fin and ws.frame.remainder > 0: trace "Continue reading from the same frame" - first = true elif not ws.frame.fin and ws.frame.remainder > 0: - trace "Restarting reads in the middle of a frame in a multiframe message" - first = false + trace "Restarting reads in the middle of a frame" & + " in a multiframe message" + #first = false elif ws.frame.fin and ws.frame.remainder <= 0: trace "Resetting an already consumed frame" ws.frame = nil @@ -332,6 +375,11 @@ proc recv*( if isNil(ws.frame): ws.frame = await ws.readFrame(ws.extensions) + # Note: The `ws.updateReadMode` directive replaces the functionality + # of the `first` variable handling. + if not ws.updateReadMode: + raise newException( + WSOpcodeMismatchError, "Opcode switch text to binary") while consumed < size: if isNil(ws.frame): @@ -339,7 +387,6 @@ proc recv*( break logScope: - first = first fin = ws.frame.fin len = ws.frame.length consumed = ws.frame.consumed @@ -347,6 +394,17 @@ proc recv*( opcode = ws.frame.opcode masked = ws.frame.mask + # The code below is left commented out for informational purposes, only. + # + # As it seems, the condition of the first `if` clause below is + # frequently violated when re-entering the `recv()` function when the + # read buffer data `size` is smaller than the frame size. And this + # leads to an unwanted exception. + # + # The functionality of the second `if` clause is re-implemented with + # the `readFrame()` directives right before the `while` loop and at + # the bottom of the while loop. + #[ if first == (ws.frame.opcode == Opcode.Cont): error "Opcode mismatch!" 
raise newException(WSOpcodeMismatchError, @@ -355,11 +413,13 @@ proc recv*( if first: ws.binary = ws.frame.opcode == Opcode.Binary # set binary flag trace "Setting binary flag" + ]# let len = min(ws.frame.remainder.int, size - consumed) if len > 0: trace "Reading bytes from frame stream", len - let read = await ws.frame.read(ws.stream.reader, addr pbuffer[consumed], len) + let read = await: + ws.frame.read(ws.stream.reader, addr pbuffer[consumed], len) if read <= 0: trace "Didn't read any bytes, breaking" break @@ -370,7 +430,7 @@ proc recv*( # all has been consumed from the frame # read the next frame if ws.frame.remainder <= 0: - first = false + #first = false if ws.frame.fin: # we're at the end of the message, break trace "Read all frames, breaking" @@ -378,7 +438,14 @@ proc recv*( break ws.frame = await ws.readFrame(ws.extensions) + # Note: The `ws.updateReadMode` directive replaces the functionality + # of the `first` variable handling. + if not ws.updateReadMode: + raise newException( + WSOpcodeMismatchError, "Opcode switch text to binary") + # The next `if` clause is earmarked to go away. It is currently needed to + # make some unit tests work. 
if not ws.binary and validateUTF8(pbuffer.toOpenArray(0, consumed - 1)) == false: raise newException(WSInvalidUTF8, "Invalid UTF8 sequence detected") @@ -386,12 +453,13 @@ proc recv*( trace "Exception reading frames", exc = exc.msg ws.readyState = ReadyState.Closed await ws.stream.closeWait() - raise exc + finally: if not isNil(ws.frame) and (ws.frame.fin and ws.frame.remainder <= 0): - trace "Last frame in message and no more bytes left to read, reseting current frame" + trace "Last frame in message and no more bytes left to read," & + " reseting current frame" ws.frame = nil return consumed diff --git a/websock/types.nim b/websock/types.nim index 1f172efe..79c1b08c 100644 --- a/websock/types.nim +++ b/websock/types.nim @@ -81,6 +81,7 @@ type readyState*: ReadyState masked*: bool # send masked packets binary*: bool # is payload binary? + textSwitchOk*: bool # allow switch from text => bin between frames flags*: set[TLSFlags] rng*: Rng frameSize*: int @@ -91,6 +92,7 @@ type WSSession* = ref object of WebSocket stream*: AsyncStream frame*: Frame + seen*: bool # true => at least one frame was seen proto*: string Ext* = ref object of RootObj