From 16a008db75df36183d375dac5a9e0f923bca4617 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 4 Sep 2020 20:30:26 +0200 Subject: [PATCH] fix connection event order when connection dies early (#351) if the connection is already closed (because the remote closes during identfiy for example), an exception would be raised which would leave the connection in limbo, beacuse it would not go through the rest of internalConnect. Also, if the connection is already closed, the disconnect event would be scheduled before the connect event :/ --- libp2p/protocols/pubsub/rpc/messages.nim | 2 +- libp2p/switch.nim | 74 ++++++++++++++---------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 3930de5..92ba5e3 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -86,7 +86,7 @@ func shortLog*(c: ControlMessage): auto = func shortLog*(msg: Message): auto = ( - fromPeer: msg.fromPeer, + fromPeer: msg.fromPeer.pretty, data: msg.data.shortLog, seqno: msg.seqno.shortLog, topicIDs: $msg.topicIDs, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index f7b9aab..d31067c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -205,10 +205,6 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = - logScope: - conn = $conn - oid = $conn.oid - let sconn = await s.secure(conn) # secure the connection if isNil(sconn): raise newException(CatchableError, @@ -218,21 +214,29 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - trace "upgrading connection" + trace "upgrading connection", conn = $sconn, uoid = $conn.oid let muxer = await s.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future raise newException(CatchableError, "a muxer is required for outgoing connections") - await s.identify(muxer) + try: + await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", + err = exc.msg, conn = $conn, uoid = $conn.oid if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, "unable to identify connection, stopping upgrade") - trace "successfully upgraded outgoing connection", oid = sconn.oid + trace "successfully upgraded outgoing connection", + conn = $sconn, uoid = $conn.oid, oid = $sconn.oid return sconn @@ -267,7 +271,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending secured handler", err = exc.msg + debug "ending secured handler", + err = exc.msg, conn = $conn, oid = $conn.oid if (await ms.select(conn)): # just handshake # add the secure handlers @@ -281,9 +286,6 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc internalConnect(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]): Future[Connection] {.async.} = - logScope: - peer = peerId - if s.peerInfo.peerId == peerId: raise newException(CatchableError, "can't dial self!") @@ -300,12 +302,12 @@ proc internalConnect(s: Switch, # This connection should already have been removed from the connection # manager - it's essentially a bug that we end up here - we'll fail # for now, hoping that this will clean themselves up later... - warn "dead connection in connection manager" + warn "dead connection in connection manager", peer = $peerId await conn.close() raise newException(CatchableError, "Zombie connection encountered") - trace "Reusing existing connection", oid = $conn.oid, - direction = $conn.dir + trace "Reusing existing connection", + oid = $conn.oid, direction = $conn.dir, peer = $peerId return conn @@ -317,10 +319,10 @@ proc internalConnect(s: Switch, let dialed = try: await t.dial(a) except CancelledError as exc: - trace "dialing canceled", exc = exc.msg + trace "dialing canceled", exc = exc.msg, peer = $peerId raise exc except CatchableError as exc: - trace "dialing failed", exc = exc.msg + trace "dialing failed", exc = exc.msg, peer = $peerId libp2p_failed_dials.inc() continue # Try the next address @@ -344,6 +346,7 @@ proc internalConnect(s: Switch, conn = upgraded trace "dial successful", + peer = $peerId, oid = $upgraded.oid, peerInfo = shortLog(upgraded.peerInfo) break @@ -354,6 +357,14 @@ proc internalConnect(s: Switch, if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") + if conn.closed(): + # This can happen if one of the peer event handlers deems the peer + # unworthy and disconnects it + raise newLPStreamClosedError() + + await s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) + proc peerCleanup() {.async.} = try: await conn.closeEvent.wait() @@ -362,21 +373,15 @@ proc internalConnect(s: Switch, except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch peer connect cleanup" + trace "Unexpected cancellation in switch peer connect cleanup", + peer = $peerId except CatchableError as exc: trace "Unexpected exception in switch peer connect cleanup", - errMsg = exc.msg + errMsg = exc.msg, peer = $peerId # All the errors are handled inside `cleanup()` procedure. asyncSpawn peerCleanup() - await s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) - - if conn.closed(): - # This can happen if one of the peer event handlers deems the peer - # unworthy and disconnects it - raise newException(CatchableError, "Connection closed during handshake") return conn @@ -498,7 +503,13 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = try: await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", err = exc.msg + try: let peerId = muxer.connection.peerInfo.peerId proc peerCleanup() {.async.} = @@ -509,9 +520,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch muxer cleanup" + debug "Unexpected cancellation in switch muxer cleanup" except CatchableError as exc: - trace "Unexpected exception in switch muxer cleanup", + debug "Unexpected exception in switch muxer cleanup", errMsg = exc.msg proc peerStartup() {.async.} = @@ -522,15 +533,16 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch muxer startup" + debug "Unexpected cancellation in switch muxer startup" except CatchableError as exc: - trace "Unexpected exception in switch muxer startup", + debug "Unexpected exception in switch muxer startup", errMsg = exc.msg + # All the errors are handled inside `peerStartup()` procedure. + asyncSpawn peerStartup() + # All the errors are handled inside `peerCleanup()` procedure. asyncSpawn peerCleanup() - # All the errors are handled inside `peerStartup()` procedure. - asyncSpawn peerStartup() except CancelledError as exc: await muxer.close()