diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index bb4d4f6c9..687243e6f 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -250,6 +250,12 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) +proc clearSendPriorityQueue(p: PubSubPeer) = + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() + proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will @@ -287,6 +293,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: + p.clearSendPriorityQueue() let f = p.sendMsg(msg) if not f.finished: p.rpcmessagequeue.sendPriorityQueue.addLast(f) @@ -366,12 +373,6 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc clearSendPriorityQueue(p: PubSubPeer) = - while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) - discard p.rpcmessagequeue.sendPriorityQueue.popFirst() - proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: # we send non-priority messages only if there are no pending priority messages