handle exception and remove unnecessary async

This commit is contained in:
Diego 2024-01-16 13:33:42 +01:00
parent f682bb9171
commit abe8c2489e
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
2 changed files with 14 additions and 8 deletions

View File

@ -335,8 +335,8 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true return true
return false return false
proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq[byte]]] {.async.} = proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] =
var m = await rpcMessageQueue.getPriorityMessage() var m = rpcMessageQueue.getPriorityMessage()
if m.isSome(): if m.isSome():
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
@ -344,7 +344,7 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq
libp2p_gossipsub_priority_queue_size.dec(labelValues = [p.getAgent()]) libp2p_gossipsub_priority_queue_size.dec(labelValues = [p.getAgent()])
return m return m
else: else:
m = await rpcMessageQueue.getNonPriorityMessage() m = rpcMessageQueue.getNonPriorityMessage()
if m.isSome(): if m.isSome():
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
@ -356,7 +356,7 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq
proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} = proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} =
while true: while true:
let m = await queue.getMessage(p) let m = queue.getMessage(p)
m.withValue(msg): m.withValue(msg):
if p.sendConn == nil: if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will # Wait for a send conn to be setup. `connectOnce` will

View File

@ -29,16 +29,22 @@ proc new*(T: typedesc[RpcMessageQueue]): T =
nonPriorityQueue: newAsyncQueue[seq[byte]]() nonPriorityQueue: newAsyncQueue[seq[byte]]()
) )
proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} = proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] =
return return
if not rpcMessageQueue.priorityQueue.empty(): if not rpcMessageQueue.priorityQueue.empty():
try:
Opt.some(rpcMessageQueue.priorityQueue.getNoWait()) Opt.some(rpcMessageQueue.priorityQueue.getNoWait())
except AsyncQueueEmptyError:
Opt.none(seq[byte])
else: else:
Opt.none(seq[byte]) Opt.none(seq[byte])
proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} = proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] =
return return
if not rpcMessageQueue.nonPriorityQueue.empty(): if not rpcMessageQueue.nonPriorityQueue.empty():
try:
Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait()) Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait())
except AsyncQueueEmptyError:
Opt.none(seq[byte])
else: else:
Opt.none(seq[byte]) Opt.none(seq[byte])