# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import std/strformat
import stew/byteutils
import chronos, chronicles, metrics
import ../stream/connection
import ./streamseq

export connection

logScope:
  topics = "libp2p bufferstream"

const BufferStreamTrackerName* = "BufferStream"

type BufferStream* = ref object of Connection
  readQueue*: AsyncQueue[seq[byte]] # read queue for managing backpressure
  readBuf*: StreamSeq # overflow buffer for readOnce
  pushing*: bool # is there an ongoing push? (only allow one)
  reading*: bool # is there an ongoing read? (only allow one)
  pushedEof*: bool # eof marker has been put on readQueue
  returnedEof*: bool # 0-byte readOnce has been completed

func shortLog*(s: BufferStream): auto =
  try:
    if s == nil:
      "BufferStream(nil)"
    else:
      &"{shortLog(s.peerId)}:{s.oid}"
  except ValueError as exc:
    raiseAssert(exc.msg)

chronicles.formatIt(BufferStream):
  shortLog(it)

proc len*(s: BufferStream): int =
  s.readBuf.len + (if s.readQueue.len > 0: s.readQueue[0].len() else: 0)

method initStream*(s: BufferStream) =
  if s.objName.len == 0:
    s.objName = BufferStreamTrackerName

  procCall Connection(s).initStream()

  s.readQueue = newAsyncQueue[seq[byte]](1)

  trace "BufferStream created", s

proc new*(
    T: typedesc[BufferStream], timeout: Duration = DefaultConnectionTimeout
): T =
  let bufferStream = T(timeout: timeout)
  bufferStream.initStream()
  bufferStream

method pushData*(
    s: BufferStream, data: seq[byte]
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
  ## Write bytes to the internal read buffer; use this to fill up the
  ## buffer with data.
  ##
  ## `pushData` will block if the queue is full, thus maintaining backpressure.
  ##
  doAssert(
    not s.pushing, "Only one concurrent push allowed for stream " & s.shortLog()
  )

  if s.isClosed or s.pushedEof:
    raise newLPStreamClosedError()

  if data.len == 0:
    return # Don't push 0-length buffers; these signal EOF

  # We will block here if there is already data queued, until it has been
  # processed
  try:
    s.pushing = true
    trace "Pushing data", s, data = data.len
    await s.readQueue.addLast(data)
  finally:
    s.pushing = false

method pushEof*(
    s: BufferStream
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
  if s.pushedEof:
    return

  doAssert(
    not s.pushing, "Only one concurrent push allowed for stream " & s.shortLog()
  )

  s.pushedEof = true

  # We will block here if there is already data queued, until it has been
  # processed
  try:
    s.pushing = true
    trace "Pushing EOF", s
    await s.readQueue.addLast(Eof)
  finally:
    s.pushing = false

method atEof*(s: BufferStream): bool =
  s.isEof and s.readBuf.len == 0

method readOnce*(
    s: BufferStream, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
  doAssert(nbytes > 0, "nbytes must be a positive integer")
  doAssert(
    not s.reading, "Only one concurrent read allowed for stream " & s.shortLog()
  )

  if s.returnedEof:
    raise newLPStreamEOFError()

  var p = cast[ptr UncheckedArray[byte]](pbytes)

  # First consume leftovers from previous read
  var rbytes = s.readBuf.consumeTo(toOpenArray(p, 0, nbytes - 1))

  if rbytes < nbytes and not s.isEof:
    # There's space in the buffer - consume some data from the read queue
    s.reading = true
    let buf =
      try:
        await s.readQueue.popFirst()
      except CancelledError as exc:
        # Not very efficient, but shouldn't happen often
        s.readBuf.assign(@(p.toOpenArray(0, rbytes - 1)) & @(s.readBuf.data))
        raise exc
      finally:
        s.reading = false

    if buf.len == 0:
      # No more data will arrive on read queue
      trace "EOF", s
      s.isEof = true
    else:
      let remaining = min(buf.len, nbytes - rbytes)
      toOpenArray(p, rbytes, nbytes - 1)[0 ..< remaining] =
        buf.toOpenArray(0, remaining - 1)
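
# Usage sketch (not part of the upstream module): a minimal example of how
# pushData, pushEof and readOnce are intended to interact. The `writer` and
# `demo` procs are illustrative names only, and the sketch assumes the full
# upstream readOnce implementation (the excerpt above is cut short). Guarded
# by isMainModule so it never affects library consumers.
when isMainModule:
  proc writer(s: BufferStream) {.async: (raises: [CancelledError, LPStreamError]).} =
    # The read queue holds a single element, so the second push suspends
    # until the reader drains the first one - this is the backpressure.
    await s.pushData("hello".toBytes())
    await s.pushEof()

  proc demo() {.async: (raises: [CancelledError, LPStreamError]).} =
    let stream = BufferStream.new()
    let writeFut = writer(stream) # runs concurrently with the reads below

    var buf = newSeq[byte](3)
    # First read copies 3 bytes; the 2 surplus bytes are parked in readBuf.
    let n1 = await stream.readOnce(addr buf[0], buf.len)
    echo "read 1: ", string.fromBytes(buf.toOpenArray(0, n1 - 1)) # "hel"

    # Second read serves the leftovers from readBuf, then sees the EOF marker.
    let n2 = await stream.readOnce(addr buf[0], buf.len)
    echo "read 2: ", string.fromBytes(buf.toOpenArray(0, n2 - 1)) # "lo"

    await writeFut
    await stream.close()

  try:
    waitFor demo()
  except CatchableError as exc:
    echo "demo failed: ", exc.msg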