documentation

This commit is contained in:
Dmitriy Ryajov 2019-09-01 22:42:29 -06:00
parent f3cc6fbef0
commit 2c3c23c7c6
1 changed files with 67 additions and 9 deletions

View File

@ -7,13 +7,33 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
## This module implements an asynchronous buffer stream
## which emulates physical async IO.
##
## The stream is based on the standard library's `Deque`,
## which is itself based on a ring buffer.
##
## It works by exposing a regular LPStream interface and
## a method ``pushTo`` to push data to the internal read
## buffer; as well as a handler that can be registrered
## that gets triggered on every write to the stream. This
## allows using the buffered stream as a sort of proxy,
## which can be consumed as a regular LPStream but allows
## injecting data for reads and intercepting writes.
##
## Another notable feature is that the stream is fully
## ordered and asynchronous. Reads are queued up in order
## and are suspended when not enough data available. This
## allows preserving backpressure while maintaining full
## asynchrony. Both pushing data with ``pushTo`` as well as
## reading with ``read`` and ``readOnce`` will suspend when
## the buffer is full (``pushTo``) or doesn't have enough
## data.
import deques, tables, sequtils, math import deques, tables, sequtils, math
import chronos import chronos
import ../stream/lpstream import ../stream/lpstream
## I use a dequeu here because it uses a ring buffer under the hood
## which should on average be more performant than memMoves and copies
type type
WriteHandler* = proc (data: seq[byte]) {.gcsafe.} # TODO: figure out how to make this generic to avoid casts WriteHandler* = proc (data: seq[byte]) {.gcsafe.} # TODO: figure out how to make this generic to avoid casts
@ -91,7 +111,10 @@ method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} =
if index < size: if index < size:
discard await s.requestReadBytes() discard await s.requestReadBytes()
method readExactly*(s: BufferStream, pbytes: pointer, nbytes: int): Future[void] {.async, gcsafe.} = method readExactly*(s: BufferStream,
pbytes: pointer,
nbytes: int):
Future[void] {.async, gcsafe.} =
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
## it to ``pbytes``. ## it to ``pbytes``.
## ##
@ -103,7 +126,10 @@ method readExactly*(s: BufferStream, pbytes: pointer, nbytes: int): Future[void]
let buff = await s.read(nbytes) let buff = await s.read(nbytes)
copyMem(pbytes, unsafeAddr buff[0], nbytes) copyMem(pbytes, unsafeAddr buff[0], nbytes)
method readLine*(s: BufferStream, limit = 0, sep = "\r\n"): Future[string] {.async, gcsafe.} = method readLine*(s: BufferStream,
limit = 0,
sep = "\r\n"):
Future[string] {.async, gcsafe.} =
## Read one line from read-only stream ``rstream``, where ``"line"`` is a ## Read one line from read-only stream ``rstream``, where ``"line"`` is a
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``). ## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
## ##
@ -136,7 +162,10 @@ method readLine*(s: BufferStream, limit = 0, sep = "\r\n"): Future[string] {.asy
break break
inc(index) inc(index)
method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): Future[int] {.async, gcsafe.} = method readOnce*(s: BufferStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async, gcsafe.} =
## Perform one read operation on read-only stream ``rstream``. ## Perform one read operation on read-only stream ``rstream``.
## ##
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
@ -197,22 +226,51 @@ method readUntil*(s: BufferStream,
else: else:
s.shrink(datalen) s.shrink(datalen)
method write*(s: BufferStream, pbytes: pointer, nbytes: int) {.async, gcsafe.} = method write*(s: BufferStream,
pbytes: pointer,
nbytes: int)
{.async, gcsafe.} =
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
## ``rstream``.
##
## Return number of bytes actually consumed (discarded).
var buf: seq[byte] = newSeq[byte](nbytes) var buf: seq[byte] = newSeq[byte](nbytes)
copyMem(addr buf[0], pbytes, nbytes) copyMem(addr buf[0], pbytes, nbytes)
s.writeHandler(buf) s.writeHandler(buf)
method write*(s: BufferStream, msg: string, msglen = -1) {.async, gcsafe.} = method write*(s: BufferStream,
msg: string,
msglen = -1)
{.async, gcsafe.} =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
##
## String ``sbytes`` must not be zero-length.
##
## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream.
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
var buf = "" var buf = ""
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
s.writeHandler(cast[seq[byte]](toSeq(buf.items))) s.writeHandler(cast[seq[byte]](toSeq(buf.items)))
method write*(s: BufferStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} = method write*(s: BufferStream,
msg: seq[byte],
msglen = -1)
{.async, gcsafe.} =
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
## stream ``wstream``.
##
## Sequence of bytes ``sbytes`` must not be zero-length.
##
## If ``msglen < 0`` whole sequence ``sbytes`` will be writen to stream.
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
var buf: seq[byte] var buf: seq[byte]
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
s.writeHandler(buf) s.writeHandler(buf)
method close*(s: BufferStream) {.async, gcsafe.} = method close*(s: BufferStream) {.async, gcsafe.} =
## close the stream and clear the buffer
for r in s.readReqs: for r in s.readReqs:
r.cancel() r.cancel()
s.readBuf.clear() s.readBuf.clear()