From abe8c2489ec0981ad5549cb44214a6ed2ff966c9 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 16 Jan 2024 13:33:42 +0100 Subject: [PATCH] handle exception and remove unnecessary async --- libp2p/protocols/pubsub/pubsubpeer.nim | 8 ++++---- libp2p/protocols/pubsub/rpcmessagequeue.nim | 14 ++++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 2045ce2b1..9ccf0da03 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -335,8 +335,8 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq[byte]]] {.async.} = - var m = await rpcMessageQueue.getPriorityMessage() +proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] = + var m = rpcMessageQueue.getPriorityMessage() if m.isSome(): when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) @@ -344,7 +344,7 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq libp2p_gossipsub_priority_queue_size.dec(labelValues = [p.getAgent()]) return m else: - m = await rpcMessageQueue.getNonPriorityMessage() + m = rpcMessageQueue.getNonPriorityMessage() if m.isSome(): when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) @@ -356,7 +356,7 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} = while true: - let m = await queue.getMessage(p) + let m = queue.getMessage(p) m.withValue(msg): if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will diff --git a/libp2p/protocols/pubsub/rpcmessagequeue.nim b/libp2p/protocols/pubsub/rpcmessagequeue.nim index 7f7c52f26..3abb492fa 100644 --- a/libp2p/protocols/pubsub/rpcmessagequeue.nim +++ b/libp2p/protocols/pubsub/rpcmessagequeue.nim @@ -29,16 +29,22 @@ proc new*(T: typedesc[RpcMessageQueue]): T = nonPriorityQueue: newAsyncQueue[seq[byte]]() ) -proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} = +proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] = return if not rpcMessageQueue.priorityQueue.empty(): - Opt.some(rpcMessageQueue.priorityQueue.getNoWait()) + try: + Opt.some(rpcMessageQueue.priorityQueue.getNoWait()) + except AsyncQueueEmptyError: + Opt.none(seq[byte]) else: Opt.none(seq[byte]) -proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} = +proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] = return if not rpcMessageQueue.nonPriorityQueue.empty(): - Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait()) + try: + Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait()) + except AsyncQueueEmptyError: + Opt.none(seq[byte]) else: Opt.none(seq[byte])