diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 859a240..dfb2e9f 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -156,7 +156,19 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = trace "identify: identified remote peer", peer = $conn.peerInfo -proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = +proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = + # new stream for identify + var stream = await muxer.newStream() + + defer: + if not(isNil(stream)): + await stream.close() # close identify stream + + # do identify first, so that we have a + # PeerInfo in case we didn't before + await s.identify(stream) + +proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection trace "muxing connection", peer = $conn @@ -171,37 +183,24 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = # create new muxer for connection let muxer = s.muxers[muxerName].newMuxer(conn) - s.connManager.storeMuxer(muxer) - - trace "found a muxer", name = muxerName, peer = $conn # install stream handler muxer.streamHandler = s.streamHandler - # new stream for identify - var stream = await muxer.newStream() + s.connManager.storeOutgoing(muxer.connection) + s.connManager.storeMuxer(muxer) - defer: - if not(isNil(stream)): - await stream.close() # close identify stream + trace "found a muxer", name = muxerName, peer = $conn - # call muxer handler, this should - # not end until muxer ends + # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() - # do identify first, so that we have a - # PeerInfo in case we didn't before - await s.identify(stream) - - if isNil(conn.peerInfo): - await muxer.close() - raise newException(CatchableError, - "unable to identify peer, aborting upgrade") - # store it in muxed connections if we have a peer for it trace "adding muxer for peer", peer = conn.peerInfo.id s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler + return muxer + proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) @@ -215,8 +214,15 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "unable to secure connection, stopping upgrade") + if sconn.peerInfo.isNil: + raise newException(CatchableError, + "current version of nim-libp2p requires that secure protocol negotiates peerid") + trace "upgrading connection" - await s.mux(sconn) # mux it if possible + let muxer = await s.mux(sconn) # mux it if possible + + await s.identify(muxer) + if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, @@ -332,7 +338,6 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" - s.connManager.storeOutgoing(upgraded) conn = upgraded trace "dial successful", oid = $upgraded.oid, @@ -465,32 +470,21 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - var stream = await muxer.newStream() - defer: - if not(isNil(stream)): - await stream.close() + if muxer.connection.peerInfo.isNil: + warn "This version of nim-libp2p requires secure protocol to negotiate peerid" + await muxer.close() + return + + # store incoming connection + s.connManager.storeIncoming(muxer.connection) + + # store muxer and muxed connection + s.connManager.storeMuxer(muxer) try: - # once we got a muxed connection, attempt to - # identify it - await s.identify(stream) - if isNil(stream.peerInfo): - await muxer.close() - return - - let - peerInfo = stream.peerInfo - peerId = peerInfo.peerId - muxer.connection.peerInfo = peerInfo - - # store incoming connection - s.connManager.storeIncoming(muxer.connection) - - # store muxer and muxed connection - s.connManager.storeMuxer(muxer) - - trace "got new muxer", peer = shortLog(peerInfo) + await s.identify(muxer) + let peerId = muxer.connection.peerInfo.peerId muxer.connection.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.triggerConnEvent(