From 902880ef1f26512305241befe18073c73d6f966a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 27 Jun 2020 11:33:34 -0600 Subject: [PATCH] consolidate reading in lpstream (#241) * consolidate reading in lpstream * remove debug echo * throw if not enough bytes where read * tune log level * set eof flag * test readExactly to fail on not enough bytes --- libp2p/muxers/mplex/mplex.nim | 1 - libp2p/protocols/identify.nim | 4 +-- libp2p/protocols/secure/noise.nim | 6 ++-- libp2p/protocols/secure/secure.nim | 23 ------------ libp2p/stream/bufferstream.nim | 58 ++++++++---------------------- libp2p/stream/chronosstream.nim | 9 ----- libp2p/stream/lpstream.nim | 23 ++++++++---- libp2p/switch.nim | 8 ++--- libp2p/transports/tcptransport.nim | 5 ++- tests/pubsub/testgossipsub.nim | 5 +-- tests/testbufferstream.nim | 21 +++++++++++ tests/testidentify.nim | 1 + tests/testmultistream.nim | 39 ++++++++++++++------ tests/testnoise.nim | 9 ++--- tests/testtransport.nim | 1 + 15 files changed, 104 insertions(+), 109 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index f11ae6014..eee7a56cf 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -202,5 +202,4 @@ method close*(m: Mplex) {.async, gcsafe.} = finally: m.remote.clear() m.local.clear() - # m.handlerFuts = @[] m.isClosed = true diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 0f410ca7d..3c25f0984 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -27,7 +27,7 @@ const ProtoVersion* = "ipfs/0.1.0" AgentVersion* = "nim-libp2p/0.0.1" -#TODO: implment push identify, leaving out for now as it is not essential +#TODO: implement push identify, leaving out for now as it is not essential type IdentityNoMatchError* = object of CatchableError @@ -141,7 +141,7 @@ proc identify*(p: Identify, if not isNil(remotePeerInfo) and result.pubKey.isSome: let peer = PeerID.init(result.pubKey.get()) - # do a string comaprison of the ids, + # do a string comparison of the ids, # because that is the only thing we # have in most cases if peer != remotePeerInfo.peerId: diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 0e286b90d..cdd37eec0 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -413,7 +413,7 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async. await sconn.stream.write(outbuf) method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} = - debug "Starting Noise handshake", initiator, peer = $conn + trace "Starting Noise handshake", initiator, peer = $conn # https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages let @@ -454,7 +454,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon if not remoteSig.verify(verifyPayload, remotePubKey): raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.") else: - debug "Remote signature verified", peer = $conn + trace "Remote signature verified", peer = $conn if initiator and not isNil(conn.peerInfo): let pid = PeerID.init(remotePubKey) @@ -477,7 +477,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon secure.readCs = handshakeRes.cs1 secure.writeCs = handshakeRes.cs2 - debug "Noise handshake completed!", initiator, peer = $secure.peerInfo + trace "Noise handshake completed!", initiator, peer = $secure.peerInfo return secure diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 5a594f161..baeaece02 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -87,29 +87,6 @@ method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] warn "securing connection failed", msg = exc.msg return nil -method readExactly*(s: SecureConn, - pbytes: pointer, - nbytes: int): - Future[void] {.async, gcsafe.} = - try: - if nbytes == 0: - return - - while s.buf.data().len < nbytes: - # TODO write decrypted content straight into buf using `prepare` - let buf = await s.readMessage() - if buf.len == 0: - raise newLPStreamIncompleteError() - s.buf.add(buf) - - var p = cast[ptr UncheckedArray[byte]](pbytes) - let consumed = s.buf.consumeTo(toOpenArray(p, 0, nbytes - 1)) - doAssert consumed == nbytes, "checked above" - except CatchableError as exc: - trace "exception reading from secure connection", exc = exc.msg, oid = s.oid - await s.close() # make sure to close the wrapped connection - raise exc - method readOnce*(s: SecureConn, pbytes: pointer, nbytes: int): diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index e2dfff9c7..e879766cb 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -15,7 +15,7 @@ ## ## It works by exposing a regular LPStream interface and ## a method ``pushTo`` to push data to the internal read -## buffer; as well as a handler that can be registrered +## buffer; as well as a handler that can be registered ## that gets triggered on every write to the stream. This ## allows using the buffered stream as a sort of proxy, ## which can be consumed as a regular LPStream but allows @@ -25,7 +25,7 @@ ## ordered and asynchronous. Reads are queued up in order ## and are suspended when not enough data available. This ## allows preserving backpressure while maintaining full -## asynchrony. Both writting to the internal buffer with +## asynchrony. Both writing to the internal buffer with ## ``pushTo`` as well as reading with ``read*` methods, ## will suspend until either the amount of elements in the ## buffer goes below ``maxSize`` or more data becomes available. @@ -180,7 +180,7 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = while index < data.len and s.readBuf.len < s.maxSize: s.readBuf.addLast(data[index]) inc(index) - # trace "pushTo()", msg = "added " & $index & " bytes to readBuf", oid = s.oid + # trace "pushTo()", msg = "added " & $s.len & " bytes to readBuf", oid = s.oid # resolve the next queued read request if s.readReqs.len > 0: @@ -195,57 +195,27 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = await s.dataReadEvent.wait() s.dataReadEvent.clear() finally: + # trace "ended", size = s.len s.lock.release() -method readExactly*(s: BufferStream, - pbytes: pointer, - nbytes: int): - Future[void] {.async.} = - ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store - ## it to ``pbytes``. - ## - ## If EOF is received and ``nbytes`` is not yet read, the procedure - ## will raise ``LPStreamIncompleteError``. - ## - - if s.atEof: - raise newLPStreamEOFError() - - # trace "readExactly()", requested_bytes = nbytes, oid = s.oid - var index = 0 - - if s.readBuf.len() == 0: - await s.requestReadBytes() - - let output = cast[ptr UncheckedArray[byte]](pbytes) - while index < nbytes: - while s.readBuf.len() > 0 and index < nbytes: - output[index] = s.popFirst() - inc(index) - # trace "readExactly()", read_bytes = index, oid = s.oid - - if index < nbytes: - await s.requestReadBytes() - method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = - ## Perform one read operation on read-only stream ``rstream``. - ## - ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from - ## internal buffer, otherwise it will wait until some bytes will be received. - ## - if s.atEof: raise newLPStreamEOFError() - if s.readBuf.len == 0: + if s.len() == 0: await s.requestReadBytes() - var len = if nbytes > s.readBuf.len: s.readBuf.len else: nbytes - await s.readExactly(pbytes, len) - result = len + var index = 0 + var size = min(nbytes, s.len) + let output = cast[ptr UncheckedArray[byte]](pbytes) + while s.len() > 0 and index < size: + output[index] = s.popFirst() + inc(index) + + return size method write*(s: BufferStream, msg: seq[byte]) {.async.} = ## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer @@ -266,6 +236,7 @@ method write*(s: BufferStream, msg: seq[byte]) {.async.} = await s.writeHandler(msg) +# TODO: move pipe routines out proc pipe*(s: BufferStream, target: BufferStream): BufferStream = ## pipe the write end of this stream to @@ -310,6 +281,7 @@ method close*(s: BufferStream) {.async, gcsafe.} = ## close the stream and clear the buffer if not s.isClosed: trace "closing bufferstream", oid = s.oid + s.isEof = true for r in s.readReqs: if not(isNil(r)) and not(r.finished()): r.fail(newLPStreamEOFError()) diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index f5b239b80..4ff0d039c 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -42,15 +42,6 @@ template withExceptions(body: untyped) = raise newLPStreamEOFError() # raise (ref LPStreamError)(msg: exc.msg, parent: exc) -method readExactly*(s: ChronosStream, - pbytes: pointer, - nbytes: int): Future[void] {.async.} = - if s.atEof: - raise newLPStreamEOFError() - - withExceptions: - await s.client.readExactly(pbytes, nbytes) - method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = if s.atEof: raise newLPStreamEOFError() diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index d6969a76a..410d3ed46 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -94,12 +94,6 @@ method closed*(s: LPStream): bool {.base, inline.} = method atEof*(s: LPStream): bool {.base, inline.} = s.isEof -method readExactly*(s: LPStream, - pbytes: pointer, - nbytes: int): - Future[void] {.base, async.} = - doAssert(false, "not implemented!") - method readOnce*(s: LPStream, pbytes: pointer, nbytes: int): @@ -107,6 +101,22 @@ method readOnce*(s: LPStream, {.base, async.} = doAssert(false, "not implemented!") +proc readExactly*(s: LPStream, + pbytes: pointer, + nbytes: int): + Future[void] {.async.} = + + if s.atEof: + raise newLPStreamEOFError() + + var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + var read = 0 + while read < nbytes and not(s.atEof()): + read += await s.readOnce(addr pbuffer[read], nbytes - read) + + if read < nbytes: + raise newLPStreamIncompleteError() + proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, deprecated: "todo".} = # TODO replace with something that exploits buffering better var lim = if limit <= 0: -1 else: limit @@ -140,6 +150,7 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = for i in 0..