give a chance to the task processing the queue to run

This commit is contained in:
Diego 2024-01-19 00:46:58 +01:00
parent c948eec7a7
commit d8e6f26502
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
2 changed files with 7 additions and 4 deletions

View File

@ -143,7 +143,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = fals
##
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize, isHighPriority)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
proc broadcast*(
p: PubSub,

View File

@ -267,7 +267,9 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
p.rpcmessagequeue.nonPriorityQueue.putNoWait(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
p.rpcmessagequeue.messageAvailableEvent.fire()
await sleepAsync(1.nanoseconds) # give a chance to the task processing the queue to run
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@ -304,7 +306,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@ -324,11 +326,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = f
if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
await p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded, isHighPriority)
await p.sendEncoded(encoded, isHighPriority)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@ -339,6 +341,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
proc processMessages(p: PubSubPeer) {.async.} =
proc sendMsg(msg: seq[byte]) {.async.} =
trace "message dequeued", p, msg = shortLog(msg)
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed