handle isNotEmpty futures correctly

This commit is contained in:
Diego 2024-01-22 20:49:00 +01:00
parent 7db4495360
commit f34f30de93
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
1 changed files with 15 additions and 21 deletions

View File

@ -19,7 +19,8 @@ import rpc/[messages, message, protobuf],
../../stream/connection,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
../../utility,
../../utils/future
export peerid, connection, deques
@ -271,10 +272,6 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
if not p.rpcmessagequeue.isProcessing:
p.rpcmessagequeue.isProcessing = true
p.rpcmessagequeue.messageAvailableEvent.fire()
await sleepAsync(0) # give a chance to the task processing the queue to run
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@ -346,7 +343,6 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
proc processMessages(p: PubSubPeer) {.async.} =
proc sendMsg(msg: seq[byte]) {.async.} =
trace "message dequeued", p, msg = shortLog(msg)
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@ -371,28 +367,26 @@ proc processMessages(p: PubSubPeer) {.async.} =
await conn.close() # This will clean up the send connection
while true:
await p.rpcmessagequeue.messageAvailableEvent.wait()
p.rpcmessagequeue.messageAvailableEvent.clear()
var futs = @[p.rpcmessagequeue.priorityQueue.isNotempty(), p.rpcmessagequeue.nonPriorityQueue.isNotempty()]
try:
discard await anyCompleted(futs)#.wait(self.connectTimeout)
trace "waiting for message", p
finally:
for fut in futs: fut.cancel()
trace "message available", p
if not p.rpcmessagequeue.priorityQueue.empty():
let message = await p.rpcmessagequeue.priorityQueue.get()
let message = p.rpcmessagequeue.priorityQueue.getNoWait()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_messages_processed.inc(labelValues = [$p.peerId])
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
trace "message dequeued from priority queue", p, msg = shortLog(message)
await sendMsg(message)
elif not p.rpcmessagequeue.nonPriorityQueue.empty():
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
let message = p.rpcmessagequeue.nonPriorityQueue.getNoWait()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_messages_processed.inc(labelValues = [$p.peerId])
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
trace "message dequeued from non-priority queue", p, msg = shortLog(message)
await sendMsg(message)
p.rpcmessagequeue.isProcessing = false
# Additional logic to re-fire the event if messages are still in the queue
if not p.rpcmessagequeue.priorityQueue.empty() or not p.rpcmessagequeue.nonPriorityQueue.empty():
p.rpcmessagequeue.isProcessing = true
p.rpcmessagequeue.messageAvailableEvent.fire()
proc startProcessingMessages(p: PubSubPeer) =
if p.rpcmessagequeue.queueProcessingTask.isNil:
p.rpcmessagequeue.queueProcessingTask = p.processMessages()