2019-09-01 17:31:24 +00:00
|
|
|
## Nim-LibP2P
|
2019-09-24 17:48:23 +00:00
|
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
2019-09-01 17:31:24 +00:00
|
|
|
## Licensed under either of
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
## at your option.
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
## those terms.
|
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
import std/oids
|
|
|
|
import stew/byteutils
|
2020-06-19 17:29:43 +00:00
|
|
|
import chronicles, chronos, metrics
|
2020-05-08 20:58:23 +00:00
|
|
|
import ../varint,
|
2020-06-19 17:29:43 +00:00
|
|
|
../peerinfo,
|
|
|
|
../multiaddress
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-11-01 22:23:26 +00:00
|
|
|
# Gauge of open stream instances. The "type" label receives the stream's
# `objName` and the "dir" label its direction; the gauge is incremented in
# `initStream` and decremented in `closeImpl`.
declareGauge(libp2p_open_streams,
  "open stream instances", labels = ["type", "dir"])
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
# Re-export `oids` so that modules importing this one also see the `Oid` API
# needed for the `oid` field of `LPStream`.
export oids

# Log topics attached to every log statement emitted from this module.
logScope:
  topics = "libp2p lpstream"

const
  # NOTE(review): not referenced in the visible code - `initStream` uses the
  # same literal as the default `objName`; confirm external users.
  LPStreamTrackerName* = "LPStream"
  # NOTE(review): presumably an empty buffer used as an EOF marker; neither
  # the element type nor the consumers are visible here - confirm.
  Eof* = @[]
|
2020-11-05 03:52:54 +00:00
|
|
|
|
2019-12-10 20:50:35 +00:00
|
|
|
type
  # Direction of a stream; its string form is used as the "dir" metrics label.
  Direction* {.pure.} = enum
    In, Out

  # Base type of the streams handled by this module.
  LPStream* = ref object of RootObj
    closeEvent*: AsyncEvent # fired by `closeImpl`, awaited through `join`
    isClosed*: bool # set by `close` before `closeImpl` is invoked
    isEof*: bool # value reported by the base `atEof`
    objName*: string # metrics label and tracker name, defaults to "LPStream"
    oid*: Oid # generated in `initStream`, printed by `shortLog`
    dir*: Direction # reported as the "dir" metrics label
    closedWithEOF: bool # prevent concurrent calls



  # Root of the catchable stream errors declared below.
  LPStreamError* = object of CatchableError
  LPStreamIncompleteError* = object of LPStreamError
  LPStreamIncorrectDefect* = object of Defect
  LPStreamLimitError* = object of LPStreamError
  LPStreamReadError* = object of LPStreamError
    par*: ref CatchableError # originating error, set by `newLPStreamReadError`
  LPStreamWriteError* = object of LPStreamError
    par*: ref CatchableError # originating error, set by `newLPStreamWriteError`
  LPStreamEOFError* = object of LPStreamError
  LPStreamClosedError* = object of LPStreamError

  InvalidVarintError* = object of LPStreamError # raised by `readVarint`
  MaxSizeError* = object of LPStreamError # raised by `readLp`

  # Counters of opened and closed streams, registered through `addTracker`.
  StreamTracker* = ref object of TrackerBase
    opened*: uint64 # incremented in `initStream`
    closed*: uint64 # incremented in `closeImpl`
|
|
|
|
|
|
|
|
proc setupStreamTracker(name: string): StreamTracker =
  ## Create a `StreamTracker` with zeroed counters, attach its dump and
  ## leak-check callbacks and register it under `name` with `addTracker`.
  let tracker = new StreamTracker
  tracker.id = name
  tracker.opened = 0
  tracker.closed = 0

  proc describe(): string {.gcsafe.} =
    # Two-line report: opened count first, closed count second.
    result = "Opened " & tracker.id & ": " & $tracker.opened & "\n"
    result.add("Closed " & tracker.id & ": " & $tracker.closed)

  proc hasLeak(): bool {.gcsafe.} =
    # A mismatch between the counters means some stream was never closed.
    tracker.opened != tracker.closed

  tracker.dump = describe
  tracker.isLeaked = hasLeak
  addTracker(name, tracker)

  tracker
