From b12145dff778bbda0e8115e0643662bc11f8cbfa Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 12:10:22 +0200 Subject: [PATCH 1/8] avoid crash when subscribe is received (#333) ...by making subscribeTopic synchronous, avoiding a peer table lookup completely. rebalanceMesh will be called a second later - it's fine --- libp2p/protocols/pubsub/floodsub.nim | 6 +++--- libp2p/protocols/pubsub/gossipsub.nim | 15 +++------------ libp2p/protocols/pubsub/pubsub.nim | 4 ++-- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3ec69adde..c9ca57e96 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -31,9 +31,9 @@ type method subscribeTopic*(f: FloodSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) - let peer = f.peers.getOrDefault(peerId) + peer: PubsubPeer) {.gcsafe.} = + procCall PubSub(f).subscribeTopic(topic, subscribe, peer) + if topic notin f.floodsub: f.floodsub[topic] = initHashSet[PubSubPeer]() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9195207c0..468959f37 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -281,18 +281,13 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) + peer: PubSubPeer) {.gcsafe.} = + procCall FloodSub(g).subscribeTopic(topic, subscribe, peer) logScope: - peer = $peerId + peer = $peer.id topic - let peer = g.peers.getOrDefault(peerId) - if peer == nil: - # floodsub method logs a debug line already - return - if subscribe: trace "peer subscribed to topic" # subscribe remote peer to the topic @@ -316,10 +311,6 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic - # also rebalance current topic if we are subbed to - if topic in g.topics: - await g.rebalanceMesh(topic) - proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 17e0203e8..72d5f527e 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -119,7 +119,7 @@ proc sendSubs*(p: PubSub, method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, - peerId: PeerID) {.base, async.} = + peer: PubSubPeer) {.base.} = # called when remote peer subscribes to a topic discard @@ -134,7 +134,7 @@ method rpcHandler*(p: PubSub, if m.subscriptions.len > 0: # if there are any subscriptions for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic trace "about to subscribe to topic", topicId = s.topic - await p.subscribeTopic(s.topic, s.subscribe, peer.peerId) + p.subscribeTopic(s.topic, s.subscribe, peer) proc getOrCreatePeer*( p: PubSub, From f46bf0faa47ac4a8b62c6cb381e69a7b3f218a5c Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 12:38:27 +0200 Subject: [PATCH 2/8] remove send lock (#334) * remove send lock When mplex receives data it will block until a reader has processed the data. Thus, when a large message is received, such as a gossipsub subscription table, all of mplex will be blocked until all reading is finished. 
However, if at the same time a `dial` to establish a gossipsub send connection is ongoing, that `dial` will be blocked because mplex is no longer reading data - specifically, it might indeed be the connection that's processing the previous data that is waiting for a send connection. There are other problems with the current code: * If an exception is raised, it is not necessarily raised for the same connection as `p.sendConn`, so resetting `p.sendConn` in the exception handling is wrong * `p.isConnected` is checked before taking the lock - thus, if it returns false, a new dial will be started. If a new task enters `send` before dial is finished, it will also determine `p.isConnected` is false, then get stuck on the lock - when the previous task finishes and releases the lock, the new task will _also_ dial and thus reset `p.sendConn` causing a leak. * prefer existing connection simplifies flow --- libp2p/protocols/pubsub/pubsub.nim | 2 - libp2p/protocols/pubsub/pubsubpeer.nim | 79 ++++++++++++++++++-------- tests/pubsub/testfloodsub.nim | 2 - 3 files changed, 55 insertions(+), 28 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 72d5f527e..534dbfa07 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -55,7 +55,6 @@ type triggerSelf*: bool # trigger own local handler on publish verifySignature*: bool # enable signature verification sign*: bool # enable message signing - cleanupLock: AsyncLock validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) @@ -338,7 +337,6 @@ proc init*( sign: sign, peers: initTable[PeerID, PubSubPeer](), topics: initTable[string, Topic](), - cleanupLock: newAsyncLock(), msgIdProvider: msgIdProvider) result.initPubSub() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ac47e811c..82bcd0c8c 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import std/[hashes, options, sequtils, strutils, tables] +import std/[hashes, options, strutils, tables] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], timedcache, @@ -39,14 +39,13 @@ type PubSubPeer* = ref object of RootObj switch*: Switch # switch instance to dial peers codec*: string # the protocol that this peer joined from - sendConn: Connection + sendConn: Connection # cached send connection peerId*: PeerID handler*: RPCHandler sentRpcCache: TimedCache[string] # cache for already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages observers*: ref seq[PubSubObserver] # ref as in smart_ptr subscribed*: bool # are we subscribed to this peer - sendLock*: AsyncLock # send connection lock RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -117,11 +116,53 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = debug "exiting pubsub peer read loop" await conn.close() + if p.sendConn == conn: + p.sendConn = nil + except CancelledError as exc: raise exc except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg +proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = + # get a cached send connection or create a new one + block: # check if there's an existing connection that can be reused + let current = p.sendConn + + if not current.isNil: + if not (current.closed() or current.atEof): + # The existing send connection looks like it might work - reuse it + return current + + # Send connection is set but broken - get rid of it + p.sendConn = nil + + # Careful, p.sendConn might change after here! + await current.close() # TODO this might be unnecessary + + # Grab a new send connection + let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here + if newConn == nil: + return p.sendConn # A concurrent attempt perhaps succeeded? + + # Because of the awaits above, a concurrent `getSendConn` call might have + # set up a send connection already. We cannot take a lock here because + # it might block the reading of data from mplex which will cause its + # backpressure handling to stop reading from the socket and thus prevent the + # channel negotiation from finishing + if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof): + let current = p.sendConn + # Either the new or the old connection could potentially be closed - it's + # slightly easier to sequence the closing of the new connection because the + # old one might still be in use. 
+ await newConn.close() + return current + + p.sendConn = newConn + asyncCheck p.handle(newConn) # start a read loop on the new connection + + return newConn + proc send*( p: PubSubPeer, msg: RPCMsg, @@ -154,27 +195,17 @@ proc send*( libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) return + var conn: Connection try: trace "about to send message" - if not p.connected: - try: - await p.sendLock.acquire() - trace "no send connection, dialing peer" - # get a send connection if there is none - p.sendConn = await p.switch.dial( - p.peerId, p.codec) - - if not p.connected: - raise newException(CatchableError, "unable to get send pubsub stream") - - # install a reader on the send connection - asyncCheck p.handle(p.sendConn) - finally: - if p.sendLock.locked: - p.sendLock.release() + conn = await p.getSendConn() + if conn == nil: + debug "Couldn't get send connection, dropping message" + return trace "sending encoded msgs to peer" - await p.sendConn.writeLp(encoded).wait(timeout) + await conn.writeLp(encoded).wait(timeout) + p.sentRpcCache.put(digest) trace "sent pubsub message to remote" @@ -186,9 +217,10 @@ proc send*( except CatchableError as exc: trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + if not isNil(conn): + await conn.close() raise exc @@ -204,4 +236,3 @@ proc newPubSubPeer*(peerId: PeerID, result.peerId = peerId result.sentRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes) - result.sendLock = newAsyncLock() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 870cfaf56..fc6a89a4d 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -15,9 +15,7 @@ import utils, ../../libp2p/[errors, switch, stream/connection, - stream/bufferstream, crypto/crypto, - protocols/pubsub/pubsubpeer, protocols/pubsub/pubsub, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, From 53877e97bd9c2833b9231b8bf41257c5bc10bca3 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 12:39:25 +0200 Subject: [PATCH 3/8] trace logs --- libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 82bcd0c8c..ac1c2f1bc 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -132,6 +132,7 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = if not current.isNil: if not (current.closed() or current.atEof): # The existing send connection looks like it might work - reuse it + trace "Reusing existing connection", oid = $current.oid return current # Send connection is set but broken - get rid of it @@ -155,9 +156,12 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = # Either the new or the old connection could potentially be closed - it's # slightly easier to sequence the closing of the new connection because the # old one might still be in use. 
+ trace "Closing redundant connection", + oid = $current.oid, newConn = $newConn.oid await newConn.close() return current + trace "Caching new send connection", oid = $newConn.oid p.sendConn = newConn asyncCheck p.handle(newConn) # start a read loop on the new connection From 790b67c9235852b74c4fc8591657cece9ad5b5fc Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 12:45:54 +0200 Subject: [PATCH 4/8] work around bufferstream deadlock (#332) mplex backpressure handling deadlocks with something --- libp2p/stream/bufferstream.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 6fcaab52a..a0fb92141 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -43,7 +43,7 @@ logScope: topics = "bufferstream" const - DefaultBufferSize* = 1024 + DefaultBufferSize* = 102400 const BufferStreamTrackerName* = "libp2p.bufferstream" From cfcda3c3effc38d03ba33daea946954b05cb1109 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 13:29:45 +0200 Subject: [PATCH 5/8] work around race conditions between identify and other protocols when identify is run on incoming connections, the connmanager tables are updated too late for incoming connections to properly be handled this is a quickfix that will eventually need cleaning up --- libp2p/switch.nim | 84 ++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 859a240f6..dfb2e9f06 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -156,7 +156,19 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = trace "identify: identified remote peer", peer = $conn.peerInfo -proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = +proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = + # new stream for identify + var stream = await muxer.newStream() + + defer: + if not(isNil(stream)): + await stream.close() # close identify stream + + # do identify first, so that we have a + # PeerInfo in case we didn't before + await s.identify(stream) + +proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection trace "muxing connection", peer = $conn @@ -171,37 +183,24 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = # create new muxer for connection let muxer = s.muxers[muxerName].newMuxer(conn) - s.connManager.storeMuxer(muxer) - - trace "found a muxer", name = muxerName, peer = $conn # install stream handler muxer.streamHandler = s.streamHandler - # new stream for identify - var stream = await muxer.newStream() + s.connManager.storeOutgoing(muxer.connection) + s.connManager.storeMuxer(muxer) - defer: - if not(isNil(stream)): - await stream.close() # close identify stream + trace "found a muxer", name = muxerName, peer = $conn - # call muxer handler, this should - # not end until muxer ends + # start muxer read loop - the future will complete when loop ends let handlerFut = muxer.handle() - # do identify first, so that we have a - # PeerInfo in case we didn't before - await s.identify(stream) - - if isNil(conn.peerInfo): - await muxer.close() - raise newException(CatchableError, - "unable to identify peer, aborting upgrade") - # store it in muxed connections if we have a peer for it trace "adding muxer for peer", peer = conn.peerInfo.id s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler + return muxer + proc disconnect*(s: Switch, peerId: PeerID): 
Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) @@ -215,8 +214,15 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "unable to secure connection, stopping upgrade") + if sconn.peerInfo.isNil: + raise newException(CatchableError, + "current version of nim-libp2p requires that secure protocol negotiates peerid") + trace "upgrading connection" - await s.mux(sconn) # mux it if possible + let muxer = await s.mux(sconn) # mux it if possible + + await s.identify(muxer) + if isNil(sconn.peerInfo): await sconn.close() raise newException(CatchableError, @@ -332,7 +338,6 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" - s.connManager.storeOutgoing(upgraded) conn = upgraded trace "dial successful", oid = $upgraded.oid, @@ -465,32 +470,21 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - var stream = await muxer.newStream() - defer: - if not(isNil(stream)): - await stream.close() + if muxer.connection.peerInfo.isNil: + warn "This version of nim-libp2p requires secure protocol to negotiate peerid" + await muxer.close() + return + + # store incoming connection + s.connManager.storeIncoming(muxer.connection) + + # store muxer and muxed connection + s.connManager.storeMuxer(muxer) try: - # once we got a muxed connection, attempt to - # identify it - await s.identify(stream) - if isNil(stream.peerInfo): - await muxer.close() - return - - let - peerInfo = stream.peerInfo - peerId = peerInfo.peerId - muxer.connection.peerInfo = peerInfo - - # store incoming connection - s.connManager.storeIncoming(muxer.connection) - - # store muxer and muxed connection - s.connManager.storeMuxer(muxer) - - trace "got new muxer", peer = shortLog(peerInfo) + await s.identify(muxer) + let peerId = muxer.connection.peerInfo.peerId muxer.connection.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.triggerConnEvent( From 833a5b8e5771fbf34a941dcedb1bcc116fca7cfa Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 13:32:02 +0200 Subject: [PATCH 6/8] add muxer nil check --- libp2p/switch.nim | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index dfb2e9f06..23e120476 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -220,6 +220,10 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g trace "upgrading connection" let muxer = await s.mux(sconn) # mux it if possible + if muxer == nil: + # TODO this might be relaxed in the future + raise newException(CatchableError, + "a muxer is required for outgoing connections") await s.identify(muxer) From 60122a044cf9da8f1be5a7185e8bda078bb93ae4 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 17 Aug 2020 22:39:39 +0300 Subject: [PATCH 7/8] Restore interop with Lighthouse by preventing concurrent meshsub dials --- libp2p/protocols/pubsub/pubsubpeer.nim | 55 +++++++++++++++----------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ac1c2f1bc..7c084bc22 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -46,6 +46,7 @@ type recvdRpcCache: TimedCache[string] # cache for already received messages observers*: ref seq[PubSubObserver] # ref as in smart_ptr subscribed*: bool # are we subscribed to this peer + dialLock: AsyncLock RPCHandler* 
= proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -141,31 +142,36 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = # Careful, p.sendConn might change after here! await current.close() # TODO this might be unnecessary - # Grab a new send connection - let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here - if newConn == nil: - return p.sendConn # A concurrent attempt perhaps succeeded? + try: + # Testing has demonstrated that when we perform concurrent meshsub dials + # and later close one of them, other implementations such as rust-libp2p + # become deaf to our messages (potentially due to the clean-up associated + # with closing connections). To prevent this, we use a lock that ensures + # that only a single dial will be performed for each peer: + await p.dialLock.acquire() - # Because of the awaits above, a concurrent `getSendConn` call might have - # set up a send connection already. We cannot take a lock here because - # it might block the reading of data from mplex which will cause its - # backpressure handling to stop reading from the socket and thus prevent the - # channel negotiation from finishing - if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof): - let current = p.sendConn - # Either the new or the old connection could potentially be closed - it's - # slightly easier to sequence the closing of the new connection because the - # old one might still be in use. - trace "Closing redundant connection", - oid = $current.oid, newConn = $newConn.oid - await newConn.close() - return current + # Another concurrent dial may have populated p.sendConn + if p.sendConn != nil: + let current = p.sendConn + if not current.isNil: + if not (current.closed() or current.atEof): + # The existing send connection looks like it might work - reuse it + trace "Reusing existing connection", oid = $current.oid + return current - trace "Caching new send connection", oid = $newConn.oid - p.sendConn = newConn - asyncCheck p.handle(newConn) # start a read loop on the new connection + # Grab a new send connection + let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here + if newConn.isNil: + return nil - return newConn + trace "Caching new send connection", oid = $newConn.oid + p.sendConn = newConn + asyncCheck p.handle(newConn) # start a read loop on the new connection + return newConn + + finally: + if p.dialLock.locked: + p.dialLock.release() proc send*( p: PubSubPeer, @@ -207,11 +213,11 @@ proc send*( if conn == nil: debug "Couldn't get send connection, dropping message" return - trace "sending encoded msgs to peer" + trace "sending encoded msgs to peer", connId = $conn.oid await conn.writeLp(encoded).wait(timeout) p.sentRpcCache.put(digest) - trace "sent pubsub message to remote" + trace "sent pubsub message to remote", connId = $conn.oid when defined(libp2p_expensive_metrics): for x in mm.messages: @@ -240,3 +246,4 @@ proc newPubSubPeer*(peerId: PeerID, result.peerId = peerId result.sentRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes) + result.dialLock = newAsyncLock() From af0955c58b06998b73540195363e6833b1382b42 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Tue, 18 Aug 2020 13:51:27 +0300 Subject: [PATCH 8/8] Add comments explaning a possible deadlock --- libp2p/protocols/pubsub/pubsubpeer.nim | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 
7c084bc22..15004d7bc 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -147,7 +147,19 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = # and later close one of them, other implementations such as rust-libp2p # become deaf to our messages (potentially due to the clean-up associated # with closing connections). To prevent this, we use a lock that ensures - # that only a single dial will be performed for each peer: + # that only a single dial will be performed for each peer. + # + # Nevertheless, this approach is still quite problematic because the gossip + # sends and their respective dials may be started from the mplex read loop. + # This may cause the read loop to get stuck which ultimately results in a + # deadlock when the other side tries to send us any other message that must + # be routed through mplex (it will be stuck on `pushTo`). Such messages + # naturally arise in the process of dialing itself. + # + # See https://github.com/status-im/nim-libp2p/issues/337 + # + # One possible long-term solution is to avoid "blocking" the mplex read + # loop by making the gossip send non-blocking through the use of a queue. await p.dialLock.acquire() # Another concurrent dial may have populated p.sendConn
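
The comment added in PATCH 8/8 points at a possible longer-term fix: keep the mplex read loop from ever blocking on a gossip send by pushing encoded messages onto a queue that a dedicated task drains. The following is a rough, illustrative Nim sketch of that idea only, not part of the patch series; the QueuedPeer, sendQueue, sendWorker and enqueueSend names are hypothetical, and connection re-establishment, backpressure and shutdown handling are left out.

    # Illustrative sketch, assuming chronos' AsyncQueue and libp2p's
    # length-prefixed writeLp; all names below are hypothetical.
    import chronos, chronicles
    import ../../stream/connection   # Connection / writeLp, path as seen from pubsub/

    type
      QueuedPeer = ref object
        conn: Connection                  # cached send connection, as in getSendConn
        sendQueue: AsyncQueue[seq[byte]]  # encoded RPC messages awaiting delivery

    proc sendWorker(p: QueuedPeer) {.async.} =
      # Drain the queue in its own task so that callers, including the mplex
      # read loop, never wait on dialing or writing.
      while true:
        let encoded = await p.sendQueue.popFirst()
        try:
          await p.conn.writeLp(encoded)
        except CancelledError as exc:
          raise exc
        except CatchableError as exc:
          trace "dropping queued message", exc = exc.msg

    proc enqueueSend(p: QueuedPeer, encoded: seq[byte]) =
      # Non-blocking from the caller's point of view: queue the message and
      # return immediately; sendWorker performs the actual write.
      p.sendQueue.addLastNoWait(encoded)

    proc newQueuedPeer(conn: Connection): QueuedPeer =
      result = QueuedPeer(conn: conn, sendQueue: newAsyncQueue[seq[byte]]())
      asyncCheck result.sendWorker()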