diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index ac1c2f1bc..7c084bc22 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -46,6 +46,7 @@ type
     recvdRpcCache: TimedCache[string] # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
     subscribed*: bool # are we subscribed to this peer
+    dialLock: AsyncLock
 
   RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
 
@@ -141,31 +142,36 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
       # Careful, p.sendConn might change after here!
       await current.close() # TODO this might be unnecessary
 
-  # Grab a new send connection
-  let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
-  if newConn == nil:
-    return p.sendConn # A concurrent attempt perhaps succeeded?
+  try:
+    # Testing has demonstrated that when we perform concurrent meshsub dials
+    # and later close one of them, other implementations such as rust-libp2p
+    # become deaf to our messages (potentially due to the clean-up associated
+    # with closing connections). To prevent this, we use a lock that ensures
+    # that only a single dial will be performed for each peer:
+    await p.dialLock.acquire()
 
-  # Because of the awaits above, a concurrent `getSendConn` call might have
-  # set up a send connection already. We cannot take a lock here because
-  # it might block the reading of data from mplex which will cause its
-  # backpressure handling to stop reading from the socket and thus prevent the
-  # channel negotiation from finishing
-  if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof):
-    let current = p.sendConn
-    # Either the new or the old connection could potentially be closed - it's
-    # slightly easier to sequence the closing of the new connection because the
-    # old one might still be in use.
-    trace "Closing redundant connection",
-      oid = $current.oid, newConn = $newConn.oid
-    await newConn.close()
-    return current
+    # Another concurrent dial may have populated p.sendConn
+    if p.sendConn != nil:
+      let current = p.sendConn
+      if not current.isNil:
+        if not (current.closed() or current.atEof):
+          # The existing send connection looks like it might work - reuse it
+          trace "Reusing existing connection", oid = $current.oid
+          return current
 
-  trace "Caching new send connection", oid = $newConn.oid
-  p.sendConn = newConn
-  asyncCheck p.handle(newConn) # start a read loop on the new connection
+    # Grab a new send connection
+    let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+    if newConn.isNil:
+      return nil
 
-  return newConn
+    trace "Caching new send connection", oid = $newConn.oid
+    p.sendConn = newConn
+    asyncCheck p.handle(newConn) # start a read loop on the new connection
+    return newConn
+
+  finally:
+    if p.dialLock.locked:
+      p.dialLock.release()
 
 proc send*(
   p: PubSubPeer,
@@ -207,11 +213,11 @@ proc send*(
     if conn == nil:
       debug "Couldn't get send connection, dropping message"
      return
-    trace "sending encoded msgs to peer"
+    trace "sending encoded msgs to peer", connId = $conn.oid
     await conn.writeLp(encoded).wait(timeout)
     p.sentRpcCache.put(digest)
 
-    trace "sent pubsub message to remote"
+    trace "sent pubsub message to remote", connId = $conn.oid
 
     when defined(libp2p_expensive_metrics):
       for x in mm.messages:
@@ -240,3 +246,4 @@ proc newPubSubPeer*(peerId: PeerID,
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
+  result.dialLock = newAsyncLock()