|
|
|
|
|
|
|
|
proc getStreamTracker(name: string): StreamTracker {.gcsafe.} =
  ## Return the tracker registered under `name`, creating and registering a
  ## new one through `setupStreamTracker` when the lookup yields nil.
  let existing = cast[StreamTracker](getTracker(name))
  if existing.isNil:
    setupStreamTracker(name)
  else:
    existing
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError =
  ## Wrap `p` in an `LPStreamReadError`: the message names the originating
  ## exception and its text, and `p` itself is kept in the `par` field.
  let wrapped = newException(
    LPStreamReadError,
    "Read stream failed" & ", originated from [" & $p.name & "] " & p.msg)
  wrapped.par = p
  wrapped
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamReadError*(msg: string): ref CatchableError =
  ## Build an `LPStreamReadError` carrying `msg`; `par` is left unset.
  result = newException(LPStreamReadError, msg)
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
  ## Wrap `p` in an `LPStreamWriteError`: the message names the originating
  ## exception and its text, and `p` itself is kept in the `par` field.
  let wrapped = newException(
    LPStreamWriteError,
    "Write stream failed" & ", originated from [" & $p.name & "] " & p.msg)
  wrapped.par = p
  wrapped
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamIncompleteError*(): ref CatchableError =
  ## Build the error `readExactly` raises when only part of the requested
  ## data could be read.
  newException(LPStreamIncompleteError, "Incomplete data received")
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamLimitError*(): ref CatchableError =
  ## Build an `LPStreamLimitError` with the fixed "Buffer limit reached" text.
  newException(LPStreamLimitError, "Buffer limit reached")
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamIncorrectDefect*(m: string): ref Defect =
  ## Build an `LPStreamIncorrectDefect` (a `Defect`, not a catchable error)
  ## carrying the message `m`.
  newException(LPStreamIncorrectDefect, m)
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamEOFError*(): ref CatchableError =
  ## Build the error raised when a read hits the end of the stream.
  newException(LPStreamEOFError, "Stream EOF!")
|
2019-09-04 06:40:11 +00:00
|
|
|
|
2020-05-20 00:14:15 +00:00
|
|
|
proc newLPStreamClosedError*(): ref CatchableError =
  ## Build an `LPStreamClosedError` with the fixed "Stream Closed!" text.
  ##
  ## The declared return type is `ref CatchableError`, like the other error
  ## constructors in this module, instead of the overly wide `ref Exception`:
  ## `raise newLPStreamClosedError()` is then tracked as a catchable error.
  ## The value still converts implicitly to `ref Exception`.
  result = newException(LPStreamClosedError, "Stream Closed!")
|
|
|
|
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped off
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
func shortLog*(s: LPStream): auto =
  ## Compact log representation: the stream's `oid` as a string, or the
  ## placeholder "LPStream(nil)" for a nil stream.
  if not s.isNil:
    $s.oid
  else:
    "LPStream(nil)"

# Make chronicles format `LPStream` values through `shortLog`.
chronicles.formatIt(LPStream): shortLog(it)
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
method initStream*(s: LPStream) {.base.} =
  ## Base initialisation of a stream: default the object name, create the
  ## close event and the oid, then account for the new stream in the
  ## `libp2p_open_streams` gauge and in the per-name stream tracker.
  if s.objName == "":
    s.objName = "LPStream"

  s.oid = genOid()
  s.closeEvent = newAsyncEvent()

  libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])

  let tracker = getStreamTracker(s.objName)
  inc tracker.opened

  trace "Stream created", s, objName = s.objName, dir = $s.dir
|
2020-06-24 15:08:44 +00:00
|
|
|
|
|
|
|
proc join*(s: LPStream): Future[void] =
  ## Return the future obtained by waiting on the stream's `closeEvent`,
  ## which `closeImpl` fires.
  result = s.closeEvent.wait()
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2020-11-17 14:59:25 +00:00
|
|
|
method closed*(s: LPStream): bool {.base.} =
  ## Base implementation: report the `isClosed` flag.
  result = s.isClosed
|
|
|
|
|
2020-11-17 14:59:25 +00:00
|
|
|
method atEof*(s: LPStream): bool {.base.} =
  ## Base implementation: report the `isEof` flag.
  result = s.isEof
