continue processing while the queues arent empty

This commit is contained in:
Diego 2024-01-19 01:24:37 +01:00
parent d8e6f26502
commit 24539bd821
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806

View File

@ -57,6 +57,7 @@ type
nonPriorityQueue: AsyncQueue[seq[byte]]
messageAvailableEvent: AsyncEvent
queueProcessingTask: Future[void]
isProcessing: bool # Flag to indicate if processing is underway
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
@ -268,6 +269,8 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
if not p.rpcmessagequeue.isProcessing:
p.rpcmessagequeue.isProcessing = true
p.rpcmessagequeue.messageAvailableEvent.fire()
await sleepAsync(1.nanoseconds) # give a chance to the task processing the queue to run
@ -366,16 +369,23 @@ proc processMessages(p: PubSubPeer) {.async.} =
await conn.close() # This will clean up the send connection
while true:
await p.rpcmessagequeue.messageAvailableEvent.wait() # Wait for an event
p.rpcmessagequeue.messageAvailableEvent.clear() # Reset the event after handling
await p.rpcmessagequeue.messageAvailableEvent.wait()
p.rpcmessagequeue.messageAvailableEvent.clear()
if not p.rpcmessagequeue.priorityQueue.empty(): # Process messages from the priority queue first
if not p.rpcmessagequeue.priorityQueue.empty():
let message = await p.rpcmessagequeue.priorityQueue.get()
await sendMsg(message)
elif not p.rpcmessagequeue.nonPriorityQueue.empty(): # Then process messages from the non-priority queue
elif not p.rpcmessagequeue.nonPriorityQueue.empty():
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
await sendMsg(message)
p.rpcmessagequeue.isProcessing = false
# Additional logic to re-fire the event if messages are still in the queue
if not p.rpcmessagequeue.priorityQueue.empty() or not p.rpcmessagequeue.nonPriorityQueue.empty():
p.rpcmessagequeue.isProcessing = true
p.rpcmessagequeue.messageAvailableEvent.fire()
proc startProcessingMessages(p: PubSubPeer) =
if p.rpcmessagequeue.queueProcessingTask.isNil: