2019-09-01 17:31:24 +00:00
|
|
|
## Nim-LibP2P
|
2019-09-24 17:48:23 +00:00
|
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
2019-09-01 17:31:24 +00:00
|
|
|
## Licensed under either of
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
## at your option.
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
## those terms.
|
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
import std/oids
|
|
|
|
import stew/byteutils
|
2020-06-19 17:29:43 +00:00
|
|
|
import chronicles, chronos, metrics
|
2020-05-08 20:58:23 +00:00
|
|
|
import ../varint,
|
2020-06-19 17:29:43 +00:00
|
|
|
../vbuffer,
|
|
|
|
../peerinfo,
|
|
|
|
../multiaddress
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-10-27 17:21:03 +00:00
|
|
|
declareGauge(libp2p_open_streams,
|
|
|
|
"open stream instances", labels = ["type", "dir"])
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
export oids
|
|
|
|
|
2020-08-02 10:22:49 +00:00
|
|
|
logScope:
|
|
|
|
topics = "lpstream"
|
|
|
|
|
2019-12-10 20:50:35 +00:00
|
|
|
type
|
2020-10-27 17:21:03 +00:00
|
|
|
Direction* {.pure.} = enum
|
|
|
|
In, Out
|
|
|
|
|
2019-09-01 17:31:24 +00:00
|
|
|
LPStream* = ref object of RootObj
|
2020-06-24 15:08:44 +00:00
|
|
|
closeEvent*: AsyncEvent
|
2019-12-04 04:44:54 +00:00
|
|
|
isClosed*: bool
|
2020-05-20 00:14:15 +00:00
|
|
|
isEof*: bool
|
2020-06-19 17:29:43 +00:00
|
|
|
objName*: string
|
|
|
|
oid*: Oid
|
2020-10-27 17:21:03 +00:00
|
|
|
dir*: Direction
|
2019-09-01 17:31:24 +00:00
|
|
|
|
|
|
|
LPStreamError* = object of CatchableError
|
|
|
|
LPStreamIncompleteError* = object of LPStreamError
|
2020-05-06 16:31:47 +00:00
|
|
|
LPStreamIncorrectDefect* = object of Defect
|
2019-09-01 17:31:24 +00:00
|
|
|
LPStreamLimitError* = object of LPStreamError
|
|
|
|
LPStreamReadError* = object of LPStreamError
|
2020-05-23 17:08:39 +00:00
|
|
|
par*: ref CatchableError
|
2019-09-01 17:31:24 +00:00
|
|
|
LPStreamWriteError* = object of LPStreamError
|
2020-05-23 17:08:39 +00:00
|
|
|
par*: ref CatchableError
|
2019-12-10 20:50:35 +00:00
|
|
|
LPStreamEOFError* = object of LPStreamError
|
2020-05-20 00:14:15 +00:00
|
|
|
LPStreamClosedError* = object of LPStreamError
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-08 20:58:23 +00:00
|
|
|
InvalidVarintError* = object of LPStreamError
|
|
|
|
MaxSizeError* = object of LPStreamError
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError =
|
2019-09-01 21:56:00 +00:00
|
|
|
var w = newException(LPStreamReadError, "Read stream failed")
|
2019-09-01 17:31:24 +00:00
|
|
|
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
|
|
|
|
w.par = p
|
|
|
|
result = w
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamReadError*(msg: string): ref CatchableError =
|
2020-05-07 20:37:46 +00:00
|
|
|
newException(LPStreamReadError, msg)
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
|
2019-09-01 17:31:24 +00:00
|
|
|
var w = newException(LPStreamWriteError, "Write stream failed")
|
|
|
|
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
|
|
|
|
w.par = p
|
|
|
|
result = w
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamIncompleteError*(): ref CatchableError =
|
2019-09-01 17:31:24 +00:00
|
|
|
result = newException(LPStreamIncompleteError, "Incomplete data received")
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamLimitError*(): ref CatchableError =
|
2019-09-01 17:31:24 +00:00
|
|
|
result = newException(LPStreamLimitError, "Buffer limit reached")
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamIncorrectDefect*(m: string): ref Defect =
|
2020-05-06 16:31:47 +00:00
|
|
|
result = newException(LPStreamIncorrectDefect, m)
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
proc newLPStreamEOFError*(): ref CatchableError =
|
2019-12-10 20:50:35 +00:00
|
|
|
result = newException(LPStreamEOFError, "Stream EOF!")
|
2019-09-04 06:40:11 +00:00
|
|
|
|
2020-05-20 00:14:15 +00:00
|
|
|
proc newLPStreamClosedError*(): ref Exception =
|
|
|
|
result = newException(LPStreamClosedError, "Stream Closed!")
|
|
|
|
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
func shortLog*(s: LPStream): auto =
|
|
|
|
if s.isNil: "LPStream(nil)"
|
|
|
|
else: $s.oid
|
|
|
|
chronicles.formatIt(LPStream): shortLog(it)
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
method initStream*(s: LPStream) {.base.} =
|
2020-06-19 17:29:43 +00:00
|
|
|
if s.objName.len == 0:
|
|
|
|
s.objName = "LPStream"
|
|
|
|
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
s.closeEvent = newAsyncEvent()
|
2020-06-19 17:29:43 +00:00
|
|
|
s.oid = genOid()
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
|
2020-10-27 17:21:03 +00:00
|
|
|
libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
|
|
|
|
trace "Stream created", s, objName = s.objName, dir = $s.dir
|
2020-06-24 15:08:44 +00:00
|
|
|
|
|
|
|
proc join*(s: LPStream): Future[void] =
|
|
|
|
s.closeEvent.wait()
|
2020-05-23 17:08:39 +00:00
|
|
|
|
2019-12-10 20:50:35 +00:00
|
|
|
method closed*(s: LPStream): bool {.base, inline.} =
|
2019-12-04 04:44:54 +00:00
|
|
|
s.isClosed
|
|
|
|
|
2020-05-20 00:14:15 +00:00
|
|
|
method atEof*(s: LPStream): bool {.base, inline.} =
|
|
|
|
s.isEof
|
|
|
|
|
2020-03-27 14:25:52 +00:00
|
|
|
method readOnce*(s: LPStream,
|
|
|
|
pbytes: pointer,
|
|
|
|
nbytes: int):
|
|
|
|
Future[int]
|
2019-12-04 04:44:54 +00:00
|
|
|
{.base, async.} =
|
2019-10-03 19:29:58 +00:00
|
|
|
doAssert(false, "not implemented!")
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-06-27 17:33:34 +00:00
|
|
|
proc readExactly*(s: LPStream,
|
2020-07-12 16:37:10 +00:00
|
|
|
pbytes: pointer,
|
|
|
|
nbytes: int):
|
|
|
|
Future[void] {.async.} =
|
2020-06-27 17:33:34 +00:00
|
|
|
if s.atEof:
|
|
|
|
raise newLPStreamEOFError()
|
|
|
|
|
2020-09-14 08:19:54 +00:00
|
|
|
if nbytes == 0:
|
|
|
|
return
|
|
|
|
|
2020-07-12 16:37:10 +00:00
|
|
|
logScope:
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
s
|
2020-07-12 16:37:10 +00:00
|
|
|
nbytes = nbytes
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
objName = s.objName
|
2020-07-12 16:37:10 +00:00
|
|
|
|
2020-06-27 17:33:34 +00:00
|
|
|
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
|
|
|
|
var read = 0
|
|
|
|
while read < nbytes and not(s.atEof()):
|
|
|
|
read += await s.readOnce(addr pbuffer[read], nbytes - read)
|
|
|
|
|
2020-09-14 08:19:54 +00:00
|
|
|
if read == 0:
|
|
|
|
doAssert s.atEof()
|
|
|
|
trace "couldn't read all bytes, stream EOF", s, nbytes, read
|
|
|
|
raise newLPStreamEOFError()
|
|
|
|
|
2020-06-27 17:33:34 +00:00
|
|
|
if read < nbytes:
|
2020-09-14 08:19:54 +00:00
|
|
|
trace "couldn't read all bytes, incomplete data", s, nbytes, read
|
|
|
|
raise newLPStreamIncompleteError()
|
2020-06-27 17:33:34 +00:00
|
|
|
|
2020-07-12 16:37:10 +00:00
|
|
|
proc readLine*(s: LPStream,
|
|
|
|
limit = 0,
|
|
|
|
sep = "\r\n"): Future[string]
|
|
|
|
{.async, deprecated: "todo".} =
|
2020-05-06 16:31:47 +00:00
|
|
|
# TODO replace with something that exploits buffering better
|
|
|
|
var lim = if limit <= 0: -1 else: limit
|
|
|
|
var state = 0
|
|
|
|
|
2020-05-23 17:08:39 +00:00
|
|
|
while true:
|
|
|
|
var ch: char
|
|
|
|
await readExactly(s, addr ch, 1)
|
|
|
|
|
|
|
|
if sep[state] == ch:
|
|
|
|
inc(state)
|
|
|
|
if state == len(sep):
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
state = 0
|
|
|
|
if limit > 0:
|
|
|
|
let missing = min(state, lim - len(result) - 1)
|
|
|
|
result.add(sep[0 ..< missing])
|
2020-05-06 16:31:47 +00:00
|
|
|
else:
|
2020-05-23 17:08:39 +00:00
|
|
|
result.add(sep[0 ..< state])
|
|
|
|
|
|
|
|
result.add(ch)
|
|
|
|
if len(result) == lim:
|
|
|
|
break
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-08 20:58:23 +00:00
|
|
|
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
|
|
|
|
var
|
|
|
|
varint: uint64
|
|
|
|
length: int
|
|
|
|
buffer: array[10, byte]
|
|
|
|
|
|
|
|
for i in 0..<len(buffer):
|
|
|
|
await conn.readExactly(addr buffer[i], 1)
|
|
|
|
let res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
|
2020-05-31 14:22:49 +00:00
|
|
|
if res.isOk():
|
2020-05-08 20:58:23 +00:00
|
|
|
return varint
|
2020-05-31 14:22:49 +00:00
|
|
|
if res.error() != VarintError.Incomplete:
|
2020-05-08 20:58:23 +00:00
|
|
|
break
|
|
|
|
if true: # can't end with a raise apparently
|
|
|
|
raise (ref InvalidVarintError)(msg: "Cannot parse varint")
|
|
|
|
|
|
|
|
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
|
|
|
|
## read length prefixed msg, with the length encoded as a varint
|
|
|
|
let
|
|
|
|
length = await s.readVarint()
|
|
|
|
maxLen = uint64(if maxSize < 0: int.high else: maxSize)
|
|
|
|
|
|
|
|
if length > maxLen:
|
|
|
|
raise (ref MaxSizeError)(msg: "Message exceeds maximum length")
|
|
|
|
|
|
|
|
if length == 0:
|
|
|
|
return
|
|
|
|
|
|
|
|
var res = newSeq[byte](length)
|
|
|
|
await s.readExactly(addr res[0], res.len)
|
|
|
|
return res
|
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
|
|
|
|
doAssert(false, "not implemented!")
|
|
|
|
|
2020-05-08 20:58:23 +00:00
|
|
|
proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
|
|
|
## write length prefixed
|
|
|
|
var buf = initVBuffer()
|
|
|
|
buf.writeSeq(msg)
|
|
|
|
buf.finish()
|
|
|
|
s.write(buf.buffer)
|
|
|
|
|
2020-05-06 16:31:47 +00:00
|
|
|
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
|
|
|
|
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-05-07 20:37:46 +00:00
|
|
|
proc write*(s: LPStream, msg: string): Future[void] =
|
2020-08-15 19:50:31 +00:00
|
|
|
s.write(msg.toBytes())
|
2019-09-01 17:31:24 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
|
|
|
|
## Implementation of close - called only once
|
|
|
|
trace "Closing stream", s, objName = s.objName
|
|
|
|
s.closeEvent.fire()
|
2020-10-27 17:21:03 +00:00
|
|
|
libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
|
|
|
|
trace "Closed stream", s, objName = s.objName, dir = $s.dir
|
2020-09-21 17:48:19 +00:00
|
|
|
|
|
|
|
method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
## close the stream - this may block, but will not raise exceptions
|
|
|
|
##
|
|
|
|
if s.isClosed:
|
|
|
|
trace "Already closed", s
|
|
|
|
return
|
2020-10-27 17:21:03 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
s.isClosed = true # Set flag before performing virtual close
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
# An separate implementation method is used so that even when derived types
|
|
|
|
# override `closeImpl`, it is called only once - anyone overriding `close`
|
|
|
|
# itself must implement this - once-only check as well, with their own field
|
|
|
|
await closeImpl(s)
|
2020-09-24 05:30:19 +00:00
|
|
|
|
|
|
|
proc closeWithEOF*(s: LPStream): Future[void] {.async.} =
|
|
|
|
## Close the stream and wait for EOF - use this with half-closed streams where
|
|
|
|
## an EOF is expected to arrive from the other end.
|
|
|
|
await s.close()
|
|
|
|
|
|
|
|
if s.atEof():
|
|
|
|
return
|
|
|
|
|
|
|
|
try:
|
|
|
|
var buf: array[8, byte]
|
|
|
|
if (await readOnce(s, addr buf[0], buf.len)) != 0:
|
|
|
|
debug "Unexpected bytes while waiting for EOF", s
|
|
|
|
except LPStreamEOFError:
|
|
|
|
trace "Expected EOF came", s
|
|
|
|
except CancelledError as exc:
|
|
|
|
raise exc
|
|
|
|
except CatchableError as exc:
|
|
|
|
debug "Unexpected error while waiting for EOF", s, msg = exc.msg
|