diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 3930de5..92ba5e3 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -86,7 +86,7 @@ func shortLog*(c: ControlMessage): auto = func shortLog*(msg: Message): auto = ( - fromPeer: msg.fromPeer, + fromPeer: msg.fromPeer.pretty, data: msg.data.shortLog, seqno: msg.seqno.shortLog, topicIDs: $msg.topicIDs, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index f7b9aab..d31067c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -205,10 +205,6 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = - logScope: - conn = $conn - oid = $conn.oid - let sconn = await s.secure(conn) # secure the connection if isNil(sconn): raise newException(CatchableError, @@ -218,21 +214,29 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - trace "upgrading connection" + trace "upgrading connection", conn = $sconn, uoid = $conn.oid let muxer = await s.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future raise newException(CatchableError, "a muxer is required for outgoing connections") - await s.identify(muxer) + try: + await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", + err = exc.msg, conn = $conn, uoid = $conn.oid if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, "unable to identify connection, stopping upgrade") - trace "successfully upgraded outgoing connection", oid = sconn.oid + trace "successfully upgraded outgoing connection", + conn = $sconn, uoid = $conn.oid, oid = $sconn.oid return sconn @@ -267,7 +271,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending secured handler", err = exc.msg + debug "ending secured handler", + err = exc.msg, conn = $conn, oid = $conn.oid if (await ms.select(conn)): # just handshake # add the secure handlers @@ -281,9 +286,6 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc internalConnect(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]): Future[Connection] {.async.} = - logScope: - peer = peerId - if s.peerInfo.peerId == peerId: raise newException(CatchableError, "can't dial self!") @@ -300,12 +302,12 @@ proc internalConnect(s: Switch, # This connection should already have been removed from the connection # manager - it's essentially a bug that we end up here - we'll fail # for now, hoping that this will clean themselves up later... - warn "dead connection in connection manager" + warn "dead connection in connection manager", peer = $peerId await conn.close() raise newException(CatchableError, "Zombie connection encountered") - trace "Reusing existing connection", oid = $conn.oid, - direction = $conn.dir + trace "Reusing existing connection", + oid = $conn.oid, direction = $conn.dir, peer = $peerId return conn @@ -317,10 +319,10 @@ proc internalConnect(s: Switch, let dialed = try: await t.dial(a) except CancelledError as exc: - trace "dialing canceled", exc = exc.msg + trace "dialing canceled", exc = exc.msg, peer = $peerId raise exc except CatchableError as exc: - trace "dialing failed", exc = exc.msg + trace "dialing failed", exc = exc.msg, peer = $peerId libp2p_failed_dials.inc() continue # Try the next address @@ -344,6 +346,7 @@ proc internalConnect(s: Switch, conn = upgraded trace "dial successful", + peer = $peerId, oid = $upgraded.oid, peerInfo = shortLog(upgraded.peerInfo) break @@ -354,6 +357,14 @@ proc internalConnect(s: Switch, if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") + if conn.closed(): + # This can happen if one of the peer event handlers deems the peer + # unworthy and disconnects it + raise newLPStreamClosedError() + + await s.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) + proc peerCleanup() {.async.} = try: await conn.closeEvent.wait() @@ -362,21 +373,15 @@ proc internalConnect(s: Switch, except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch peer connect cleanup" + trace "Unexpected cancellation in switch peer connect cleanup", + peer = $peerId except CatchableError as exc: trace "Unexpected exception in switch peer connect cleanup", - errMsg = exc.msg + errMsg = exc.msg, peer = $peerId # All the errors are handled inside `cleanup()` procedure. asyncSpawn peerCleanup() - await s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) - - if conn.closed(): - # This can happen if one of the peer event handlers deems the peer - # unworthy and disconnects it - raise newException(CatchableError, "Connection closed during handshake") return conn @@ -498,7 +503,13 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = try: await s.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", err = exc.msg + try: let peerId = muxer.connection.peerInfo.peerId proc peerCleanup() {.async.} = @@ -509,9 +520,9 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch muxer cleanup" + debug "Unexpected cancellation in switch muxer cleanup" except CatchableError as exc: - trace "Unexpected exception in switch muxer cleanup", + debug "Unexpected exception in switch muxer cleanup", errMsg = exc.msg proc peerStartup() {.async.} = @@ -522,15 +533,16 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - trace "Unexpected cancellation in switch muxer startup" + debug "Unexpected cancellation in switch muxer startup" except CatchableError as exc: - trace "Unexpected exception in switch muxer startup", + debug "Unexpected exception in switch muxer startup", errMsg = exc.msg + # All the errors are handled inside `peerStartup()` procedure. + asyncSpawn peerStartup() + # All the errors are handled inside `peerCleanup()` procedure. asyncSpawn peerCleanup() - # All the errors are handled inside `peerStartup()` procedure. - asyncSpawn peerStartup() except CancelledError as exc: await muxer.close()