From a644a19a2df842093ea1c852ef562833164a26de Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 12 Apr 2020 22:26:42 -0600 Subject: [PATCH] wip: rework with async iterators --- libp2p/connection.nim | 185 --------------- libp2p/stream/bufferstream.nim | 403 -------------------------------- libp2p/stream/chronosstream.nim | 122 +++------- libp2p/stream/lenprefixed.nim | 106 +++++++++ libp2p/stream/lpstream.nim | 104 --------- libp2p/stream/ringbuffer.nim | 233 ++++++++++++++++++ tests/testlenprefixed.nim | 36 +++ 7 files changed, 401 insertions(+), 788 deletions(-) delete mode 100644 libp2p/connection.nim delete mode 100644 libp2p/stream/bufferstream.nim create mode 100644 libp2p/stream/lenprefixed.nim delete mode 100644 libp2p/stream/lpstream.nim create mode 100644 libp2p/stream/ringbuffer.nim create mode 100644 tests/testlenprefixed.nim diff --git a/libp2p/connection.nim b/libp2p/connection.nim deleted file mode 100644 index 46fc5cc52..000000000 --- a/libp2p/connection.nim +++ /dev/null @@ -1,185 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2019 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import oids -import chronos, chronicles, metrics -import peerinfo, - multiaddress, - stream/lpstream, - peerinfo, - varint, - vbuffer - -logScope: - topic = "Connection" - -const DefaultReadSize* = 1 shl 20 - -type - Connection* = ref object of LPStream - peerInfo*: PeerInfo - stream*: LPStream - observedAddrs*: Multiaddress - - InvalidVarintException = object of LPStreamError - InvalidVarintSizeException = object of LPStreamError - -declareGauge libp2p_open_connection, "open Connection instances" - -proc newInvalidVarintException*(): ref InvalidVarintException = - newException(InvalidVarintException, "Unable to parse varint") - -proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException = - newException(InvalidVarintSizeException, "Wrong varint size") - -proc bindStreamClose(conn: Connection) {.async.} = - # bind stream's close event to connection's close - # to ensure correct close propagation - if not isNil(conn.stream.closeEvent): - await conn.stream.closeEvent.wait() - trace "wrapped stream closed, about to close conn", closed = conn.isClosed, - peer = if not isNil(conn.peerInfo): - conn.peerInfo.id else: "" - if not conn.isClosed: - trace "wrapped stream closed, closing conn", closed = conn.isClosed, - peer = if not isNil(conn.peerInfo): - conn.peerInfo.id else: "" - asyncCheck conn.close() - -proc init*[T: Connection](self: var T, stream: LPStream): T = - ## create a new Connection for the specified async reader/writer - new self - self.stream = stream - self.closeEvent = newAsyncEvent() - when chronicles.enabledLogLevel == LogLevel.TRACE: - self.oid = genOid() - asyncCheck self.bindStreamClose() - libp2p_open_connection.inc() - - return self - -proc newConnection*(stream: LPStream): Connection = - ## create a new Connection for the specified async reader/writer - result.init(stream) - -method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = - s.stream.read(n) - -method readExactly*(s: Connection, - pbytes: pointer, - nbytes: int): - Future[void] {.gcsafe.} = - s.stream.readExactly(pbytes, nbytes) - -method readLine*(s: Connection, - limit = 0, - sep = "\r\n"): - Future[string] {.gcsafe.} = - s.stream.readLine(limit, sep) 
- -method readOnce*(s: Connection, - pbytes: pointer, - nbytes: int): - Future[int] {.gcsafe.} = - s.stream.readOnce(pbytes, nbytes) - -method readUntil*(s: Connection, - pbytes: pointer, - nbytes: int, - sep: seq[byte]): - Future[int] {.gcsafe.} = - s.stream.readUntil(pbytes, nbytes, sep) - -method write*(s: Connection, - pbytes: pointer, - nbytes: int): - Future[void] {.gcsafe.} = - s.stream.write(pbytes, nbytes) - -method write*(s: Connection, - msg: string, - msglen = -1): - Future[void] {.gcsafe.} = - s.stream.write(msg, msglen) - -method write*(s: Connection, - msg: seq[byte], - msglen = -1): - Future[void] {.gcsafe.} = - s.stream.write(msg, msglen) - -method closed*(s: Connection): bool = - if isNil(s.stream): - return false - - result = s.stream.closed - -method close*(s: Connection) {.async, gcsafe.} = - if not s.closed: - trace "about to close connection", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" - - if not isNil(s.stream) and not s.stream.closed: - trace "closing child stream", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" - await s.stream.close() - - s.closeEvent.fire() - s.isClosed = true - - trace "connection closed", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" - libp2p_open_connection.dec() - -proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = - ## read lenght prefixed msg - var - size: uint - length: int - res: VarintStatus - buff = newSeq[byte](10) - try: - for i in 0.. DefaultReadSize: - raise newInvalidVarintSizeException() - buff.setLen(size) - if size > 0.uint: - trace "reading exact bytes from stream", size = size - await s.readExactly(addr buff[0], int(size)) - return buff - except LPStreamIncompleteError as exc: - trace "remote connection ended unexpectedly", exc = exc.msg - raise exc - except LPStreamReadError as exc: - trace "couldn't read from stream", exc = exc.msg - raise exc - -proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} = - ## write lenght prefixed - var buf = initVBuffer() - buf.writeSeq(msg) - buf.finish() - s.write(buf.buffer) - -method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} = - ## get resolved multiaddresses for the connection - result = c.observedAddrs - -proc `$`*(conn: Connection): string = - if not isNil(conn.peerInfo): - result = $(conn.peerInfo) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim deleted file mode 100644 index f2c168abe..000000000 --- a/libp2p/stream/bufferstream.nim +++ /dev/null @@ -1,403 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2019 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -## This module implements an asynchronous buffer stream -## which emulates physical async IO. -## -## The stream is based on the standard library's `Deque`, -## which is itself based on a ring buffer. -## -## It works by exposing a regular LPStream interface and -## a method ``pushTo`` to push data to the internal read -## buffer; as well as a handler that can be registrered -## that gets triggered on every write to the stream. 
This -## allows using the buffered stream as a sort of proxy, -## which can be consumed as a regular LPStream but allows -## injecting data for reads and intercepting writes. -## -## Another notable feature is that the stream is fully -## ordered and asynchronous. Reads are queued up in order -## and are suspended when not enough data available. This -## allows preserving backpressure while maintaining full -## asynchrony. Both writting to the internal buffer with -## ``pushTo`` as well as reading with ``read*` methods, -## will suspend until either the amount of elements in the -## buffer goes below ``maxSize`` or more data becomes available. - -import deques, math, oids -import chronos, chronicles, metrics -import ../stream/lpstream - -const DefaultBufferSize* = 1024 - -type - # TODO: figure out how to make this generic to avoid casts - WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.} - - BufferStream* = ref object of LPStream - maxSize*: int # buffer's max size in bytes - readBuf: Deque[byte] # this is a ring buffer based dequeue, this makes it perfect as the backing store here - readReqs: Deque[Future[void]] # use dequeue to fire reads in order - dataReadEvent: AsyncEvent - writeHandler*: WriteHandler - lock: AsyncLock - isPiped: bool - - AlreadyPipedError* = object of CatchableError - NotWritableError* = object of CatchableError - -declareGauge libp2p_open_bufferstream, "open BufferStream instances" - -proc newAlreadyPipedError*(): ref Exception {.inline.} = - result = newException(AlreadyPipedError, "stream already piped") - -proc newNotWritableError*(): ref Exception {.inline.} = - result = newException(NotWritableError, "stream is not writable") - -proc requestReadBytes(s: BufferStream): Future[void] = - ## create a future that will complete when more - ## data becomes available in the read buffer - result = newFuture[void]() - s.readReqs.addLast(result) - trace "requestReadBytes(): added a future to readReqs" - -proc initBufferStream*(s: BufferStream, - handler: WriteHandler = nil, - size: int = DefaultBufferSize) = - s.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size) - s.readBuf = initDeque[byte](s.maxSize) - s.readReqs = initDeque[Future[void]]() - s.dataReadEvent = newAsyncEvent() - s.lock = newAsyncLock() - s.writeHandler = handler - s.closeEvent = newAsyncEvent() - when chronicles.enabledLogLevel == LogLevel.TRACE: - s.oid = genOid() - s.isClosed = false - libp2p_open_bufferstream.inc() - -proc newBufferStream*(handler: WriteHandler = nil, - size: int = DefaultBufferSize): BufferStream = - new result - result.initBufferStream(handler, size) - -proc popFirst*(s: BufferStream): byte = - result = s.readBuf.popFirst() - s.dataReadEvent.fire() - -proc popLast*(s: BufferStream): byte = - result = s.readBuf.popLast() - s.dataReadEvent.fire() - -proc shrink(s: BufferStream, fromFirst = 0, fromLast = 0) = - s.readBuf.shrink(fromFirst, fromLast) - s.dataReadEvent.fire() - -proc len*(s: BufferStream): int = s.readBuf.len - -proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} = - ## Write bytes to internal read buffer, use this to fill up the - ## buffer with data. - ## - ## This method is async and will wait until all data has been - ## written to the internal buffer; this is done so that backpressure - ## is preserved. 
- ## - - when chronicles.enabledLogLevel == LogLevel.TRACE: - logScope: - stream_oid = $s.oid - - try: - await s.lock.acquire() - var index = 0 - while not s.closed(): - while index < data.len and s.readBuf.len < s.maxSize: - s.readBuf.addLast(data[index]) - inc(index) - trace "pushTo()", msg = "added " & $index & " bytes to readBuf" - - # resolve the next queued read request - if s.readReqs.len > 0: - s.readReqs.popFirst().complete() - trace "pushTo(): completed a readReqs future" - - if index >= data.len: - return - - # if we couldn't transfer all the data to the - # internal buf wait on a read event - await s.dataReadEvent.wait() - s.dataReadEvent.clear() - finally: - s.lock.release() - -method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async.} = - ## Read all bytes (n <= 0) or exactly `n` bytes from buffer - ## - ## This procedure allocates buffer seq[byte] and return it as result. - ## - when chronicles.enabledLogLevel == LogLevel.TRACE: - logScope: - stream_oid = $s.oid - - trace "read()", requested_bytes = n - var size = if n > 0: n else: s.readBuf.len() - var index = 0 - - if s.readBuf.len() == 0: - await s.requestReadBytes() - - while index < size: - while s.readBuf.len() > 0 and index < size: - result.add(s.popFirst()) - inc(index) - trace "read()", read_bytes = index - - if index < size: - await s.requestReadBytes() - -method readExactly*(s: BufferStream, - pbytes: pointer, - nbytes: int): - Future[void] {.async.} = - ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store - ## it to ``pbytes``. - ## - ## If EOF is received and ``nbytes`` is not yet read, the procedure - ## will raise ``LPStreamIncompleteError``. - ## - when chronicles.enabledLogLevel == LogLevel.TRACE: - logScope: - stream_oid = $s.oid - - var buff: seq[byte] - try: - buff = await s.read(nbytes) - except LPStreamEOFError as exc: - trace "Exception occured", exc = exc.msg - - if nbytes > buff.len(): - raise newLPStreamIncompleteError() - - copyMem(pbytes, addr buff[0], nbytes) - -method readLine*(s: BufferStream, - limit = 0, - sep = "\r\n"): - Future[string] {.async.} = - ## Read one line from read-only stream ``rstream``, where ``"line"`` is a - ## sequence of bytes ending with ``sep`` (default is ``"\r\n"``). - ## - ## If EOF is received, and ``sep`` was not found, the method will return the - ## partial read bytes. - ## - ## If the EOF was received and the internal buffer is empty, return an - ## empty string. - ## - ## If ``limit`` more then 0, then result string will be limited to ``limit`` - ## bytes. - ## - result = "" - var lim = if limit <= 0: -1 else: limit - var state = 0 - var index = 0 - - index = 0 - while index < s.readBuf.len: - let ch = char(s.readBuf[index]) - if sep[state] == ch: - inc(state) - if state == len(sep): - s.shrink(index + 1) - break - else: - state = 0 - result.add(ch) - if len(result) == lim: - s.shrink(index + 1) - break - inc(index) - -method readOnce*(s: BufferStream, - pbytes: pointer, - nbytes: int): - Future[int] {.async.} = - ## Perform one read operation on read-only stream ``rstream``. - ## - ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from - ## internal buffer, otherwise it will wait until some bytes will be received. 
- ## - if s.readBuf.len == 0: - await s.requestReadBytes() - - var len = if nbytes > s.readBuf.len: s.readBuf.len else: nbytes - await s.readExactly(pbytes, len) - result = len - -method readUntil*(s: BufferStream, - pbytes: pointer, - nbytes: int, - sep: seq[byte]): - Future[int] {.async.} = - ## Read data from the read-only stream ``rstream`` until separator ``sep`` is - ## found. - ## - ## On success, the data and separator will be removed from the internal - ## buffer (consumed). Returned data will include the separator at the end. - ## - ## If EOF is received, and `sep` was not found, procedure will raise - ## ``LPStreamIncompleteError``. - ## - ## If ``nbytes`` bytes has been received and `sep` was not found, procedure - ## will raise ``LPStreamLimitError``. - ## - ## Procedure returns actual number of bytes read. - ## - var - dest = cast[ptr UncheckedArray[byte]](pbytes) - state = 0 - k = 0 - - let datalen = s.readBuf.len() - if datalen == 0 and s.readBuf.len() == 0: - raise newLPStreamIncompleteError() - - var index = 0 - while index < datalen: - let ch = s.readBuf[index] - if sep[state] == ch: - inc(state) - else: - state = 0 - if k < nbytes: - dest[k] = ch - inc(k) - else: - raise newLPStreamLimitError() - if state == len(sep): - break - inc(index) - - if state == len(sep): - s.shrink(index + 1) - result = k - else: - s.shrink(datalen) - -method write*(s: BufferStream, - pbytes: pointer, - nbytes: int): Future[void] = - ## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream - ## ``rstream``. - ## - ## Return number of bytes actually consumed (discarded). - ## - if isNil(s.writeHandler): - var retFuture = newFuture[void]("BufferStream.write(pointer)") - retFuture.fail(newNotWritableError()) - return retFuture - - var buf: seq[byte] = newSeq[byte](nbytes) - copyMem(addr buf[0], pbytes, nbytes) - result = s.writeHandler(buf) - -method write*(s: BufferStream, - msg: string, - msglen = -1): Future[void] = - ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. - ## - ## String ``sbytes`` must not be zero-length. - ## - ## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream. - ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to - ## stream. - ## - if isNil(s.writeHandler): - var retFuture = newFuture[void]("BufferStream.write(string)") - retFuture.fail(newNotWritableError()) - return retFuture - - var buf = "" - shallowCopy(buf, if msglen > 0: msg[0.. len(sbytes)`` only ``len(sbytes)`` bytes will be written to - ## stream. - ## - if isNil(s.writeHandler): - var retFuture = newFuture[void]("BufferStream.write(seq)") - retFuture.fail(newNotWritableError()) - return retFuture - - var buf: seq[byte] - shallowCopy(buf, if msglen > 0: msg[0.. 
SafeVarintSize:
+      raise newInvalidVarintSizeException()
+
+  return size.int
+
+proc read(lp: LenPrefixed,
+          chunk: Future[seq[byte]]):
+          Future[seq[byte]] {.async, gcsafe.} =
+  try:
+    lp.readBuff.append((await chunk))
+
+    while lp.readBuff.len > 0:
+      case lp.mode:
+      of Mode.Decoding:
+        lp.size = lp.decodeLen()
+        lp.mode = Mode.Reading
+      else:
+        result = lp.readBuff.read(lp.size)
+        lp.size -= result.len
+        if lp.size == 0:
+          lp.mode = Mode.Decoding
+
+  except CatchableError as exc:
+    trace "Exception occurred", exc = exc.msg
+    raise exc
+
+proc decode*(lp: LenPrefixed,
+             i: iterator(): Future[seq[byte]]):
+             iterator(): Future[seq[byte]] =
+  return iterator(): Future[seq[byte]] =
+    for chunk in i():
+      yield lp.read(chunk)
+
+proc write(lp: LenPrefixed,
+           i: iterator(): Future[seq[byte]]):
+           Future[seq[byte]] {.async.} =
+  for chunk in i():
+    lp.writeBuff.append((await chunk))
+
+  var buf = initVBuffer()
+  buf.writeSeq(lp.writeBuff.read())
+  buf.finish()
+  result = buf.buffer
+
+proc encode*(lp: LenPrefixed,
+             i: iterator(): Future[seq[byte]]):
+             iterator(): Future[seq[byte]] =
+  return iterator(): Future[seq[byte]] =
+    yield lp.write(i)
diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim
deleted file mode 100644
index fc7764a1c..000000000
--- a/libp2p/stream/lpstream.nim
+++ /dev/null
@@ -1,104 +0,0 @@
-## Nim-LibP2P
-## Copyright (c) 2019 Status Research & Development GmbH
-## Licensed under either of
-## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
-## * MIT license ([LICENSE-MIT](LICENSE-MIT))
-## at your option.
-## This file may not be copied, modified, or distributed except according to
-## those terms.
-
-import oids
-import chronicles, chronos
-
-type
-  LPStream* = ref object of RootObj
-    isClosed*: bool
-    closeEvent*: AsyncEvent
-    when chronicles.enabledLogLevel == LogLevel.TRACE:
-      oid*: Oid
-
-  LPStreamError* = object of CatchableError
-  LPStreamIncompleteError* = object of LPStreamError
-  LPStreamIncorrectError* = object of Defect
-  LPStreamLimitError* = object of LPStreamError
-  LPStreamReadError* = object of LPStreamError
-    par*: ref Exception
-  LPStreamWriteError* = object of LPStreamError
-    par*: ref Exception
-  LPStreamEOFError* = object of LPStreamError
-
-proc newLPStreamReadError*(p: ref Exception): ref Exception {.inline.} =
-  var w = newException(LPStreamReadError, "Read stream failed")
-  w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
-  w.par = p
-  result = w
-
-proc newLPStreamWriteError*(p: ref Exception): ref Exception {.inline.} =
-  var w = newException(LPStreamWriteError, "Write stream failed")
-  w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
-  w.par = p
-  result = w
-
-proc newLPStreamIncompleteError*(): ref Exception {.inline.} =
-  result = newException(LPStreamIncompleteError, "Incomplete data received")
-
-proc newLPStreamLimitError*(): ref Exception {.inline.} =
-  result = newException(LPStreamLimitError, "Buffer limit reached")
-
-proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} =
-  result = newException(LPStreamIncorrectError, m)
-
-proc newLPStreamEOFError*(): ref Exception {.inline.} =
-  result = newException(LPStreamEOFError, "Stream EOF!")
-
-method closed*(s: LPStream): bool {.base, inline.} =
-  s.isClosed
-
-method read*(s: LPStream,
-             n = -1):
-             Future[seq[byte]] {.base, async.} =
-  doAssert(false, "not implemented!")
-
-method readExactly*(s: LPStream,
-                    pbytes: pointer,
-                    nbytes: int):
-                    Future[void] {.base, async.} =
-  doAssert(false, "not 
implemented!") - -method readLine*(s: LPStream, - limit = 0, - sep = "\r\n"): - Future[string] - {.base, async.} = - doAssert(false, "not implemented!") - -method readOnce*(s: LPStream, - pbytes: pointer, - nbytes: int): - Future[int] - {.base, async.} = - doAssert(false, "not implemented!") - -method readUntil*(s: LPStream, - pbytes: pointer, - nbytes: int, - sep: seq[byte]): - Future[int] - {.base, async.} = - doAssert(false, "not implemented!") - -method write*(s: LPStream, pbytes: pointer, nbytes: int) - {.base, async.} = - doAssert(false, "not implemented!") - -method write*(s: LPStream, msg: string, msglen = -1) - {.base, async.} = - doAssert(false, "not implemented!") - -method write*(s: LPStream, msg: seq[byte], msglen = -1) - {.base, async.} = - doAssert(false, "not implemented!") - -method close*(s: LPStream) - {.base, async.} = - doAssert(false, "not implemented!") diff --git a/libp2p/stream/ringbuffer.nim b/libp2p/stream/ringbuffer.nim new file mode 100644 index 000000000..13c4d9309 --- /dev/null +++ b/libp2p/stream/ringbuffer.nim @@ -0,0 +1,233 @@ +type + RingBuffer*[T: byte | char] = object + buff*: seq[T] + head: int + tail: int + size: int + len*: int + +const DefaultSize = 1024 + +## A bare bones ring buffer suited for byte oriented data. +## The buffer uses `shallowCopy` when appending and reading +## data to overcome Nim's copy semantics. +## +## This is a FIFO data structure, data is always appended to the end +## and read from the front. +## + +proc init*[T](b: type[RingBuffer[T]], size = DefaultSize): b = + ## Create and initialize the ring buffer. Takes an optional + ## maximum ``size`` parameter, otherwise ``size`` will default + ## to ``DefaultSize`` which is set to 1024. + ## + ## .. code-block:: nim + ## # create a buffer with 5 + ## var buff = RingBuffer[byte].init(5) + ## buff.add(@['a', 'b', 'c', 'd', 'e']) + ## var data = newSeq[char](5) + ## discard buff.read(data) + ## echo data # prints @['a', 'b', 'c', 'd', 'e'] + ## + RingBuffer[T](buff: newSeq[T](size), size: size) + +proc append*[T](b: var RingBuffer[T], data: openArray[T]) = + ## Append data to the end of the buffer. ``data`` will be + ## ``shallowCopy``ed into the buffer to overcome Nim's copy + ## semantics for ``seq``. + ## + ## .. code-block:: nim + ## buff.append(@['a', 'b', 'b', 'c', 'd']) + ## + if data.len + b.len > b.size: + raise newException(CatchableError, "Buffer would overflow!") + + for i in data: + shallowCopy(b.buff[b.tail], i) + if b.tail == b.size - 1: + b.tail = 0 + else: + b.tail.inc + b.len.inc + +proc read*[T](b: var RingBuffer[T], + data: var openArray[T], + size: int = -1): int = + ## Read up to ``size`` bytes/chars from the front of the buffer + ## into the ``data`` argument. + ## + ## Returns an int indicating the amount of bytes/chars read. + ## + ## Note that ``size`` is the maximum amount of bytes/chars to + ## read, if not enough data is available read will return what + ## it can. If ``size`` is not provided, then the ``len`` field + ## of the ``data`` argument will be used instead. + ## + ## .. 
code-block:: nim + ## # read 5 chars from the buffer + ## var data = newSeq[char](10) + ## assert(buff.read(data, 5) == 5) + ## + if b.len == 0: + return + + if data.len == 0 or size > data.len: + raise newException(CatchableError, "Data isn't big enough!") + + var isize = size + if size > b.size: + isize = b.size + + if size < 0 or size > b.len: + isize = b.len + else: + isize = size + + while result < isize: + shallowCopy(data[result], b.buff[b.head]) + if b.len == 0: + break + + if b.head == b.size - 1: + b.head = 0 + else: + b.head.inc() + b.len.dec + result.inc + +proc read*[T](b: var RingBuffer[T], size: int = -1): seq[T] = + ## Read up to ``size`` bytes/chars from the front of the buffer. + ## + ## Returns a `seq` with the read bytes/chars. + ## + ## Note that ``size`` is the maximum amount of bytes/chars to read, + ## if not enough data is available read will return what it can. + ## If ``size`` is not provided, the entire contents of the buffer + ## will be returned. + ## + ## .. code-block:: nim + ## # read 5 chars from the buffer + ## assert(buff.read() == @[...]) + ## + var isize = size + if size < 0: + isize = b.len + + result = newSeq[T](isize) + discard b.read(result, isize) + +proc reset*[T](b: var RingBuffer[T]) = + ## Reset the internal state of the buffer. The + ## internal buffer itself will not be cleared, + ## but all internal pointers will be which allows + ## reusing the buffer as if new. + b.len = 0 + b.head = 0 + b.tail = 0 + +proc clear*[T](b: var RingBuffer[T]) = + ## Reset and clear the buffer. + b.reset() + b.buff.setLen(0) + +when isMainModule: + block Basic: + var buff = RingBuffer[char].init(10) + var data = newSeq[char](10) + + buff.append(@['a', 'b', 'c', 'd', 'e']) + assert(buff.len == 5, "len should be 5") + assert(buff.head == 0, "head should be 0") + assert(buff.tail == 5, "tail should b4 5") + + buff.append(@['f', 'g', 'h', 'i', 'j']) + assert(buff.len == 10, "len should be 10") + assert(buff.head == 0, "head should be 0") + assert(buff.tail == 0, "tail should be 0") + + assert(buff.read(data, 5) == 5, "should have read 5 chars") + assert(data[0..4] == @['a', 'b', 'c', 'd', 'e']) + assert(buff.len == 5, "len should be 5") + assert(buff.head == 5, "head should be 5") + assert(buff.tail == 0, "tail should be 0") + + buff.append(@['k', 'l', 'm', 'n', 'o']) + assert(buff.len == 10, "len should be 10") + assert(buff.head == 5, "head should be 5") + assert(buff.tail == 5, "tail should be 5") + + assert(buff.read(data, 2) == 2, "should have read 2 chars") + assert(data[0..1] == @['f', 'g']) + assert(buff.len == 8, "len should be 8") + assert(buff.head == 7, "head should be 7") + assert(buff.tail == 5, "tail should be 5") + + buff.append(@['p', 'q']) + assert(buff.len == 10, "len should be 10") + assert(buff.head == 7, "head should be 7") + assert(buff.tail == 7, "tail should be 7") + + assert(buff.read(data) == 10, "should have read 10 chars") + assert(data == @['h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q']) + assert(buff.len == 0, "len should be 0") + assert(buff.head == 7, "head should be 7") + assert(buff.tail == 7, "tail should be 7") + + buff.append(@['x', 'y']) + assert(buff.len == 2, "len should be 2") + assert(buff.head == 7, "head should be 7") + assert(buff.tail == 9, "tail should be 9") + + assert(buff.read(data, 4) == 2, "should have read 2 chars") + assert(data[0..1] == @['x', 'y']) + assert(buff.len == 0, "len should be 0") + assert(buff.head == 9, "head should be 9") + assert(buff.tail == 9, "tail should be 9") + + buff.append(@['a', 'b', 
'c', 'd', 'e'])
+    assert(buff.len == 5, "len should be 5")
+    assert(buff.head == 9, "head should be 9")
+    assert(buff.tail == 4, "tail should be 4")
+
+    assert(buff.read(5) == @['a', 'b', 'c', 'd', 'e'])
+    assert(buff.len == 0, "len should be 0")
+    assert(buff.head == 4, "head should be 4")
+    assert(buff.tail == 4, "tail should be 4")
+
+  block Errors:
+    var buff = RingBuffer[char].init(5)
+
+    try:
+      buff.append(@['a', 'b', 'c', 'd', 'e', 'g'])
+      assert(false, "should not allow adding past buffer size")
+    except CatchableError as exc:
+      assert(true)
+
+    try:
+      buff.append(@['a', 'b', 'c', 'd', 'e'])
+      var data: seq[char]
+      discard buff.read(data)
+      assert(false, "should not allow passing empty container")
+    except CatchableError as exc:
+      assert(true)
+
+    try:
+      var data = newSeq[char](2)
+      discard buff.read(data, 5)
+      assert(false, "should not allow passing size greater than container")
+    except CatchableError as exc:
+      assert(true)
+
+  block Cleanup:
+    var buff = RingBuffer[char].init(5)
+
+    buff.reset()
+    assert(buff.len == 0, "buff.len should be 0")
+    assert(buff.head == 0, "buff.head should be 0")
+    assert(buff.tail == 0, "buff.tail should be 0")
+
+    buff.clear()
+    assert(buff.buff.len == 0, "buff.buff.len should be 0")
+
+  echo "All passed!"
diff --git a/tests/testlenprefixed.nim b/tests/testlenprefixed.nim
new file mode 100644
index 000000000..53e42a697
--- /dev/null
+++ b/tests/testlenprefixed.nim
@@ -0,0 +1,36 @@
+import unittest
+import chronos
+import ../libp2p/stream/lenprefixed
+
+suite "LenPrefixed stream":
+  test "encode":
+    proc test(): Future[bool] {.async.} =
+      var lp = LenPrefixed.init()
+      iterator stream(): Future[seq[byte]] {.closure.} =
+        var fut = newFuture[seq[byte]]()
+        fut.complete(cast[seq[byte]](@['a', 'b', 'c', 'd', 'e']))
+        yield fut
+
+      var encoded = await lp.encode(stream)()
+      check:
+        encoded == cast[seq[byte]](@['\5', 'a', 'b', 'c', 'd', 'e'])
+      result = true
+
+    check:
+      waitFor(test()) == true
+
+  test "decode":
+    proc test(): Future[bool] {.async.} =
+      var lp = LenPrefixed.init()
+      iterator stream(): Future[seq[byte]] {.closure.} =
+        var fut = newFuture[seq[byte]]()
+        fut.complete(cast[seq[byte]](@['\5', 'a', 'b', 'c', 'd', 'e']))
+        yield fut
+
+      var decoded = await lp.decode(stream)()
+      check:
+        decoded == cast[seq[byte]](@['a', 'b', 'c', 'd', 'e'])
+      result = true
+
+    check:
+      waitFor(test()) == true
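
The two tests above double as a usage reference for the new iterator-based framing API. Below is a minimal round-trip sketch, assuming the LenPrefixed.init, encode and decode signatures added in libp2p/stream/lenprefixed.nim; the source/framedSource iterators, the roundTrip proc and the import path are illustrative names only, not part of the patch.

import chronos
import libp2p/stream/lenprefixed

proc roundTrip() {.async.} =
  # sketch only: demonstrates the encode/decode round trip of the
  # iterator-based LenPrefixed API introduced in this patch
  var lp = LenPrefixed.init()

  # single-chunk source, mirroring the closure iterators used in the tests
  iterator source(): Future[seq[byte]] {.closure.} =
    var fut = newFuture[seq[byte]]()
    fut.complete(cast[seq[byte]](@['a', 'b', 'c', 'd', 'e']))
    yield fut

  # encode() returns an iterator whose futures yield the
  # varint-length-prefixed bytes of the incoming chunks
  let framed = await lp.encode(source)()
  assert framed == cast[seq[byte]](@['\5', 'a', 'b', 'c', 'd', 'e'])

  # feed the framed bytes back through decode() to strip the prefix again
  iterator framedSource(): Future[seq[byte]] {.closure.} =
    var fut = newFuture[seq[byte]]()
    fut.complete(framed)
    yield fut

  let plain = await lp.decode(framedSource)()
  assert plain == cast[seq[byte]](@['a', 'b', 'c', 'd', 'e'])

waitFor roundTrip()

Each call to the iterator returned by encode or decode produces one framed (or unframed) message, matching the expectations in tests/testlenprefixed.nim.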