diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index e411d40ad..f5298c506 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -294,7 +294,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = trace "Connection cleaned up", conn -proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} = +proc onConnUpgraded*(c: ConnManager, conn: Connection) {.async.} = try: trace "Triggering connect events", conn conn.upgrade() @@ -469,8 +469,6 @@ proc storeMuxer*(c: ConnManager, trace "Stored muxer", muxer, handle = not handle.isNil, connections = c.conns.len - asyncSpawn c.onConnUpgraded(muxer.connection) - proc getStream*(c: ConnManager, peerId: PeerId, dir: Direction): Future[Connection] {.async, gcsafe.} = diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index 6c8720e90..2031eea40 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -71,7 +71,10 @@ proc mux*( # install stream handler muxer.streamHandler = self.streamHandler - let handler = muxer.handle() + self.connManager.storeConn(conn) + + # store it in muxed connections if we have a peer for it + self.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop try: await self.identify(muxer) @@ -81,10 +84,7 @@ proc mux*( # loop debug "Could not identify connection", conn, msg = exc.msg - self.connManager.storeConn(conn) - - # store it in muxed connections if we have a peer for it - self.connManager.storeMuxer(muxer, handler) # store muxer and start read loop + await self.connManager.onConnUpgraded(muxer.connection) return muxer @@ -176,6 +176,12 @@ proc muxerHandler( let conn = muxer.connection + # store incoming connection + self.connManager.storeConn(conn) + + # store muxer and muxed connection + self.connManager.storeMuxer(muxer) + try: await self.identify(muxer) when defined(libp2p_agents_metrics): @@ -199,11 +205,7 @@ proc muxerHandler( await muxer.close() trace "Exception in muxer handler", conn, msg = exc.msg - # store incoming connection - self.connManager.storeConn(conn) - - # store muxer and muxed connection - self.connManager.storeMuxer(muxer) + await self.connManager.onConnUpgraded(muxer.connection) proc new*( T: type MuxedUpgrade,