2022-07-01 18:19:57 +00:00
|
|
|
# Nim-LibP2P
|
|
|
|
# Copyright (c) 2022 Status Research & Development GmbH
|
|
|
|
# Licensed under either of
|
|
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
# at your option.
|
|
|
|
# This file may not be copied, modified, or distributed except according to
|
|
|
|
# those terms.
|
|
|
|
|
|
|
|
## Length Prefixed stream implementation
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2022-10-29 21:26:44 +00:00
|
|
|
{.push gcsafe.}
|
2022-08-03 11:33:19 +00:00
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
else:
|
|
|
|
{.push raises: [].}
|
2021-05-21 16:27:01 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
import std/oids
|
|
|
|
import stew/byteutils
|
2020-06-19 17:29:43 +00:00
|
|
|
import chronicles, chronos, metrics
|
2020-05-08 20:58:23 +00:00
|
|
|
import ../varint,
|
2020-06-19 17:29:43 +00:00
|
|
|
../peerinfo,
|
2021-05-21 16:27:01 +00:00
|
|
|
../multiaddress,
|
2022-07-01 18:19:57 +00:00
|
|
|
../utility,
|
2021-05-21 16:27:01 +00:00
|
|
|
../errors
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2021-01-28 22:12:54 +00:00
|
|
|
export errors
|
|
|
|
|
2020-11-01 22:23:26 +00:00
|
|
|
declareGauge(libp2p_open_streams,
|
|
|
|
"open stream instances", labels = ["type", "dir"])
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
export oids
|
|
|
|
|
2020-08-02 10:22:49 +00:00
|
|
|
logScope:
|
2020-12-01 17:34:27 +00:00
|
|
|
topics = "libp2p lpstream"
|
2020-08-02 10:22:49 +00:00
|
|
|
|
2020-11-05 03:52:54 +00:00
|
|
|
const
  # Default `objName` assigned by `initStream` when a stream has none; it is
  # also the key under which such streams are counted by the stream tracker.
  LPStreamTrackerName* = "LPStream"

  # Empty sequence literal.
  # NOTE(review): presumably used by callers as an end-of-stream marker -
  # confirm against usages outside this file.
  Eof* = @[]
|
2020-11-05 03:52:54 +00:00
|
|
|
|
2019-12-10 20:50:35 +00:00
|
|
|
type
  # Direction label of a stream; `initStream`/`closeImpl` report it as the
  # "dir" label of the `libp2p_open_streams` gauge.
  Direction* {.pure.} = enum
    In, Out

  # Base type of the streams handled by the procs in this module.
  LPStream* = ref object of RootObj
    closeEvent*: AsyncEvent # fired by `closeImpl`, awaited by `join`
    isClosed*: bool         # set by `close` before `closeImpl` is invoked
    isEof*: bool            # value returned by the base `atEof` method
    objName*: string        # metrics/tracker label, defaulted in `initStream`
    oid*: Oid               # generated in `initStream`, printed by `shortLog`
    dir*: Direction
    closedWithEOF: bool # prevent concurrent calls

  # Root of the stream error hierarchy.
  LPStreamError* = object of LPError
  LPStreamIncompleteError* = object of LPStreamError
  LPStreamIncorrectDefect* = object of Defect
  LPStreamLimitError* = object of LPStreamError

  # Read failure; `par` holds the originating exception when the error was
  # built from one (see `newLPStreamReadError`).
  LPStreamReadError* = object of LPStreamError
    par*: ref CatchableError

  # Write failure; `par` holds the originating exception
  # (see `newLPStreamWriteError`).
  LPStreamWriteError* = object of LPStreamError
    par*: ref CatchableError

  LPStreamEOFError* = object of LPStreamError

  # X                | Read                      | Write
  # Local close      | Works                     | LPStreamClosedError
  # Remote close     | LPStreamRemoteClosedError | Works
  # Local reset      | LPStreamClosedError       | LPStreamClosedError
  # Remote reset     | LPStreamResetError        | LPStreamResetError
  # Connection down  | LPStreamConnDownError     | LPStreamConnDownError

  LPStreamResetError* = object of LPStreamEOFError
  LPStreamClosedError* = object of LPStreamEOFError
  LPStreamRemoteClosedError* = object of LPStreamEOFError
  LPStreamConnDownError* = object of LPStreamEOFError

  # Raised by `readVarint` when the length prefix cannot be parsed.
  InvalidVarintError* = object of LPStreamError
  # Raised by `readLp` when the announced length exceeds the allowed maximum.
  MaxSizeError* = object of LPStreamError

  # Per-`objName` counters of streams opened (`initStream`) and closed
  # (`closeImpl`).
  StreamTracker* = ref object of TrackerBase
    opened*: uint64
    closed*: uint64
|
|
|
|
|
|
|
|
proc setupStreamTracker(name: string): StreamTracker =
  ## Builds a `StreamTracker` with zeroed counters, wires its `dump` and
  ## `isLeaked` callbacks, registers it under `name` via `addTracker` and
  ## returns it.
  let tracker = new StreamTracker

  proc describeCounters(): string {.gcsafe.} =
    # Two-line report of the opened/closed counters.
    "Opened " & tracker.id & ": " & $tracker.opened & "\n" &
      "Closed " & tracker.id & ": " & $tracker.closed

  proc countersDiffer(): bool {.gcsafe.} =
    # A mismatch means some stream was opened but not closed (or vice versa).
    tracker.opened != tracker.closed

  tracker.id = name
  tracker.opened = 0
  tracker.closed = 0
  tracker.dump = describeCounters
  tracker.isLeaked = countersDiffer
  addTracker(name, tracker)

  tracker
|
|
|
|
|
|
|
|
proc getStreamTracker(name: string): StreamTracker {.gcsafe.} =
  ## Returns the tracker registered under `name`; when the lookup yields nil
  ## a new one is created and registered through `setupStreamTracker`.
  let existing = cast[StreamTracker](getTracker(name))
  if isNil(existing):
    setupStreamTracker(name)
  else:
    existing
|
|
|
|
|
2021-06-02 13:39:10 +00:00
|
|
|
proc newLPStreamReadError*(p: ref CatchableError): ref LPStreamReadError =
  ## Wraps `p` in an `LPStreamReadError`: the message mentions the name and
  ## message of `p`, and `p` itself is stored in the `par` field.
  let text = "Read stream failed" &
    ", originated from [" & $p.name & "] " & p.msg
  result = newException(LPStreamReadError, text)
  result.par = p
|
|
|
|
|
2021-06-02 13:39:10 +00:00
|
|
|
proc newLPStreamReadError*(msg: string): ref LPStreamReadError =
  ## Creates an `LPStreamReadError` carrying `msg`; `par` is left unset.
  result = newException(LPStreamReadError, msg)
|
|
|
|
|
2021-06-02 13:39:10 +00:00
|
|
|
proc newLPStreamWriteError*(p: ref CatchableError): ref LPStreamWriteError =
  ## Wraps `p` in an `LPStreamWriteError`: the message mentions the name and
  ## message of `p`, and `p` itself is stored in the `par` field.
  let text = "Write stream failed" &
    ", originated from [" & $p.name & "] " & p.msg
  result = newException(LPStreamWriteError, text)
  result.par = p
|
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
proc newLPStreamIncompleteError*(): ref LPStreamIncompleteError =
  ## Error raised by `readExactly` when fewer bytes than requested arrived.
  newException(LPStreamIncompleteError, "Incomplete data received")
|
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
proc newLPStreamLimitError*(): ref LPStreamLimitError =
  ## Creates an `LPStreamLimitError` with the message "Buffer limit reached".
  newException(LPStreamLimitError, "Buffer limit reached")
|
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
proc newLPStreamIncorrectDefect*(m: string): ref LPStreamIncorrectDefect =
  ## Creates an `LPStreamIncorrectDefect` (a `Defect`, i.e. a programming
  ## error rather than a recoverable failure) carrying the message `m`.
  newException(LPStreamIncorrectDefect, m)
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
proc newLPStreamEOFError*(): ref LPStreamEOFError =
  ## Creates the generic end-of-stream error.
  newException(LPStreamEOFError, "Stream EOF!")
|
2019-09-04 06:40:11 +00:00
|
|
|
|
2022-09-14 08:58:41 +00:00
|
|
|
proc newLPStreamResetError*(): ref LPStreamResetError =
  ## Creates an `LPStreamResetError` with the message "Stream Reset!".
  newException(LPStreamResetError, "Stream Reset!")
|
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
proc newLPStreamClosedError*(): ref LPStreamClosedError =
  ## Creates an `LPStreamClosedError` with the message "Stream Closed!".
  newException(LPStreamClosedError, "Stream Closed!")
|
|
|
|
|
2022-09-14 08:58:41 +00:00
|
|
|
proc newLPStreamRemoteClosedError*(): ref LPStreamRemoteClosedError =
  ## Creates an `LPStreamRemoteClosedError` with the message
  ## "Stream Remotely Closed!".
  newException(LPStreamRemoteClosedError, "Stream Remotely Closed!")
|
|
|
|
|
|
|
|
proc newLPStreamConnDownError*(
    parentException: ref Exception = nil): ref LPStreamConnDownError =
  ## Creates an `LPStreamConnDownError`; `parentException` (nil by default)
  ## is attached as the parent of the new exception.
  const text = "Stream Underlying Connection Closed!"
  newException(LPStreamConnDownError, text, parentException)
|
|
|
|
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped off
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
func shortLog*(s: LPStream): auto =
  ## Compact log representation of a stream: its `oid` rendered as a string,
  ## or the literal "LPStream(nil)" for a nil reference.
  if isNil(s):
    "LPStream(nil)"
  else:
    $s.oid
|
|
|
|
chronicles.formatIt(LPStream): shortLog(it)
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
method initStream*(s: LPStream) {.base.} =
  ## Initializes the base stream state: defaults `objName`, generates the
  ## `oid`, creates the close event, then accounts for the new stream in the
  ## `libp2p_open_streams` gauge and in the per-`objName` stream tracker.
  if s.objName.len == 0:
    # Streams that did not name themselves are counted as plain "LPStream"
    s.objName = LPStreamTrackerName

  s.oid = genOid()
  s.closeEvent = newAsyncEvent()

  let tracker = getStreamTracker(s.objName)
  libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
  inc tracker.opened
  trace "Stream created", s, objName = s.objName, dir = $s.dir
|
2020-06-24 15:08:44 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc join*(s: LPStream): Future[void] {.public.} =
  ## Returns a future that completes once the stream's close event has fired
  ## (which `closeImpl` does).
  result = s.closeEvent.wait()
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
method closed*(s: LPStream): bool {.base, public.} =
  ## Whether the stream has been closed; the base implementation reports the
  ## `isClosed` flag.
  result = s.isClosed
|
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
method atEof*(s: LPStream): bool {.base, public.} =
  ## Whether the stream has reached EOF; the base implementation reports the
  ## `isEof` flag.
  result = s.isEof
