216 lines
6.3 KiB
Nim
216 lines
6.3 KiB
Nim
# Nim-LibP2P
|
|
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
# at your option.
|
|
# This file may not be copied, modified, or distributed except according to
|
|
# those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import std/strformat
|
|
import stew/byteutils
|
|
import chronos, chronicles, metrics
|
|
import ../stream/connection
|
|
import ./streamseq
|
|
|
|
export connection
|
|
|
|
logScope:
|
|
topics = "libp2p bufferstream"
|
|
|
|
const
|
|
BufferStreamTrackerName* = "BufferStream"
|
|
|
|
type
|
|
BufferStream* = ref object of Connection
|
|
readQueue*: AsyncQueue[seq[byte]] # read queue for managing backpressure
|
|
readBuf*: StreamSeq # overflow buffer for readOnce
|
|
pushing*: bool # number of ongoing push operations
|
|
reading*: bool # is there an ongoing read? (only allow one)
|
|
pushedEof*: bool # eof marker has been put on readQueue
|
|
returnedEof*: bool # 0-byte readOnce has been completed
|
|
|
|
func shortLog*(s: BufferStream): auto =
|
|
try:
|
|
if s == nil: "BufferStream(nil)"
|
|
else: &"{shortLog(s.peerId)}:{s.oid}"
|
|
except ValueError as exc:
|
|
raiseAssert(exc.msg)
|
|
|
|
chronicles.formatIt(BufferStream): shortLog(it)
|
|
|
|
proc len*(s: BufferStream): int =
|
|
s.readBuf.len + (if s.readQueue.len > 0: s.readQueue[0].len() else: 0)
|
|
|
|
method initStream*(s: BufferStream) =
|
|
if s.objName.len == 0:
|
|
s.objName = BufferStreamTrackerName
|
|
|
|
procCall Connection(s).initStream()
|
|
|
|
s.readQueue = newAsyncQueue[seq[byte]](1)
|
|
|
|
trace "BufferStream created", s
|
|
|
|
proc new*(
|
|
T: typedesc[BufferStream],
|
|
timeout: Duration = DefaultConnectionTimeout): T =
|
|
let bufferStream = T(timeout: timeout)
|
|
bufferStream.initStream()
|
|
bufferStream
|
|
|
|
method pushData*(
|
|
s: BufferStream,
|
|
data: seq[byte]
|
|
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
|
|
## Write bytes to internal read buffer, use this to fill up the
|
|
## buffer with data.
|
|
##
|
|
## `pushTo` will block if the queue is full, thus maintaining backpressure.
|
|
##
|
|
|
|
doAssert(not s.pushing,
|
|
"Only one concurrent push allowed for stream " & s.shortLog())
|
|
|
|
if s.isClosed or s.pushedEof:
|
|
raise newLPStreamClosedError()
|
|
|
|
if data.len == 0:
|
|
return # Don't push 0-length buffers, these signal EOF
|
|
|
|
# We will block here if there is already data queued, until it has been
|
|
# processed
|
|
try:
|
|
s.pushing = true
|
|
trace "Pushing data", s, data = data.len
|
|
await s.readQueue.addLast(data)
|
|
finally:
|
|
s.pushing = false
|
|
|
|
method pushEof*(
|
|
s: BufferStream
|
|
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
|
|
if s.pushedEof:
|
|
return
|
|
|
|
doAssert(not s.pushing,
|
|
"Only one concurrent push allowed for stream " & s.shortLog())
|
|
|
|
s.pushedEof = true
|
|
|
|
# We will block here if there is already data queued, until it has been
|
|
# processed
|
|
try:
|
|
s.pushing = true
|
|
trace "Pushing EOF", s
|
|
await s.readQueue.addLast(Eof)
|
|
finally:
|
|
s.pushing = false
|
|
|
|
method atEof*(s: BufferStream): bool =
|
|
s.isEof and s.readBuf.len == 0
|
|
|
|
method readOnce*(
|
|
s: BufferStream,
|
|
pbytes: pointer,
|
|
nbytes: int
|
|
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
|
doAssert(nbytes > 0, "nbytes must be positive integer")
|
|
doAssert(not s.reading,
|
|
"Only one concurrent read allowed for stream " & s.shortLog())
|
|
|
|
if s.returnedEof:
|
|
raise newLPStreamEOFError()
|
|
|
|
var
|
|
p = cast[ptr UncheckedArray[byte]](pbytes)
|
|
|
|
# First consume leftovers from previous read
|
|
var rbytes = s.readBuf.consumeTo(toOpenArray(p, 0, nbytes - 1))
|
|
|
|
if rbytes < nbytes and not s.isEof:
|
|
# There's space in the buffer - consume some data from the read queue
|
|
s.reading = true
|
|
let buf =
|
|
try:
|
|
await s.readQueue.popFirst()
|
|
except CancelledError as exc:
|
|
# Not very efficient, but shouldn't happen often
|
|
s.readBuf.assign(@(p.toOpenArray(0, rbytes - 1)) & @(s.readBuf.data))
|
|
raise exc
|
|
finally:
|
|
s.reading = false
|
|
|
|
if buf.len == 0:
|
|
# No more data will arrive on read queue
|
|
trace "EOF", s
|
|
s.isEof = true
|
|
else:
|
|
let remaining = min(buf.len, nbytes - rbytes)
|
|
toOpenArray(p, rbytes, nbytes - 1)[0..<remaining] =
|
|
buf.toOpenArray(0, remaining - 1)
|
|
rbytes += remaining
|
|
|
|
if remaining < buf.len:
|
|
trace "add leftovers", s, len = buf.len - remaining
|
|
s.readBuf.add(buf.toOpenArray(remaining, buf.high))
|
|
|
|
if s.isEof and s.readBuf.len() == 0:
|
|
# We can clear the readBuf memory since it won't be used any more
|
|
s.readBuf = StreamSeq()
|
|
|
|
s.activity = true
|
|
|
|
# We want to return 0 exactly once - after that, we'll start raising instead -
|
|
# this is a bit nuts in a mixed exception / return value world, but allows the
|
|
# consumer of the stream to rely on the 0-byte read as a "regular" EOF marker
|
|
# (instead of _sometimes_ getting an exception).
|
|
s.returnedEof = rbytes == 0
|
|
|
|
return rbytes
|
|
|
|
method closeImpl*(
|
|
s: BufferStream): Future[void] {.async: (raises: [], raw: true).} =
|
|
## close the stream and clear the buffer
|
|
trace "Closing BufferStream", s, len = s.len
|
|
|
|
# First, make sure any new calls to `readOnce` and `pushData` etc will fail -
|
|
# there may already be such calls in the event queue however
|
|
s.isEof = true
|
|
s.readBuf = StreamSeq()
|
|
s.pushedEof = true
|
|
|
|
# Essentially we need to handle the following cases
|
|
#
|
|
# - If a push was in progress but no reader is
|
|
# attached we need to pop the queue
|
|
# - If a read was in progress without without a
|
|
# push/data we need to push the Eof marker to
|
|
# notify the reader that the channel closed
|
|
#
|
|
# In all other cases, there should be a data to complete
|
|
# a read or enough room in the queue/buffer to complete a
|
|
# push.
|
|
#
|
|
# State | Q Empty | Q Full
|
|
# ------------|----------|-------
|
|
# Reading | Push Eof | Na
|
|
# Pushing | Na | Pop
|
|
try:
|
|
if not(s.reading and s.pushing):
|
|
if s.reading:
|
|
if s.readQueue.empty():
|
|
# There is an active reader
|
|
s.readQueue.addLastNoWait(Eof)
|
|
elif s.pushing:
|
|
if not s.readQueue.empty():
|
|
discard s.readQueue.popFirstNoWait()
|
|
except AsyncQueueFullError, AsyncQueueEmptyError:
|
|
raiseAssert(getCurrentExceptionMsg())
|
|
|
|
trace "Closed BufferStream", s
|
|
|
|
procCall Connection(s).closeImpl()
|