From 2c3c23c7c6450b13b46dab43f446bc3b8b47a291 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 1 Sep 2019 22:42:29 -0600 Subject: [PATCH] documentation --- libp2p/stream/bufferstream.nim | 76 ++++++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 9 deletions(-) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index c327d71..5e6577b 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -7,13 +7,33 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +## This module implements an asynchronous buffer stream +## which emulates physical async IO. +## +## The stream is based on the standard library's `Deque`, +## which is itself based on a ring buffer. +## +## It works by exposing a regular LPStream interface and +## a method ``pushTo`` to push data to the internal read +## buffer; as well as a handler that can be registrered +## that gets triggered on every write to the stream. This +## allows using the buffered stream as a sort of proxy, +## which can be consumed as a regular LPStream but allows +## injecting data for reads and intercepting writes. +## +## Another notable feature is that the stream is fully +## ordered and asynchronous. Reads are queued up in order +## and are suspended when not enough data available. This +## allows preserving backpressure while maintaining full +## asynchrony. Both pushing data with ``pushTo`` as well as +## reading with ``read`` and ``readOnce`` will suspend when +## the buffer is full (``pushTo``) or doesn't have enough +## data. + import deques, tables, sequtils, math import chronos import ../stream/lpstream -## I use a dequeu here because it uses a ring buffer under the hood -## which should on average be more performant than memMoves and copies - type WriteHandler* = proc (data: seq[byte]) {.gcsafe.} # TODO: figure out how to make this generic to avoid casts @@ -91,7 +111,10 @@ method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} = if index < size: discard await s.requestReadBytes() -method readExactly*(s: BufferStream, pbytes: pointer, nbytes: int): Future[void] {.async, gcsafe.} = +method readExactly*(s: BufferStream, + pbytes: pointer, + nbytes: int): + Future[void] {.async, gcsafe.} = ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store ## it to ``pbytes``. ## @@ -103,7 +126,10 @@ method readExactly*(s: BufferStream, pbytes: pointer, nbytes: int): Future[void] let buff = await s.read(nbytes) copyMem(pbytes, unsafeAddr buff[0], nbytes) -method readLine*(s: BufferStream, limit = 0, sep = "\r\n"): Future[string] {.async, gcsafe.} = +method readLine*(s: BufferStream, + limit = 0, + sep = "\r\n"): + Future[string] {.async, gcsafe.} = ## Read one line from read-only stream ``rstream``, where ``"line"`` is a ## sequence of bytes ending with ``sep`` (default is ``"\r\n"``). ## @@ -136,7 +162,10 @@ method readLine*(s: BufferStream, limit = 0, sep = "\r\n"): Future[string] {.asy break inc(index) -method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): Future[int] {.async, gcsafe.} = +method readOnce*(s: BufferStream, + pbytes: pointer, + nbytes: int): + Future[int] {.async, gcsafe.} = ## Perform one read operation on read-only stream ``rstream``. ## ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from @@ -197,22 +226,51 @@ method readUntil*(s: BufferStream, else: s.shrink(datalen) -method write*(s: BufferStream, pbytes: pointer, nbytes: int) {.async, gcsafe.} = +method write*(s: BufferStream, + pbytes: pointer, + nbytes: int) + {.async, gcsafe.} = + ## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream + ## ``rstream``. + ## + ## Return number of bytes actually consumed (discarded). var buf: seq[byte] = newSeq[byte](nbytes) copyMem(addr buf[0], pbytes, nbytes) s.writeHandler(buf) -method write*(s: BufferStream, msg: string, msglen = -1) {.async, gcsafe.} = +method write*(s: BufferStream, + msg: string, + msglen = -1) + {.async, gcsafe.} = + ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. + ## + ## String ``sbytes`` must not be zero-length. + ## + ## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream. + ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to + ## stream. var buf = "" shallowCopy(buf, if msglen > 0: msg[0.. len(sbytes)`` only ``len(sbytes)`` bytes will be written to + ## stream. var buf: seq[byte] shallowCopy(buf, if msglen > 0: msg[0..