|
|
|
|
|
2021-05-21 16:27:01 +00:00
|
|
|
method readOnce*(
  s: LPStream,
  pbytes: pointer,
  nbytes: int):
  Future[int] {.base, async, public.} =
  ## Reads whatever is currently available in the stream into `pbytes`, up
  ## to `nbytes` bytes, and returns the number of bytes read. Will block if
  ## nothing is available.
  ##
  ## The base implementation is a placeholder that always asserts; concrete
  ## stream types must override it.
  doAssert false, "not implemented!"
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-06-27 17:33:34 +00:00
|
|
|
proc readExactly*(s: LPStream,
                  pbytes: pointer,
                  nbytes: int):
                  Future[void] {.async, public.} =
  ## Waits until exactly `nbytes` bytes have been read into `pbytes`.
  ##
  ## Raises an EOF error when the stream is already at EOF or ends before any
  ## byte was read, and `LPStreamIncompleteError` when it ends after only
  ## part of the requested bytes arrived.
  if s.atEof:
    # Give `readOnce` the chance to raise its own, more specific error
    # before falling back to the generic EOF error.
    var probe: char
    discard await s.readOnce(addr probe, 1)
    raise newLPStreamEOFError()

  if nbytes == 0:
    return

  logScope:
    s
    nbytes = nbytes
    objName = s.objName

  let target = cast[ptr UncheckedArray[byte]](pbytes)
  var read = 0
  while read < nbytes and not s.atEof():
    let chunk = await s.readOnce(addr target[read], nbytes - read)
    read += chunk

  if read == 0:
    doAssert s.atEof()
    trace "couldn't read all bytes, stream EOF", s, nbytes, read
    # Re-readOnce to raise a more specific error than EOF
    # Raise EOF if it doesn't raise anything(shouldn't happen)
    discard await s.readOnce(addr target[read], nbytes - read)
    warn "Read twice while at EOF"
    raise newLPStreamEOFError()

  if read < nbytes:
    trace "couldn't read all bytes, incomplete data", s, nbytes, read
    raise newLPStreamIncompleteError()
|
2020-06-27 17:33:34 +00:00
|
|
|
|
2020-07-12 16:37:10 +00:00
|
|
|
proc readLine*(s: LPStream,
               limit = 0,
               sep = "\r\n"): Future[string]
               {.async, public.} =
  ## Reads bytes one at a time until the separator `sep` has been seen or,
  ## when `limit > 0`, until `limit` bytes have been collected.
  ##
  ## Returns the collected bytes without the terminating separator.
  ## `limit <= 0` means "no limit". `sep` must not be empty (its first
  ## character is indexed unconditionally).
  ##
  ## Errors raised by `readExactly` (EOF, incomplete data, ...) propagate to
  ## the caller.
  # TODO replace with something that exploits buffering better
  let lim = if limit <= 0: -1 else: limit
  var state = 0 # number of leading `sep` characters matched so far

  while true:
    var ch: char
    await readExactly(s, addr ch, 1)

    if sep[state] == ch:
      inc(state)
      if state == len(sep):
        break
    else:
      if state != 0:
        # The partial separator match turned out to be ordinary data: put
        # the swallowed characters back into the result *before* resetting
        # `state`, otherwise they would be silently dropped.
        if limit > 0:
          # Keep one slot free for `ch`, which is appended right below.
          let missing = min(state, lim - len(result) - 1)
          result.add(sep[0 ..< missing])
        else:
          result.add(sep[0 ..< state])
        state = 0

      # NOTE(review): `ch` is not re-tested against `sep[0]` here, so input
      # such as "\r\r\n" is not recognised as containing the separator.
      result.add(ch)
      if len(result) == lim:
        break
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe, public.} =
  ## Reads an unsigned varint from `conn`, one byte at a time, trying to
  ## decode after every byte. At most 10 bytes are consumed.
  ##
  ## Raises `InvalidVarintError` when decoding fails for a reason other than
  ## "incomplete", or when 10 bytes did not yield a complete value.
  var
    raw: array[10, byte]

  for i in 0 .. raw.high:
    await conn.readExactly(addr raw[i], 1)

    var
      value: uint64
      consumed: int
    let decoded = PB.getUVarint(raw.toOpenArray(0, i), consumed, value)
    if decoded.isOk():
      return value
    if decoded.error() != VarintError.Incomplete:
      # Any error other than "need more bytes" is final
      break
  if true: # can't end with a raise apparently
    raise (ref InvalidVarintError)(msg: "Cannot parse varint")
