diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 3ec69adde..c9ca57e96 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -31,9 +31,9 @@ type
 method subscribeTopic*(f: FloodSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.gcsafe, async.} =
-  await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
-  let peer = f.peers.getOrDefault(peerId)
+                       peer: PubsubPeer) {.gcsafe.} =
+  procCall PubSub(f).subscribeTopic(topic, subscribe, peer)
+
   if topic notin f.floodsub:
     f.floodsub[topic] = initHashSet[PubSubPeer]()
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 4ca4d7d67..64e34f9cb 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -857,25 +857,20 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
 method subscribeTopic*(g: GossipSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.gcsafe, async.} =
-  await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
+                       peer: PubSubPeer) {.gcsafe.} =
+  procCall FloodSub(g).subscribeTopic(topic, subscribe, peer)
 
   logScope:
-    peer = $peerId
+    peer = $peer.id
     topic
-
-  let peer = g.peers.getOrDefault(peerId)
-  if peer == nil:
-    # floodsub method logs a trace line already
-    return
-
+  g.onNewPeer(peer)
   if subscribe:
     trace "peer subscribed to topic"
     # subscribe remote peer to the topic
     discard g.gossipsub.addPeer(topic, peer)
-    if peerId in g.parameters.directPeers:
+    if peer.peerId in g.parameters.directPeers:
       discard g.explicit.addPeer(topic, peer)
   else:
     trace "peer unsubscribed from topic"
