fix: tests and docs

This commit is contained in:
Dmitriy Ryajov 2019-09-03 14:41:38 -06:00
parent 96cd7bcf50
commit 917e0553e1
1 changed files with 6 additions and 7 deletions

View File

@ -25,10 +25,10 @@
## ordered and asynchronous. Reads are queued up in order ## ordered and asynchronous. Reads are queued up in order
## and are suspended when not enough data available. This ## and are suspended when not enough data available. This
## allows preserving backpressure while maintaining full ## allows preserving backpressure while maintaining full
## asynchrony. Both pushing data with ``pushTo`` as well as ## asynchrony. Both writting to the internal buffer with
## reading with ``read`` and ``readOnce`` will suspend when ## ``pushTo`` as well as reading with ``read`` and ``readOnce``,
## the buffer is full (``pushTo``) or doesn't have enough ## will suspend until either the amount of elements in the
## data. ## buffer goes below ``maxSize`` or more data becomes available.
import deques, tables, sequtils, math import deques, tables, sequtils, math
import chronos import chronos
@ -38,7 +38,7 @@ type
WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.} # TODO: figure out how to make this generic to avoid casts WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.} # TODO: figure out how to make this generic to avoid casts
BufferStream* = ref object of LPStream BufferStream* = ref object of LPStream
maxSize*: int maxSize*: int # buffer's max size in bytes
readBuf: Deque[byte] # a deque is based on a ring buffer readBuf: Deque[byte] # a deque is based on a ring buffer
readReqs: Deque[Future[int]] # use dequeue to fire reads in order readReqs: Deque[Future[int]] # use dequeue to fire reads in order
dataReadEvent: AsyncEvent dataReadEvent: AsyncEvent
@ -93,8 +93,7 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async, gcsafe.} =
break break
# if we couldn't transfer all the data to the # if we couldn't transfer all the data to the
# internal buf do an async sleep and try writting # internal buf wait on a read event
# some more, this should preserve backpresure
await s.dataReadEvent.wait() await s.dataReadEvent.wait()
method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} = method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} =