From cfcda3c3effc38d03ba33daea946954b05cb1109 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 13:29:45 +0200 Subject: [PATCH] work around race conditions between identify and other protocols when identify is run on incoming connections, the connmanager tables are updated too late for incoming connections to properly be handled this is a quickfix that will eventually need cleaning up --- libp2p/switch.nim | 84 ++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 859a240..dfb2e9f 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -156,7 +156,19 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = trace "identify: identified remote peer", peer = $conn.peerInfo -proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = +proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = + # new stream for identify + var stream = await muxer.newStream() + + defer: + if not(isNil(stream)): + await stream.close() # close identify stream + + # do identify first, so that we have a + # PeerInfo in case we didn't before + await s.identify(stream) + +proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection trace "muxing connection", peer = $conn @@ -171,37 +183,24 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = # create new muxer for connection let muxer = s.muxers[muxerName].newMuxer(conn) - s.connManager.storeMuxer(muxer) - - trace "found a muxer", name = muxerName, peer = $conn # install stream handler muxer.streamHandler = s.streamHandler - # new stream for identify - var stream = await muxer.newStream() + s.connManager.storeOutgoing(muxer.connection) + s.connManager.storeMuxer(muxer) - defer: - if not(isNil(stream)): - await stream.close() # close identify stream + trace "found a muxer", name = muxerName, peer = $conn - # call muxer handler, this should - # not end until muxer ends + # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() - # do identify first, so that we have a - # PeerInfo in case we didn't before - await s.identify(stream) - - if isNil(conn.peerInfo): - await muxer.close() - raise newException(CatchableError, - "unable to identify peer, aborting upgrade") - # store it in muxed connections if we have a peer for it trace "adding muxer for peer", peer = conn.peerInfo.id s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler + return muxer + proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) @@ -215,8 +214,15 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "unable to secure connection, stopping upgrade") + if sconn.peerInfo.isNil: + raise newException(CatchableError, + "current version of nim-libp2p requires that secure protocol negotiates peerid") + trace "upgrading connection" - await s.mux(sconn) # mux it if possible + let muxer = await s.mux(sconn) # mux it if possible + + await s.identify(muxer) + if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, @@ -332,7 +338,6 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" - s.connManager.storeOutgoing(upgraded) conn = upgraded trace "dial successful", oid = $upgraded.oid, @@ -465,32 +470,21 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - var stream = await muxer.newStream() - defer: - if not(isNil(stream)): - await stream.close() + if muxer.connection.peerInfo.isNil: + warn "This version of nim-libp2p requires secure protocol to negotiate peerid" + await muxer.close() + return + + # store incoming connection + s.connManager.storeIncoming(muxer.connection) + + # store muxer and muxed connection + s.connManager.storeMuxer(muxer) try: - # once we got a muxed connection, attempt to - # identify it - await s.identify(stream) - if isNil(stream.peerInfo): - await muxer.close() - return - - let - peerInfo = stream.peerInfo - peerId = peerInfo.peerId - muxer.connection.peerInfo = peerInfo - - # store incoming connection - s.connManager.storeIncoming(muxer.connection) - - # store muxer and muxed connection - s.connManager.storeMuxer(muxer) - - trace "got new muxer", peer = shortLog(peerInfo) + await s.identify(muxer) + let peerId = muxer.connection.peerInfo.peerId muxer.connection.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.triggerConnEvent(