@@ -883,7 +878,7 @@ method subscribeTopic*(g: GossipSub,
     g.gossipsub.removePeer(topic, peer)
     g.mesh.removePeer(topic, peer)
     g.fanout.removePeer(topic, peer)
-    if peerId in g.parameters.directPeers:
+    if peer.peerId in g.parameters.directPeers:
       g.explicit.removePeer(topic, peer)
 
   when defined(libp2p_expensive_metrics):
@@ -898,10 +893,6 @@ method subscribeTopic*(g: GossipSub,
 
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic
 
-  # also rebalance current topic if we are subbed to
-  if topic in g.topics:
-    await g.rebalanceMesh(topic)
-
 proc handleGraft(g: GossipSub,
                  peer: PubSubPeer,
                  grafts: seq[ControlGraft]): seq[ControlPrune] =
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 409fb0b9e..e8a055489 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -61,7 +61,6 @@ type
     triggerSelf*: bool                 # trigger own local handler on publish
     verifySignature*: bool             # enable signature verification
     sign*: bool                        # enable message signing
-    cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider      # Turn message into message id (not nil)
@@ -127,7 +126,7 @@ proc sendSubs*(p: PubSub,
 method subscribeTopic*(p: PubSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.base, async.} =
+                       peer: PubSubPeer) {.base.} =
   # called when remote peer subscribes to a topic
   discard
@@ -142,7 +141,7 @@ method rpcHandler*(p: PubSub,
   if m.subscriptions.len > 0:                      # if there are any subscriptions
     for s in m.subscriptions:                      # subscribe/unsubscribe the peer for each topic
       trace "about to subscribe to topic", topicId = s.topic
-      await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
+      p.subscribeTopic(s.topic, s.subscribe, peer)
 
 method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
@@ -354,7 +353,6 @@ proc init*[PubParams: object | bool](
                sign: sign,
                peers: initTable[PeerID, PubSubPeer](),
                topics: initTable[string, Topic](),
-               cleanupLock: newAsyncLock(),
                msgIdProvider: msgIdProvider)
   else:
     result = P(switch: switch,
@@ -364,7 +362,6 @@ proc init*[PubParams: object | bool](
                sign: sign,
                peers: initTable[PeerID, PubSubPeer](),
                topics: initTable[string, Topic](),
-               cleanupLock: newAsyncLock(),
                msgIdProvider: msgIdProvider,
                parameters: parameters)
   result.initPubSub()
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 8bdda1ff7..503e9e8a4 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[hashes, options, sequtils, strutils, tables, hashes, sets]
+import std/[sequtils, strutils, tables, hashes, sets]
 import chronos, chronicles, nimcrypto/sha2, metrics
 import rpc/[messages, message, protobuf],
        timedcache,
@@ -46,7 +46,7 @@ type
     recvdRpcCache: TimedCache[string]   # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
     subscribed*: bool                   # are we subscribed to this peer
-    sendLock*: AsyncLock                # send connection lock
+    dialLock: AsyncLock
 
     score*: float64
     iWantBudget*: int
@@ -126,11 +126,74 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
     debug "exiting pubsub peer read loop"
     await conn.close()
 
+    if p.sendConn == conn:
+      p.sendConn = nil
+
   except CancelledError as exc:
     raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
 
+proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
+  # get a cached send connection or create a new one
+  block: # check if there's an existing connection that can be reused
+    let current = p.sendConn
+
+    if not current.isNil:
+      if not (current.closed() or current.atEof):
+        # The existing send connection looks like it might work - reuse it
+        trace "Reusing existing connection", oid = $current.oid
+        return current
+
+      # Send connection is set but broken - get rid of it
+      p.sendConn = nil
+
+      # Careful, p.sendConn might change after here!
+      await current.close() # TODO this might be unnecessary
+
+  try:
+    # Testing has demonstrated that when we perform concurrent meshsub dials
+    # and later close one of them, other implementations such as rust-libp2p
+    # become deaf to our messages (potentially due to the clean-up associated
+    # with closing connections). To prevent this, we use a lock that ensures
+    # that only a single dial will be performed for each peer.
+    #
+    # Nevertheless, this approach is still quite problematic because the gossip
+    # sends and their respective dials may be started from the mplex read loop.
+    # This may cause the read loop to get stuck, which ultimately results in a
+    # deadlock when the other side tries to send us any other message that must
+    # be routed through mplex (it will be stuck on `pushTo`). Such messages
+    # naturally arise in the process of dialing itself.
+    #
+    # See https://github.com/status-im/nim-libp2p/issues/337
+    #
+    # One possible long-term solution is to avoid "blocking" the mplex read
+    # loop by making the gossip send non-blocking through the use of a queue.
+    await p.dialLock.acquire()
+
+    # Another concurrent dial may have populated p.sendConn
+    if p.sendConn != nil:
+      let current = p.sendConn
+      if not current.isNil:
+        if not (current.closed() or current.atEof):
+          # The existing send connection looks like it might work - reuse it
+          trace "Reusing existing connection", oid = $current.oid
+          return current
+
+    # Grab a new send connection
+    let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+    if newConn.isNil:
+      return nil
+
+    trace "Caching new send connection", oid = $newConn.oid
+    p.sendConn = newConn
+    asyncCheck p.handle(newConn) # start a read loop on the new connection
+    return newConn
+
+  finally:
+    if p.dialLock.locked:
+      p.dialLock.release()
+
 proc send*(
   p: PubSubPeer,
   msg: RPCMsg,
@@ -163,29 +226,19 @@ proc send*(
       libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return
 
+  var conn: Connection
   try:
     trace "about to send message"
-    if not p.connected:
-      try:
-        await p.sendLock.acquire()
-        trace "no send connection, dialing peer"
-        # get a send connection if there is none
-        p.sendConn = await p.switch.dial(
-          p.peerId, p.codec)
+    conn = await p.getSendConn()
 
-        if not p.connected:
-          raise newException(CatchableError, "unable to get send pubsub stream")
+    if conn == nil:
+      debug "Couldn't get send connection, dropping message"
+      return
+    trace "sending encoded msgs to peer", connId = $conn.oid
+    await conn.writeLp(encoded).wait(timeout)
 
-        # install a reader on the send connection
-        asyncCheck p.handle(p.sendConn)
-      finally:
-        if p.sendLock.locked:
-          p.sendLock.release()
-
-    trace "sending encoded msgs to peer"
-    await p.sendConn.writeLp(encoded).wait(timeout)
     p.sentRpcCache.put(digest)
-    trace "sent pubsub message to remote"
+    trace "sent pubsub message to remote", connId = $conn.oid
 
     when defined(libp2p_expensive_metrics):
       for x in mm.messages:
@@ -195,9 +248,10 @@ proc send*(
   except CatchableError as exc:
     trace "unable to send to remote", exc = exc.msg
-    if not(isNil(p.sendConn)):
-      await p.sendConn.close()
-      p.sendConn = nil
+    # Next time sendConn is used, it will have its close flag set and thus
+    # will be recycled
+    if not isNil(conn):
+      await conn.close()
 
     raise exc
@@ -213,4 +267,4 @@ proc newPubSubPeer*(peerId: PeerID,
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
-  result.sendLock = newAsyncLock()
+  result.dialLock = newAsyncLock()
diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim
index 6fcaab52a..a0fb92141 100644
--- a/libp2p/stream/bufferstream.nim
+++ b/libp2p/stream/bufferstream.nim
@@ -43,7 +43,7 @@ logScope:
   topics = "bufferstream"
 
 const
-  DefaultBufferSize* = 1024
+  DefaultBufferSize* = 102400
 
 const
   BufferStreamTrackerName* = "libp2p.bufferstream"
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 9f5b1eec4..03d4c2022 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -156,7 +156,19 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
 
   trace "identify: identified remote peer", peer = $conn.peerInfo
 
-proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
+proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
+  # new stream for identify
+  var stream = await muxer.newStream()
+
+  defer:
+    if not(isNil(stream)):
+      await stream.close() # close identify stream
+
+  # do identify first, so that we have a
+  # PeerInfo in case we didn't before
+  await s.identify(stream)
+
+proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
   ## mux incoming connection
 
   trace "muxing connection", peer = $conn
@@ -171,37 +183,24 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
 
   # create new muxer for connection
   let muxer = s.muxers[muxerName].newMuxer(conn)
-  s.connManager.storeMuxer(muxer)
-
-  trace "found a muxer", name = muxerName, peer = $conn
 
   # install stream handler
   muxer.streamHandler = s.streamHandler
 
-  # new stream for identify
-  var stream = await muxer.newStream()
+  s.connManager.storeOutgoing(muxer.connection)
+  s.connManager.storeMuxer(muxer)
 
-  defer:
-    if not(isNil(stream)):
-      await stream.close() # close identify stream
+  trace "found a muxer", name = muxerName, peer = $conn
 
-  # call muxer handler, this should
-  # not end until muxer ends
+  # start muxer read loop - the future will complete when loop ends
   let handlerFut = muxer.handle()
 
-  # do identify first, so that we have a
-  # PeerInfo in case we didn't before
-  await s.identify(stream)
-
-  if isNil(conn.peerInfo):
-    await muxer.close()
-    raise newException(CatchableError,
-      "unable to identify peer, aborting upgrade")
-
   # store it in muxed connections if we have a peer for it
   trace "adding muxer for peer", peer = conn.peerInfo.id
   s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
 
+  return muxer
+
 proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
   s.connManager.dropPeer(peerId)
@@ -215,8 +214,19 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
     raise newException(CatchableError,
       "unable to secure connection, stopping upgrade")
 
+  if sconn.peerInfo.isNil:
+    raise newException(CatchableError,
+      "current version of nim-libp2p requires that secure protocol negotiates peerid")
+
   trace "upgrading connection"
-  await s.mux(sconn) # mux it if possible
+  let muxer = await s.mux(sconn) # mux it if possible
+  if muxer == nil:
+    # TODO this might be relaxed in the future
+    raise newException(CatchableError,
+      "a muxer is required for outgoing connections")
+
+  await s.identify(muxer)
+
   if isNil(sconn.peerInfo):
     await sconn.close()
     raise newException(CatchableError,
@@ -337,7 +347,6 @@ proc internalConnect(s: Switch,
 
       doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
 
-      s.connManager.storeOutgoing(upgraded)
       conn = upgraded
 
       trace "dial successful",
        oid = $upgraded.oid,
@@ -470,32 +479,21 @@ proc stop*(s: Switch) {.async.} =
   trace "switch stopped"
 
 proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
-  var stream = await muxer.newStream()
-  defer:
-    if not(isNil(stream)):
-      await stream.close()
+  if muxer.connection.peerInfo.isNil:
+    warn "This version of nim-libp2p requires secure protocol to negotiate peerid"
+    await muxer.close()
+    return
+
+  # store incoming connection
+  s.connManager.storeIncoming(muxer.connection)
+
+  # store muxer and muxed connection
+  s.connManager.storeMuxer(muxer)
 
   try:
-    # once we got a muxed connection, attempt to
-    # identify it
-    await s.identify(stream)
-    if isNil(stream.peerInfo):
-      await muxer.close()
-      return
-
-    let
-      peerInfo = stream.peerInfo
-      peerId = peerInfo.peerId
-    muxer.connection.peerInfo = peerInfo
-
-    # store incoming connection
-    s.connManager.storeIncoming(muxer.connection)
-
-    # store muxer and muxed connection
-    s.connManager.storeMuxer(muxer)
-
-    trace "got new muxer", peer = shortLog(peerInfo)
+    await s.identify(muxer)
+    let peerId = muxer.connection.peerInfo.peerId
 
     muxer.connection.closeEvent.wait()
       .addCallback do(udata: pointer):
        asyncCheck s.triggerConnEvent(
diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim
index 870cfaf56..fc6a89a4d 100644
--- a/tests/pubsub/testfloodsub.nim
+++ b/tests/pubsub/testfloodsub.nim
@@ -15,9 +15,7 @@ import utils,
        ../../libp2p/[errors,
                      switch,
                      stream/connection,
-                     stream/bufferstream,
                      crypto/crypto,
-                     protocols/pubsub/pubsubpeer,
                      protocols/pubsub/pubsub,
                      protocols/pubsub/floodsub,
                      protocols/pubsub/rpc/messages,