From 0db45462cd50cebf0675bcf425e90d07bb8f9195 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 14 Sep 2020 10:19:54 +0200 Subject: [PATCH] mplex fixes (#362) * remove almost-empty types module * lock when writing message (that's the only place the lock matters, and only when the message is > max msg size) * logging updates (log in consistent order, makes reading logs easier) * raise EOF from readExactly only if no bytes have been read (to signal that _no_ bytes were lost) --- docs/GETTING_STARTED.md | 1 - docs/tutorial/directchat/second.nim | 3 +-- docs/tutorial/second.nim | 3 +-- examples/directchat.nim | 3 +-- libp2p/muxers/mplex/coder.nim | 15 +++++++++-- libp2p/muxers/mplex/lpchannel.nim | 27 ++++++++++--------- libp2p/muxers/mplex/mplex.nim | 40 ++++++++++++++--------------- libp2p/muxers/mplex/types.nim | 27 ------------------- libp2p/standard_setup.nim | 2 +- libp2p/stream/lpstream.nim | 16 +++++++----- tests/testbufferstream.nim | 2 +- tests/testinterop.nim | 1 - tests/testmplex.nim | 1 - tests/testnoise.nim | 1 - 14 files changed, 60 insertions(+), 82 deletions(-) delete mode 100644 libp2p/muxers/mplex/types.nim diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md index 1c74e7ff2..43077edaf 100644 --- a/docs/GETTING_STARTED.md +++ b/docs/GETTING_STARTED.md @@ -22,7 +22,6 @@ import ../libp2p/[switch, protocols/protocol, muxers/muxer, muxers/mplex/mplex, - muxers/mplex/types, protocols/secure/secio, protocols/secure/secure] diff --git a/docs/tutorial/directchat/second.nim b/docs/tutorial/directchat/second.nim index de83d3db5..425e2e598 100644 --- a/docs/tutorial/directchat/second.nim +++ b/docs/tutorial/directchat/second.nim @@ -17,8 +17,7 @@ import ../libp2p/[switch, protocols/secure/secure, protocols/secure/secio, muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types] + muxers/mplex/mplex] const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" diff --git a/docs/tutorial/second.nim b/docs/tutorial/second.nim index de83d3db5..425e2e598 100644 --- a/docs/tutorial/second.nim +++ b/docs/tutorial/second.nim @@ -17,8 +17,7 @@ import ../libp2p/[switch, protocols/secure/secure, protocols/secure/secio, muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types] + muxers/mplex/mplex] const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" diff --git a/examples/directchat.nim b/examples/directchat.nim index 86d3c8aa5..200fbafc5 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -18,8 +18,7 @@ import ../libp2p/[switch, # manage transports, a single entry protocols/secure/secure, # define the protocol of secure connection protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection - muxers/mplex/mplex, # implement stream multiplexing - muxers/mplex/types] # define some contants and message types for stream multiplexing + muxers/mplex/mplex] # define some contants and message types for stream multiplexing const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 558e266e0..4c82c7d06 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -9,8 +9,7 @@ import chronos import nimcrypto/utils, chronicles, stew/byteutils -import types, - ../../stream/connection, +import ../../stream/connection, ../../utility, ../../varint, ../../vbuffer @@ -19,6 +18,15 @@ logScope: topics = "mplexcoder" type + MessageType* {.pure.} = enum + New, + MsgIn, + MsgOut, + CloseIn, + CloseOut, + ResetIn, + ResetOut + Msg* = tuple id: uint64 msgType: MessageType @@ -26,6 +34,9 @@ type InvalidMplexMsgType = object of CatchableError +# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream +const MaxMsgSize* = 1 shl 20 # 1mb + proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType = newException(InvalidMplexMsgType, "invalid message type") diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 33342a922..8b3729359 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -9,8 +9,7 @@ import std/[oids, strformat] import chronos, chronicles, metrics -import types, - coder, +import ./coder, ../muxer, nimcrypto/utils, ../../stream/[bufferstream, connection, streamseq], @@ -76,8 +75,7 @@ func shortLog*(s: LPChannel): auto = chronicles.formatIt(LPChannel): shortLog(it) proc closeMessage(s: LPChannel) {.async.} = - ## send close message - this will not raise - ## on EOF or Closed + ## send close message withWriteLock(s.writeLock): trace "sending close message", s @@ -94,13 +92,13 @@ proc resetMessage(s: LPChannel) {.async.} = # need to re-raise CancelledError. debug "Unexpected cancellation while resetting channel", s except LPStreamEOFError as exc: - trace "muxed connection EOF", s, exc = exc.msg + trace "muxed connection EOF", s, msg = exc.msg except LPStreamClosedError as exc: - trace "muxed connection closed", s, exc = exc.msg + trace "muxed connection closed", s, msg = exc.msg except LPStreamIncompleteError as exc: - trace "incomplete message", s, exc = exc.msg + trace "incomplete message", s, msg = exc.msg except CatchableError as exc: - debug "Unhandled exception leak", s, exc = exc.msg + debug "Unhandled exception leak", s, msg = exc.msg proc open*(s: LPChannel) {.async, gcsafe.} = await s.conn.writeMsg(s.id, MessageType.New, s.name) @@ -115,7 +113,7 @@ proc closeRemote*(s: LPChannel) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception closing remote channel", s, exc = exc.msg + trace "exception closing remote channel", s, msg = exc.msg trace "Closed remote", s @@ -141,7 +139,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = trace "channel already closed or reset", s return - trace "Resetting channel", s + trace "Resetting channel", s, len = s.len # First, make sure any new calls to `readOnce` and `pushTo` will fail - there # may already be such calls in the event queue @@ -174,7 +172,7 @@ method close*(s: LPChannel) {.async, gcsafe.} = trace "Already closed", s return - trace "Closing channel", s + trace "Closing channel", s, len = s.len proc closeInternal() {.async.} = try: @@ -189,7 +187,7 @@ method close*(s: LPChannel) {.async, gcsafe.} = except LPStreamClosedError, LPStreamEOFError: trace "Connection already closed", s except CatchableError as exc: # Shouldn't happen? - debug "Exception closing channel", s, exc = exc.msg + warn "Exception closing channel", s, msg = exc.msg await s.reset() trace "Closed channel", s @@ -221,10 +219,11 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = # writes should happen in sequence trace "write msg", len = msg.len - await s.conn.writeMsg(s.id, s.msgCode, msg) + withWriteLock(s.writeLock): + await s.conn.writeMsg(s.id, s.msgCode, msg) s.activity = true except CatchableError as exc: - trace "exception in lpchannel write handler", s, exc = exc.msg + trace "exception in lpchannel write handler", s, msg = exc.msg await s.conn.close() raise exc diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index d54b844b6..f5b18b377 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -14,18 +14,20 @@ import ../muxer, ../../stream/bufferstream, ../../utility, ../../peerinfo, - coder, - types, - lpchannel + ./coder, + ./lpchannel export muxer logScope: topics = "mplex" +const MplexCodec* = "/mplex/6.7.0" + const MaxChannelCount = 200 + when defined(libp2p_expensive_metrics): declareGauge(libp2p_mplex_channels, "mplex channels", labels = ["initiator", "peer"]) @@ -58,19 +60,17 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = try: await chann.join() m.channels[chann.initiator].del(chann.id) - debug "cleaned up channel", m, chann + trace "cleaned up channel", m, chann when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( m.channels[chann.initiator].len.int64, labelValues = [$chann.initiator, $m.connection.peerInfo.peerId]) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. - debug "Unexpected cancellation in mplex channel cleanup", - m, chann except CatchableError as exc: - debug "error cleaning up mplex channel", exc = exc.msg, m, chann + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError, and no other exceptions should + # happen here + warn "Error cleaning up mplex channel", m, chann, msg = exc.msg proc newStreamInternal*(m: Mplex, initiator: bool = true, @@ -99,7 +99,7 @@ proc newStreamInternal*(m: Mplex, result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr - trace "Creating new channel", id, initiator, name, m, channel = result + trace "Creating new channel", m, channel = result, id, initiator, name m.channels[initiator][id] = result @@ -118,14 +118,10 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = await m.streamHandler(chann) trace "finished handling stream", m, chann doAssert(chann.closed, "connection not closed by handler!") - except CancelledError: - trace "Unexpected cancellation in stream handler", m, chann - await chann.reset() + except CatchableError as exc: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - except CatchableError as exc: - trace "Exception in mplex stream handler", - exc = exc.msg, m, chann + trace "Exception in mplex stream handler", m, chann, msg = exc.msg await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = @@ -162,6 +158,8 @@ method handle*(m: Mplex) {.async, gcsafe.} = let name = string.fromBytes(data) m.newStreamInternal(false, id, name, timeout = m.outChannTimeout) + trace "Processing channel message", m, channel, data = data.shortLog + case msgType: of MessageType.New: trace "created channel", m, channel @@ -177,9 +175,9 @@ method handle*(m: Mplex) {.async, gcsafe.} = allowed = MaxMsgSize, channel raise newLPStreamLimitError() - trace "pushing data to channel", m, channel + trace "pushing data to channel", m, channel, len = data.len await channel.pushTo(data) - trace "pushed data to channel", m, channel + trace "pushed data to channel", m, channel, len = data.len of MessageType.CloseIn, MessageType.CloseOut: await channel.closeRemote() @@ -190,9 +188,9 @@ method handle*(m: Mplex) {.async, gcsafe.} = # there no way for this procedure to be cancelled implicitely. debug "Unexpected cancellation in mplex handler", m except LPStreamEOFError as exc: - trace "Stream EOF", msg = exc.msg, m + trace "Stream EOF", m, msg = exc.msg except CatchableError as exc: - warn "Unexpected exception in mplex read loop", msg = exc.msg, m + warn "Unexpected exception in mplex read loop", m, msg = exc.msg finally: await m.close() trace "Stopped mplex handler", m diff --git a/libp2p/muxers/mplex/types.nim b/libp2p/muxers/mplex/types.nim deleted file mode 100644 index 680dd10fb..000000000 --- a/libp2p/muxers/mplex/types.nim +++ /dev/null @@ -1,27 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2019 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import chronos - -# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream -const MaxMsgSize* = 1 shl 20 # 1mb -const MplexCodec* = "/mplex/6.7.0" -const MaxReadWriteTime* = 5.seconds - -type - MplexNoSuchChannel* = object of CatchableError - - MessageType* {.pure.} = enum - New, - MsgIn, - MsgOut, - CloseIn, - CloseOut, - ResetIn, - ResetOut diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index d9d51ff21..4d7f469c2 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -2,7 +2,7 @@ import options, tables, chronos, bearssl, switch, peerid, peerinfo, stream/connection, multiaddress, crypto/crypto, transports/[transport, tcptransport], - muxers/[muxer, mplex/mplex, mplex/types], + muxers/[muxer, mplex/mplex], protocols/[identify, secure/secure] import diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index c59366c72..f1f197d9b 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -112,6 +112,9 @@ proc readExactly*(s: LPStream, if s.atEof: raise newLPStreamEOFError() + if nbytes == 0: + return + logScope: s nbytes = nbytes @@ -122,13 +125,14 @@ proc readExactly*(s: LPStream, while read < nbytes and not(s.atEof()): read += await s.readOnce(addr pbuffer[read], nbytes - read) + if read == 0: + doAssert s.atEof() + trace "couldn't read all bytes, stream EOF", s, nbytes, read + raise newLPStreamEOFError() + if read < nbytes: - if s.atEof: - trace "couldn't read all bytes, stream EOF", expected = nbytes, read - raise newLPStreamEOFError() - else: - trace "couldn't read all bytes, incomplete data", expected = nbytes, read - raise newLPStreamIncompleteError() + trace "couldn't read all bytes, incomplete data", s, nbytes, read + raise newLPStreamIncompleteError() proc readLine*(s: LPStream, limit = 0, diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index 847ab6708..944fe3730 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -95,7 +95,7 @@ suite "BufferStream": try: await readFut - except LPStreamEOFError: + except LPStreamIncompleteError: result = true check: diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 472e9abb8..1573a658a 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -19,7 +19,6 @@ import ../libp2p/[daemon/daemonapi, crypto/crypto, muxers/mplex/mplex, muxers/muxer, - muxers/mplex/types, protocols/protocol, protocols/identify, transports/transport, diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 57c0e27f1..5b33e646f 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -8,7 +8,6 @@ import ../libp2p/[errors, multiaddress, muxers/mplex/mplex, muxers/mplex/coder, - muxers/mplex/types, muxers/mplex/lpchannel, vbuffer, varint] diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 6da7a1e8c..8fbd5608b 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -27,7 +27,6 @@ import ../libp2p/[switch, protocols/protocol, muxers/muxer, muxers/mplex/mplex, - muxers/mplex/types, protocols/secure/noise, protocols/secure/secure] import ./helpers