diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 4c7e865..7b23ef0 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -25,10 +25,10 @@ ## ordered and asynchronous. Reads are queued up in order ## and are suspended when not enough data available. This ## allows preserving backpressure while maintaining full -## asynchrony. Both pushing data with ``pushTo`` as well as -## reading with ``read`` and ``readOnce`` will suspend when -## the buffer is full (``pushTo``) or doesn't have enough -## data. +## asynchrony. Both writting to the internal buffer with +## ``pushTo`` as well as reading with ``read`` and ``readOnce``, +## will suspend until either the amount of elements in the +## buffer goes below ``maxSize`` or more data becomes available. import deques, tables, sequtils, math import chronos @@ -38,7 +38,7 @@ type WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.} # TODO: figure out how to make this generic to avoid casts BufferStream* = ref object of LPStream - maxSize*: int + maxSize*: int # buffer's max size in bytes readBuf: Deque[byte] # a deque is based on a ring buffer readReqs: Deque[Future[int]] # use dequeue to fire reads in order dataReadEvent: AsyncEvent @@ -93,8 +93,7 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async, gcsafe.} = break # if we couldn't transfer all the data to the - # internal buf do an async sleep and try writting - # some more, this should preserve backpresure + # internal buf wait on a read event await s.dataReadEvent.wait() method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} =