libp2p/stream/lpstream

    Dark Mode
Search:
Group by:
  Source   Edit

Length Prefixed stream implementation

Types

Direction {.pure.} = enum
  In, Out
  Source   Edit
LPStream = ref object of RootObj
  closeEvent*: AsyncEvent
  isClosed*: bool
  isEof*: bool
  objName*: string
  oid*: Oid
  dir*: Direction
  closedWithEOF: bool
  Source   Edit
LPStreamError = object of LPError
  Source   Edit
MaxSizeError = object of LPStreamError
  Source   Edit
StreamTracker = ref object of TrackerBase
  opened*: uint64
  closed*: uint64
  Source   Edit

Consts

Eof = []
  Source   Edit
LPStreamTrackerName = "LPStream"
  Source   Edit

Procs

proc closeWithEOF(s: LPStream): Future[void] {.async: (...raises: []), public,
    ...gcsafe, raises: [], tags: [WriteIOEffect, TimeEffect].}

Close the stream and wait for EOF - use this with half-closed streams where an EOF is expected to arrive from the other end.

Note - this should only be used when there has been an in-protocol notification that no more data will arrive and that the only thing left for the other end to do is to close the stream gracefully.

In particular, it must not be used when there is another concurrent read ongoing (which may be the case during cancellations)!

  Source   Edit
proc join(s: LPStream): Future[void] {.async: (...raises: [CancelledError],
    raw: true), public, ...gcsafe, raises: [], tags: [RootEffect].}
Wait for the stream to be closed   Source   Edit
proc newLPStreamClosedError(): ref LPStreamClosedError {....gcsafe, raises: [],
    tags: [].}
  Source   Edit
proc newLPStreamConnDownError(parentException: ref Exception = nil): ref LPStreamConnDownError {.
    ...gcsafe, raises: [], tags: [].}
  Source   Edit
proc newLPStreamEOFError(): ref LPStreamEOFError {....gcsafe, raises: [], tags: [].}
  Source   Edit
proc newLPStreamIncompleteError(): ref LPStreamIncompleteError {....gcsafe,
    raises: [], tags: [].}
  Source   Edit
proc newLPStreamLimitError(): ref LPStreamLimitError {....gcsafe, raises: [],
    tags: [].}
  Source   Edit
proc newLPStreamRemoteClosedError(): ref LPStreamRemoteClosedError {....gcsafe,
    raises: [], tags: [].}
  Source   Edit
proc newLPStreamResetError(): ref LPStreamResetError {....gcsafe, raises: [],
    tags: [].}
  Source   Edit
proc readExactly(s: LPStream; pbytes: pointer; nbytes: int): Future[void] {.
    async: (...raises: [CancelledError, LPStreamError]), public, ...gcsafe,
    raises: [], tags: [WriteIOEffect, TimeEffect].}
Waits for nbytes to be available, then reads them into the buffer at pbytes   Source   Edit
proc readLine(s: LPStream; limit = 0; sep = "\r\n"): Future[string] {.
    async: (...raises: [CancelledError, LPStreamError]), public, ...gcsafe,
    raises: [], tags: [].}
Reads until limit bytes have been read or a sep is found   Source   Edit
proc readLp(s: LPStream; maxSize: int): Future[seq[byte]] {.
    async: (...raises: [CancelledError, LPStreamError]), public, ...gcsafe,
    raises: [], tags: [].}
Reads a length-prefixed msg, with the length encoded as a varint   Source   Edit
proc readVarint(conn: LPStream): Future[uint64] {.
    async: (...raises: [CancelledError, LPStreamError]), public, ...gcsafe,
    raises: [], tags: [].}
  Source   Edit
func shortLog(s: LPStream): auto {....gcsafe, raises: [], tags: [].}
  Source   Edit
proc write(s: LPStream; msg: string): Future[void] {.
    async: (...raises: [CancelledError, LPStreamError], raw: true), public, ...gcsafe,
    raises: [], tags: [].}
  Source   Edit
proc writeLp(s: LPStream; msg: openArray[byte]): Future[void] {.
    async: (...raises: [CancelledError, LPStreamError], raw: true), public, ...gcsafe,
    raises: [], tags: [].}
Write msg with a varint-encoded length prefix   Source   Edit
proc writeLp(s: LPStream; msg: string): Future[void] {.
    async: (...raises: [CancelledError, LPStreamError], raw: true), public, ...gcsafe,
    raises: [], tags: [].}
  Source   Edit

Methods

method atEof(s: LPStream): bool {.base, public, ...gcsafe, raises: [], tags: [].}
  Source   Edit
method close(s: LPStream): Future[void] {.async: (...raises: [], raw: true), base,
    public, ...gcsafe, raises: [], tags: [RootEffect].}
Closes the stream - this may block, but will not raise exceptions   Source   Edit
method closed(s: LPStream): bool {.base, public, ...gcsafe, raises: [], tags: [].}
  Source   Edit
method closeImpl(s: LPStream): Future[void] {.async: (...raises: [], raw: true),
    base, ...gcsafe, raises: [], tags: [RootEffect].}
Implementation of close - called only once   Source   Edit
method initStream(s: LPStream) {.base, ...gcsafe, raises: [],
                                 tags: [RootEffect, TimeEffect].}
  Source   Edit
method readOnce(s: LPStream; pbytes: pointer; nbytes: int): Future[int] {.base,
    async: (...raises: [CancelledError, LPStreamError], raw: true), public, ...gcsafe,
    raises: [], tags: [].}
Reads whatever is available in the stream, up to nbytes. Will block if nothing is available   Source   Edit
method write(s: LPStream; msg: seq[byte]): Future[void] {.
    async: (...raises: [CancelledError, LPStreamError], raw: true), base, public,
    ...gcsafe, raises: [], tags: [].}
  Source   Edit

Templates

template formatItIMPL(it: LPStream): auto
  Source   Edit