|
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, public.} =
  ## Reads a length-prefixed message: a varint-encoded length followed by
  ## that many payload bytes.
  ##
  ## A negative `maxSize` disables the size check (the cap becomes
  ## `int.high`). Raises `MaxSizeError` when the announced length exceeds the
  ## cap. A zero length yields an empty sequence.
  let length = await s.readVarint()
  let maxLen = uint64(if maxSize < 0: int.high else: maxSize)

  if length > maxLen:
    raise (ref MaxSizeError)(msg: "Message exceeds maximum length")

  if length == 0:
    return

  var payload = newSeq[byte](length)
  await s.readExactly(addr payload[0], payload.len)
  return payload
|
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, public.} =
  ## Writes `msg` to the stream; the returned future completes when the
  ## write has finished.
  ##
  ## The base implementation is a placeholder that always asserts; concrete
  ## stream types must override it.
  doAssert false, "not implemented!"
|
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] {.public.} =
  ## Writes `msg` with a varint-encoded length prefix.
  ##
  ## Prefix and payload are copied into one buffer so that a single `write`
  ## call is issued on the stream.
  let prefix = PB.toBytes(msg.len().uint64)
  let prefixLen = prefix.len
  var frame = newSeqUninitialized[byte](msg.len() + prefixLen)
  frame[0..<prefixLen] = prefix.toOpenArray()
  frame[prefixLen..<frame.len] = msg
  s.write(frame)
|
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc writeLp*(s: LPStream, msg: string): Future[void] {.public.} =
  ## String overload: writes the bytes of `msg` with a varint-encoded length
  ## prefix.
  s.writeLp(msg.toOpenArrayByte(0, msg.high))
|
2020-05-08 20:58:23 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc write*(s: LPStream, msg: string): Future[void] {.public.} =
  ## String overload: converts `msg` to bytes and forwards to the
  ## `seq[byte]` `write` method.
  let payload = msg.toBytes()
  s.write(payload)
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
  ## Implementation of close - called only once (`close` guards it with the
  ## `isClosed` flag).
  ##
  ## Decrements the open-streams gauge, bumps the tracker's `closed` counter
  ## and fires the close event that `join` waits on.
  trace "Closing stream", s, objName = s.objName, dir = $s.dir
  libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
  let tracker = getStreamTracker(s.objName)
  inc tracker.closed
  s.closeEvent.fire()
  trace "Closed stream", s, objName = s.objName, dir = $s.dir
|
2020-09-21 17:48:19 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
method close*(s: LPStream): Future[void] {.base, async, public.} = # {.raises [Defect].}
  ## close the stream - this may block, but will not raise exceptions
  ##
  ## Calling it again on an already closed stream is a no-op.
  if s.isClosed:
    trace "Already closed", s
    return

  # Set flag before performing virtual close
  s.isClosed = true

  # A separate implementation method is used so that even when derived types
  # override `closeImpl`, it is called only once - anyone overriding `close`
  # itself must implement this once-only check as well, with their own field
  await s.closeImpl()
|
2020-09-24 05:30:19 +00:00
|
|
|
|
2022-07-01 18:19:57 +00:00
|
|
|
proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
  ## Close the stream and wait for EOF - use this with half-closed streams
  ## where an EOF is expected to arrive from the other end.
  ##
  ## Note - this should only be used when there has been an in-protocol
  ## notification that no more data will arrive and that the only thing left
  ## for the other end to do is to close the stream gracefully.
  ##
  ## In particular, it must not be used when there is another concurrent read
  ## ongoing (which may be the case during cancellations)!
  ##
  ## Cancellation is propagated; any other error while waiting for EOF is
  ## only logged.

  trace "Closing with EOF", s
  if s.closedWithEOF:
    trace "Already closed"
    return

  # prevent any further calls to avoid triggering
  # reading the stream twice (which should assert)
  s.closedWithEOF = true
  await s.close()

  if not s.atEof():
    try:
      var scratch: array[8, byte]
      let got = await readOnce(s, addr scratch[0], scratch.len)
      if got != 0:
        debug "Unexpected bytes while waiting for EOF", s
    except LPStreamEOFError:
      trace "Expected EOF came", s
    except CancelledError as cancelled:
      raise cancelled
    except CatchableError as failure:
      debug "Unexpected error while waiting for EOF", s, msg = failure.msg
|