diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 72d5f527e..534dbfa07 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -55,7 +55,6 @@ type triggerSelf*: bool # trigger own local handler on publish verifySignature*: bool # enable signature verification sign*: bool # enable message signing - cleanupLock: AsyncLock validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) @@ -338,7 +337,6 @@ proc init*( sign: sign, peers: initTable[PeerID, PubSubPeer](), topics: initTable[string, Topic](), - cleanupLock: newAsyncLock(), msgIdProvider: msgIdProvider) result.initPubSub() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ac47e811c..82bcd0c8c 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[hashes, options, sequtils, strutils, tables] +import std/[hashes, options, strutils, tables] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], timedcache, @@ -39,14 +39,13 @@ type PubSubPeer* = ref object of RootObj switch*: Switch # switch instance to dial peers codec*: string # the protocol that this peer joined from - sendConn: Connection + sendConn: Connection # cached send connection peerId*: PeerID handler*: RPCHandler sentRpcCache: TimedCache[string] # cache for already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages observers*: ref seq[PubSubObserver] # ref as in smart_ptr subscribed*: bool # are we subscribed to this peer - sendLock*: AsyncLock # send connection lock RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -117,11 +116,53 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = debug "exiting pubsub peer read loop" await conn.close() + if p.sendConn == conn: + p.sendConn = nil + except CancelledError as exc: raise exc except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg +proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = + # get a cached send connection or create a new one + block: # check if there's an existing connection that can be reused + let current = p.sendConn + + if not current.isNil: + if not (current.closed() or current.atEof): + # The existing send connection looks like it might work - reuse it + return current + + # Send connection is set but broken - get rid of it + p.sendConn = nil + + # Careful, p.sendConn might change after here! + await current.close() # TODO this might be unnecessary + + # Grab a new send connection + let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here + if newConn == nil: + return p.sendConn # A concurrent attempt perhaps succeeded? + + # Because of the awaits above, a concurrent `getSendConn` call might have + # set up a send connection already. We cannot take a lock here because + # it might block the reading of data from mplex which will cause its + # backpressure handling to stop reading from the socket and thus prevent the + # channel negotiation from finishing + if p.sendConn != nil and not(p.sendConn.closed or p.sendConn.atEof): + let current = p.sendConn + # Either the new or the old connection could potentially be closed - it's + # slightly easier to sequence the closing of the new connection because the + # old one might still be in use. + await newConn.close() + return current + + p.sendConn = newConn + asyncCheck p.handle(newConn) # start a read loop on the new connection + + return newConn + proc send*( p: PubSubPeer, msg: RPCMsg, @@ -154,27 +195,17 @@ proc send*( libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) return + var conn: Connection try: trace "about to send message" - if not p.connected: - try: - await p.sendLock.acquire() - trace "no send connection, dialing peer" - # get a send connection if there is none - p.sendConn = await p.switch.dial( - p.peerId, p.codec) - - if not p.connected: - raise newException(CatchableError, "unable to get send pubsub stream") - - # install a reader on the send connection - asyncCheck p.handle(p.sendConn) - finally: - if p.sendLock.locked: - p.sendLock.release() + conn = await p.getSendConn() + if conn == nil: + debug "Couldn't get send connection, dropping message" + return trace "sending encoded msgs to peer" - await p.sendConn.writeLp(encoded).wait(timeout) + await conn.writeLp(encoded).wait(timeout) + p.sentRpcCache.put(digest) trace "sent pubsub message to remote" @@ -186,9 +217,10 @@ proc send*( except CatchableError as exc: trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + if not isNil(conn): + await conn.close() raise exc @@ -204,4 +236,3 @@ proc newPubSubPeer*(peerId: PeerID, result.peerId = peerId result.sentRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes) - result.sendLock = newAsyncLock() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 870cfaf56..fc6a89a4d 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -15,9 +15,7 @@ import utils, ../../libp2p/[errors, switch, stream/connection, - stream/bufferstream, crypto/crypto, - protocols/pubsub/pubsubpeer, protocols/pubsub/pubsub, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages,