diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ab010a366..2fed1ce0c 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -202,7 +202,7 @@ proc broadcast*( # Fast path that only encodes message once let encoded = encodeRpcMsg(msg, p.anonymize) for peer in sendPeers: - peer.sendEncoded(encoded) + asyncSpawn peer.sendEncoded(encoded) proc sendSubs*(p: PubSub, peer: PubSubPeer, @@ -307,8 +307,6 @@ proc getOrCreatePeer*( # metrics libp2p_pubsub_peers.set(p.peers.len.int64) - pubSubPeer.connect() - return pubSubPeer proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = @@ -382,7 +380,8 @@ method subscribePeer*(p: PubSub, peer: PeerId) {.base, gcsafe.} = ## messages ## - discard p.getOrCreatePeer(peer, p.codecs) + let pubSubPeer = p.getOrCreatePeer(peer, p.codecs) + pubSubPeer.connect() proc updateTopicMetrics(p: PubSub, topic: string) = # metrics diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 5a1afedbd..a40332de4 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -55,6 +55,7 @@ type onEvent*: OnEvent # Connectivity updates for peer codec*: string # the protocol that this peer joined from sendConn*: Connection # cached send connection + connectedFut: Future[void] address*: Option[MultiAddress] peerId*: PeerId handler*: RPCHandler @@ -165,6 +166,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc connectOnce(p: PubSubPeer): Future[void] {.async.} = try: + if p.connectedFut.finished: + p.connectedFut = newFuture[void]() let newConn = await p.getConn() if newConn.isNil: raise (ref LPError)(msg: "Cannot establish send connection") @@ -174,6 +177,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} = # stop working so we make an effort to only keep a single channel alive trace "Get new send connection", p, newConn + p.connectedFut.complete() p.sendConn = newConn p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress) @@ -208,28 +212,11 @@ proc connectImpl(p: PubSubPeer) {.async.} = debug "Could not establish send connection", msg = exc.msg proc connect*(p: PubSubPeer) = + if p.connected: + return + asyncSpawn connectImpl(p) -proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} = - trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) - - let fut = conn.writeLp(encoded) # Avoid copying `encoded` into future - proc sendWaiter(): Future[void] {.async.} = - try: - await fut - trace "sent pubsub message to remote", conn - - except CatchableError as exc: # never cancelled - # Because we detach the send call from the currently executing task using - # asyncSpawn, no exceptions may leak out of it - trace "Unable to send to remote", conn, msg = exc.msg - # Next time sendConn is used, it will be have its close flag set and thus - # will be recycled - - await conn.close() # This will clean up the send connection - - return sendWaiter() - template sendMetrics(msg: RPCMsg): untyped = when defined(libp2p_expensive_metrics): for x in msg.messages: @@ -237,7 +224,7 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) -proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} = +proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = doAssert(not isNil(p), "pubsubpeer nil!") if msg.len <= 0: @@ -248,14 +235,27 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} = info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len return - let conn = p.sendConn + if p.sendConn == nil: + discard await p.connectedFut.withTimeout(1.seconds) + + var conn = p.sendConn if conn == nil or conn.closed(): - trace "No send connection, skipping message", p, msg = shortLog(msg) + debug "No send connection, skipping message", p, msg = shortLog(msg) return - # To limit the size of the closure, we only pass the encoded message and - # connection to the spawned send task - asyncSpawn sendImpl(conn, msg) + trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) + + try: + await conn.writeLp(msg) + trace "sent pubsub message to remote", conn + except CatchableError as exc: # never cancelled + # Because we detach the send call from the currently executing task using + # asyncSpawn, no exceptions may leak out of it + trace "Unable to send to remote", conn, msg = exc.msg + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + + await conn.close() # This will clean up the send connection proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) @@ -277,7 +277,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = sendMetrics(msg) encodeRpcMsg(msg, anonymize) - p.sendEncoded(encoded) + asyncSpawn p.sendEncoded(encoded) proc new*( T: typedesc[PubSubPeer], @@ -292,5 +292,6 @@ proc new*( onEvent: onEvent, codec: codec, peerId: peerId, + connectedFut: newFuture[void](), maxMessageSize: maxMessageSize )