|
|
|
|
|
2020-03-27 14:25:52 +00:00
|
|
|
method readOnce*(
    s: LPStream,
    pbytes: pointer,
    nbytes: int): Future[int] {.base, async.} =
  ## Base placeholder for a single read into the buffer at `pbytes`; it
  ## always fails the assertion, so concrete stream types have to override it.
  doAssert false, "not implemented!"
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-06-27 17:33:34 +00:00
|
|
|
proc readExactly*(
    s: LPStream,
    pbytes: pointer,
    nbytes: int): Future[void] {.async.} =
  ## Fill the `nbytes`-byte buffer at `pbytes` by calling `readOnce` until
  ## enough data has arrived or the stream reports EOF.
  ##
  ## Raises `LPStreamEOFError` when the stream is already at EOF on entry
  ## (this is checked before the zero-length shortcut) or when nothing at all
  ## could be read, and `LPStreamIncompleteError` when only part of the
  ## requested data arrived.
  if s.atEof:
    raise newLPStreamEOFError()

  if nbytes == 0:
    return

  logScope:
    s
    nbytes = nbytes
    objName = s.objName

  let pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
  var read = 0
  while not (read >= nbytes or s.atEof()):
    # each call may deliver fewer bytes than requested; continue after them
    let chunk = await s.readOnce(addr pbuffer[read], nbytes - read)
    read += chunk

  if read == 0:
    # the loop can only end without data when the stream hit EOF
    doAssert s.atEof()
    trace "couldn't read all bytes, stream EOF", s, nbytes, read
    raise newLPStreamEOFError()
  elif read < nbytes:
    trace "couldn't read all bytes, incomplete data", s, nbytes, read
    raise newLPStreamIncompleteError()
|
2020-06-27 17:33:34 +00:00
|
|
|
|
2020-07-12 16:37:10 +00:00
|
|
|
proc readLine*(s: LPStream,
               limit = 0,
               sep = "\r\n"): Future[string]
               {.async, deprecated: "todo".} =
  ## Read the stream one byte at a time until the separator `sep` has been
  ## seen or, when `limit > 0`, until the result holds `limit` characters.
  ## The separator is not part of the returned string. A `limit <= 0`
  ## disables the length check.
  ##
  ## Errors raised by `readExactly` (for example on EOF) propagate.
  ##
  ## NOTE: a character that breaks a partial separator match is appended as
  ## data and is not re-checked against the start of `sep`.
  # TODO replace with something that exploits buffering better
  let lim = if limit <= 0: -1 else: limit
  var state = 0 # number of leading `sep` characters matched so far

  while true:
    var ch: char
    await readExactly(s, addr ch, 1)

    if sep[state] == ch:
      inc(state)
      if state == len(sep):
        break
    else:
      # The partially matched separator turned out to be ordinary data, so
      # put the matched prefix back into the output. This has to happen
      # before `state` is reset, otherwise those characters are dropped.
      if limit > 0:
        # keep one slot free for `ch`, which is appended below
        let missing = min(state, lim - len(result) - 1)
        result.add(sep[0 ..< missing])
      else:
        result.add(sep[0 ..< state])
      state = 0

      result.add(ch)
      if len(result) == lim:
        break
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-08 20:58:23 +00:00
|
|
|
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
  ## Read an unsigned varint from `conn`, one byte at a time, trying to
  ## decode after every byte. At most 10 bytes are consumed.
  ##
  ## Raises `InvalidVarintError` when decoding fails with anything other than
  ## "incomplete" or when all 10 bytes were read without a complete value.
  var
    buffer: array[10, byte]
    length: int
    varint: uint64

  for idx in 0 .. buffer.high:
    await conn.readExactly(addr buffer[idx], 1)
    let parsed = PB.getUVarint(buffer.toOpenArray(0, idx), length, varint)
    if parsed.isOk():
      return varint
    elif parsed.error() != VarintError.Incomplete:
      # a definite decoding error - more bytes will not help
      break

  if true: # can't end with a raise apparently
    raise (ref InvalidVarintError)(msg: "Cannot parse varint")
|
|
|
|
|
|
|
|
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
  ## read length prefixed msg, with the length encoded as a varint
  ##
  ## A negative `maxSize` lifts the limit to `int.high`. Raises `MaxSizeError`
  ## when the announced length exceeds the limit; a zero length yields an
  ## empty seq without reading any further.
  let msgLen = await s.readVarint()
  let limit =
    if maxSize < 0: uint64(int.high)
    else: uint64(maxSize)

  if msgLen > limit:
    raise (ref MaxSizeError)(msg: "Message exceeds maximum length")

  if msgLen == 0:
    return

  var payload = newSeq[byte](msgLen)
  await s.readExactly(addr payload[0], payload.len)
  return payload
|
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
  ## Base placeholder for writing `msg`; it always fails the assertion, so
  ## concrete stream types have to override it.
  doAssert false, "not implemented!"
|
|
|
|
|
2020-12-15 18:15:22 +00:00
|
|
|
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =
  ## Write `msg` with a varint-encoded length prefix
  ##
  ## Prefix and payload are copied into one buffer and handed to `write` in a
  ## single call.
  let
    prefix = PB.toBytes(msg.len().uint64)
    prefixLen = prefix.len
  var framed = newSeqUninitialized[byte](prefixLen + msg.len())
  framed[0 ..< prefixLen] = prefix.toOpenArray()
  framed[prefixLen ..< framed.len] = msg
  s.write(framed)
|
|
|
|
|
|
|
|
proc writeLp*(s: LPStream, msg: string): Future[void] =
  ## String overload: forward the bytes of `msg` to the `openArray[byte]`
  ## variant of `writeLp`.
  s.writeLp(msg.toOpenArrayByte(0, msg.high))
|
2020-05-08 20:58:23 +00:00
|
|
|
|
2020-05-06 16:31:47 +00:00
|
|
|
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
  ## Deprecated raw-pointer overload: copy `nbytes` bytes starting at
  ## `pbytes` into a fresh seq and pass it to the seq-based `write`.
  let raw = cast[ptr UncheckedArray[byte]](pbytes)
  s.write(@(raw.toOpenArray(0, nbytes - 1)))
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-07 20:37:46 +00:00
|
|
|
proc write*(s: LPStream, msg: string): Future[void] =
  ## String overload: convert `msg` to bytes and pass them to the seq-based
  ## `write`.
  let payload = msg.toBytes()
  s.write(payload)
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
  ## Implementation of close - called only once
  ##
  ## Fires the close event, which completes futures returned by `join`, and
  ## removes the stream from the open-streams gauge and tracker accounting.
  trace "Closing stream", s, objName = s.objName, dir = $s.dir

  s.closeEvent.fire()
  libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])

  let tracker = getStreamTracker(s.objName)
  inc tracker.closed

  trace "Closed stream", s, objName = s.objName, dir = $s.dir
|
2020-09-21 17:48:19 +00:00
|
|
|
|
|
|
|
method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
  ## close the stream - this may block, but will not raise exceptions
  ##
  ## Only the first call does any work; later calls just log and return.
  if not s.isClosed:
    s.isClosed = true # Set flag before performing virtual close

    # A separate implementation method is used so that even when derived types
    # override `closeImpl`, it is called only once - anyone overriding `close`
    # itself must implement this - once-only check as well, with their own field
    await closeImpl(s)
  else:
    trace "Already closed", s
|
2020-09-24 05:30:19 +00:00
|
|
|
|
|
|
|
proc closeWithEOF*(s: LPStream): Future[void] {.async.} =
  ## Close the stream and wait for EOF - use this with half-closed streams where
  ## an EOF is expected to arrive from the other end.
  ##
  ## Note - this should only be used when there has been an in-protocol
  ## notification that no more data will arrive and that the only thing left
  ## for the other end to do is to close the stream gracefully.
  ##
  ## In particular, it must not be used when there is another concurrent read
  ## ongoing (which may be the case during cancellations)!
  ##
  ## Cancellation is re-raised; any other catchable error seen while waiting
  ## for the EOF is only logged.

  trace "Closing with EOF", s
  if s.closedWithEOF:
    trace "Already closed"
    return

  # prevent any further calls to avoid triggering
  # reading the stream twice (which should assert)
  s.closedWithEOF = true

  await s.close()

  if s.atEof():
    return

  try:
    var scratch: array[8, byte]
    let received = await readOnce(s, addr scratch[0], scratch.len)
    if received != 0:
      debug "Unexpected bytes while waiting for EOF", s
  except LPStreamEOFError:
    trace "Expected EOF came", s
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    debug "Unexpected error while waiting for EOF", s, msg = exc.msg
|