# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [Defect].}

import std/strformat
import stew/byteutils
import chronos, chronicles, metrics
import ../stream/connection
import ./streamseq

when chronicles.enabledLogLevel == LogLevel.TRACE:
  import oids

export connection

logScope:
  topics = "libp2p bufferstream"

const
  BufferStreamTrackerName* = "BufferStream"
  Eof: seq[byte] = @[] # a 0-length buffer on the read queue signals EOF

type
  BufferStream* = ref object of Connection
    readQueue*: AsyncQueue[seq[byte]] # read queue for managing backpressure
    readBuf*: StreamSeq               # overflow buffer for readOnce
    pushing*: bool                    # is there an ongoing push? (only allow one)
    reading*: bool                    # is there an ongoing read? (only allow one)
    pushedEof*: bool                  # EOF marker has been put on readQueue
    returnedEof*: bool                # a 0-byte readOnce has completed (EOF returned)

func shortLog*(s: BufferStream): auto =
  try:
    if s.isNil: "BufferStream(nil)"
    else: &"{shortLog(s.peerId)}:{s.oid}"
  except ValueError as exc:
    raise newException(Defect, exc.msg)

chronicles.formatIt(BufferStream): shortLog(it)

proc len*(s: BufferStream): int =
  s.readBuf.len + (if s.readQueue.len > 0: s.readQueue[0].len() else: 0)

method initStream*(s: BufferStream) =
  if s.objName.len == 0:
    s.objName = BufferStreamTrackerName

  procCall Connection(s).initStream()

  s.readQueue = newAsyncQueue[seq[byte]](1)

  trace "BufferStream created", s

proc new*(
    T: typedesc[BufferStream],
    timeout: Duration = DefaultConnectionTimeout): T =
  let bufferStream = T(timeout: timeout)
  bufferStream.initStream()
  bufferStream
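
# Backpressure: `initStream` creates `readQueue` with a capacity of one, so a
# `pushData` call suspends in `addLast` until the previous chunk has been
# popped by `readOnce`. EOF is signalled in-band: `pushEof` enqueues the
# 0-length `Eof` buffer, which is why `pushData` ignores empty buffers.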
method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
  ## Write bytes to the internal read buffer; use this to fill up the
  ## buffer with data.
  ##
  ## `pushData` will block if the queue is full, thus maintaining backpressure.

  doAssert(not s.pushing,
    &"Only one concurrent push allowed for stream {s.shortLog()}")

  if s.isClosed or s.pushedEof:
    raise newLPStreamEOFError()

  if data.len == 0:
    return # Don't push 0-length buffers, these signal EOF

  # We will block here if there is already data queued, until it has been
  # processed
  try:
    s.pushing = true
    trace "Pushing data", s, data = data.len
    await s.readQueue.addLast(data)
  finally:
    s.pushing = false

method pushEof*(s: BufferStream) {.base, async.} =
  if s.pushedEof:
    return

  doAssert(not s.pushing,
    &"Only one concurrent push allowed for stream {s.shortLog()}")

  s.pushedEof = true

  # We will block here if there is already data queued, until it has been
  # processed
  try:
    s.pushing = true
    trace "Pushing EOF", s
    await s.readQueue.addLast(Eof)
  finally:
    s.pushing = false

method atEof*(s: BufferStream): bool {.raises: [Defect].} =
  s.isEof and s.readBuf.len == 0

method readOnce*(s: BufferStream,
                 pbytes: pointer,
                 nbytes: int):
                 Future[int] {.async.} =
  doAssert(nbytes > 0, "nbytes must be a positive integer")
  doAssert(not s.reading,
    &"Only one concurrent read allowed for stream {s.shortLog()}")

  if s.returnedEof:
    raise newLPStreamEOFError()

  var p = cast[ptr UncheckedArray[byte]](pbytes)

  # First consume leftovers from previous read
  var rbytes = s.readBuf.consumeTo(toOpenArray(p, 0, nbytes - 1))

  if rbytes < nbytes and not s.isEof:
    # There's space in the buffer - consume some data from the read queue
    s.reading = true
    let buf =
      try:
        await s.readQueue.popFirst()
      except CancelledError as exc:
        # Not very efficient, but shouldn't happen often
        s.readBuf.assign(@(p.toOpenArray(0, rbytes - 1)) & @(s.readBuf.data))
        raise exc
      except CatchableError as exc:
        # When an exception happens here, the BufferStream is effectively
        # broken and no more reads will be valid - for now, return EOF if it's
        # called again, though this is not completely true - EOF represents an
        # "orderly" shutdown and that's not what happened here.
        s.returnedEof = true
        raise exc
      finally:
        s.reading = false

    if buf.len == 0:
      # No more data will arrive on the read queue
      trace "EOF", s
      s.isEof = true
    else:
      let remaining = min(buf.len, nbytes - rbytes)
      toOpenArray(p, rbytes, nbytes - 1)[0..<remaining] =
        buf.toOpenArray(0, remaining - 1)
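
# ---------------------------------------------------------------------------
# Usage sketch (not part of the upstream module): a minimal illustration of the
# push/read flow and of the backpressure provided by the single-slot readQueue.
# It assumes the remainder of this module (the rest of `readOnce`, `closeImpl`,
# etc.) is present; `demoWriter`, `demoReader` and `demo` are hypothetical
# names used only for this example.
when isMainModule:
  proc demoWriter(s: BufferStream) {.async.} =
    await s.pushData("hello".toBytes()) # queue is empty, completes right away
    await s.pushEof()                   # suspends until the reader pops "hello"

  proc demoReader(s: BufferStream) {.async.} =
    var buf: array[32, byte]
    while not s.atEof():
      let n = await s.readOnce(addr buf[0], buf.len)
      if n > 0:
        echo string.fromBytes(buf.toOpenArray(0, n - 1))

  proc demo() {.async.} =
    let s = BufferStream.new()
    let writeFut = demoWriter(s)
    await demoReader(s) # prints "hello", then sees the EOF marker
    await writeFut
    await s.close()

  waitFor demo()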