diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 31f24caac..818b21f5a 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -143,7 +143,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = fals
   ##
 
   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize, isHighPriority)
+  asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
 
 proc broadcast*(
     p: PubSub,
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 20b506af4..8e2673e14 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -267,7 +267,9 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
     p.rpcmessagequeue.nonPriorityQueue.putNoWait(msg)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+  trace "message queued", p, msg = shortLog(msg)
   p.rpcmessagequeue.messageAvailableEvent.fire()
+  await sleepAsync(1.nanoseconds) # give a chance to the task processing the queue to run
 
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -304,7 +306,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -324,11 +326,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = f
 
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
+      await p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded, isHighPriority)
+    await p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -339,6 +341,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
 
 proc processMessages(p: PubSubPeer) {.async.} =
   proc sendMsg(msg: seq[byte]) {.async.} =
+    trace "message dequeued", p, msg = shortLog(msg)
     if p.sendConn == nil:
       # Wait for a send conn to be setup. `connectOnce` will
       # complete this even if the sendConn setup failed
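
For context, here is a minimal, self-contained sketch (not part of the diff) of the queue/event hand-off pattern that `sendEncoded` and `processMessages` rely on, built on chronos' `AsyncQueue` and `AsyncEvent`. The `Demo`, `producer`, and `consumer` names are illustrative assumptions, not libp2p code; the point is why the `await sleepAsync(1.nanoseconds)` after `fire()` matters: firing the event only schedules the waiter, so the sender yields briefly to let the dequeuing task actually run.

```nim
import chronos

type Demo = ref object
  queue: AsyncQueue[seq[byte]]   # stands in for rpcmessagequeue.nonPriorityQueue
  messageAvailable: AsyncEvent   # stands in for messageAvailableEvent

proc consumer(d: Demo) {.async.} =
  ## Plays the role of `processMessages`: wait for the signal, then drain the queue.
  while true:
    await d.messageAvailable.wait()
    d.messageAvailable.clear()
    while d.queue.len > 0:
      let msg = d.queue.popFirstNoWait()
      echo "dequeued ", msg.len, " bytes"

proc producer(d: Demo, msg: seq[byte]) {.async.} =
  ## Plays the role of `sendEncoded`: enqueue, signal, then yield briefly.
  d.queue.putNoWait(msg)          # enqueue without blocking
  d.messageAvailable.fire()       # wake the consumer task
  await sleepAsync(1.nanoseconds) # yield so the consumer gets scheduled promptly

when isMainModule:
  let d = Demo(queue: newAsyncQueue[seq[byte]](), messageAvailable: newAsyncEvent())
  asyncSpawn d.consumer()
  waitFor d.producer(@[1'u8, 2, 3])
```

Running this should print the "dequeued" line before `producer` returns: the short `sleepAsync` hands control back to the event loop, so the fired event is serviced right away instead of waiting for the sender's call chain to unwind.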