From 82c179db9ea919304dfa0d22063e11d7aecba8a3 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 8 Sep 2020 08:24:28 +0200 Subject: [PATCH] mplex fixes (#356) * close the right connection when channel send fails * don't crash on channel id that is not unique --- libp2p/connmanager.nim | 25 ++++++++++++++++--------- libp2p/muxers/mplex/lpchannel.nim | 6 +++--- libp2p/muxers/mplex/mplex.nim | 24 +++++++++++++----------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 9be88f400..b17bbf46a 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -76,11 +76,12 @@ proc contains*(c: ConnManager, muxer: Muxer): bool = return muxer == c.muxed[conn].muxer proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} = - trace "cleaning up muxer for peer" + trace "Cleaning up muxer", m = muxerHolder.muxer await muxerHolder.muxer.close() if not(isNil(muxerHolder.handle)): await muxerHolder.handle # TODO noraises? + trace "Cleaned up muxer", m = muxerHolder.muxer proc delConn(c: ConnManager, conn: Connection) = let peerId = conn.peerInfo.peerId @@ -91,6 +92,8 @@ proc delConn(c: ConnManager, conn: Connection) = c.conns.del(peerId) libp2p_peers.set(c.conns.len.int64) + trace "Removed connection", conn + proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = ## clean connection's resources such as muxers and streams @@ -113,7 +116,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = finally: await conn.close() - trace "connection cleaned up", conn + trace "Connection cleaned up", conn proc onClose(c: ConnManager, conn: Connection) {.async.} = ## connection close even handler @@ -122,15 +125,15 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = ## try: await conn.join() - trace "triggering connection cleanup", conn + trace "Connection closed, cleaning up", conn await c.cleanupConn(conn) except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in connection manager's cleanup" + debug "Unexpected cancellation in connection manager's cleanup", conn except CatchableError as exc: - trace "Unexpected exception in connection manager's cleanup", - errMsg = exc.msg + debug "Unexpected exception in connection manager's cleanup", + errMsg = exc.msg, conn proc selectConn*(c: ConnManager, peerId: PeerID, @@ -181,7 +184,7 @@ proc storeConn*(c: ConnManager, conn: Connection) = let peerId = conn.peerInfo.peerId if c.conns.getOrDefault(peerId).len > c.maxConns: - trace "too many connections", peer = $peerId, + debug "too many connections", peer = conn, conns = c.conns.getOrDefault(peerId).len raise newTooManyConnections() @@ -196,7 +199,8 @@ proc storeConn*(c: ConnManager, conn: Connection) = asyncSpawn c.onClose(conn) libp2p_peers.set(c.conns.len.int64) - trace "stored connection", connections = c.conns.len, conn + trace "Stored connection", + connections = c.conns.len, conn, direction = $conn.dir proc storeOutgoing*(c: ConnManager, conn: Connection) = conn.dir = Direction.Out @@ -222,7 +226,7 @@ proc storeMuxer*(c: ConnManager, muxer: muxer, handle: handle) - trace "stored muxer", connections = c.conns.len, muxer + trace "Stored muxer", connections = c.conns.len, muxer proc getMuxedStream*(c: ConnManager, peerId: PeerID, @@ -256,8 +260,10 @@ proc getMuxedStream*(c: ConnManager, proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = ## drop connections and cleanup resources for peer ## + trace "Dropping peer", peerId let conns = c.conns.getOrDefault(peerId) for conn in conns: + trace "Removing connection", conn delConn(c, conn) var muxers: seq[MuxerHolder] @@ -271,6 +277,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = for conn in conns: await conn.close() + trace "Dropped peer", peerId proc close*(c: ConnManager) {.async.} = ## cleanup resources for the connection diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 4e8a2b534..54c2a8fe1 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -90,7 +90,7 @@ proc resetMessage(s: LPChannel) {.async.} = except CancelledError: # This procedure is called from one place and never awaited, so there no # need to re-raise CancelledError. - trace "Unexpected cancellation while resetting channel" + debug "Unexpected cancellation while resetting channel", s except LPStreamEOFError as exc: trace "muxed connection EOF", exc = exc.msg, s except LPStreamClosedError as exc: @@ -217,14 +217,14 @@ proc init*( await chann.open() # writes should happen in sequence - trace "sending data", len = data.len, chann + trace "sending data", len = data.len, conn, chann await conn.writeMsg(chann.id, chann.msgCode, data) except CatchableError as exc: trace "exception in lpchannel write handler", exc = exc.msg, chann - await chann.reset() + asyncSpawn conn.close() raise exc chann.initBufferStream(writeHandler, size) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 37244258b..71d24c0be 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -32,6 +32,7 @@ when defined(libp2p_expensive_metrics): type TooManyChannels* = object of CatchableError + InvalidChannelIdError* = object of CatchableError Mplex* = ref object of Muxer channels: array[bool, Table[uint64, LPChannel]] @@ -48,13 +49,16 @@ chronicles.formatIt(Mplex): shortLog(it) proc newTooManyChannels(): ref TooManyChannels = newException(TooManyChannels, "max allowed channel count exceeded") +proc newInvalidChannelIdError(): ref InvalidChannelIdError = + newException(InvalidChannelIdError, "max allowed channel count exceeded") + proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = ## remove the local channel from the internal tables ## try: await chann.join() m.channels[chann.initiator].del(chann.id) - trace "cleaned up channel", m, chann + debug "cleaned up channel", m, chann when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( @@ -63,10 +67,10 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in mplex channel cleanup", + debug "Unexpected cancellation in mplex channel cleanup", m, chann except CatchableError as exc: - trace "error cleaning up mplex channel", exc = exc.msg, m, chann + debug "error cleaning up mplex channel", exc = exc.msg, m, chann proc newStreamInternal*(m: Mplex, initiator: bool = true, @@ -81,10 +85,9 @@ proc newStreamInternal*(m: Mplex, m.currentId.inc(); m.currentId else: chanId - trace "creating new channel", id, - initiator, - name, - m + if id in m.channels[initiator]: + raise newInvalidChannelIdError() + result = LPChannel.init( id, m.connection, @@ -96,8 +99,7 @@ proc newStreamInternal*(m: Mplex, result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr - doAssert(id notin m.channels[initiator], - "channel slot already taken!") + trace "Creating new channel", id, initiator, name, m, channel = result m.channels[initiator][id] = result @@ -127,7 +129,7 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = - trace "starting mplex main loop", m, peer = m.connection.peerInfo.peerId + trace "Starting mplex main loop", m try: while not m.connection.atEof: trace "waiting for data", m @@ -186,7 +188,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = except CancelledError: # This procedure is spawned as task and it is not part of public API, so # there no way for this procedure to be cancelled implicitely. - trace "Unexpected cancellation in mplex handler" + debug "Unexpected cancellation in mplex handler", m except CatchableError as exc: trace "Exception occurred", exception = exc.msg, m finally: