mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-03 09:31:08 +00:00
add metrics for the number of msgs processed
This commit is contained in:
parent
24539bd821
commit
7db4495360
@ -33,6 +33,8 @@ when defined(libp2p_expensive_metrics):
|
|||||||
|
|
||||||
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
|
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
|
||||||
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
|
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
|
||||||
|
declareCounter(libp2p_gossipsub_priority_queue_messages_processed, "the number of messages processed in the priority queue", labels = ["id"])
|
||||||
|
declareCounter(libp2p_gossipsub_non_priority_queue_messages_processed, "the number of messages processed in the non-priority queue", labels = ["id"])
|
||||||
|
|
||||||
type
|
type
|
||||||
PeerRateLimitError* = object of CatchableError
|
PeerRateLimitError* = object of CatchableError
|
||||||
@ -272,7 +274,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
|
|||||||
if not p.rpcmessagequeue.isProcessing:
|
if not p.rpcmessagequeue.isProcessing:
|
||||||
p.rpcmessagequeue.isProcessing = true
|
p.rpcmessagequeue.isProcessing = true
|
||||||
p.rpcmessagequeue.messageAvailableEvent.fire()
|
p.rpcmessagequeue.messageAvailableEvent.fire()
|
||||||
await sleepAsync(1.nanoseconds) # give a chance to the task processing the queue to run
|
await sleepAsync(0) # give a chance to the task processing the queue to run
|
||||||
|
|
||||||
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
|
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
|
||||||
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
|
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
|
||||||
@ -374,9 +376,13 @@ proc processMessages(p: PubSubPeer) {.async.} =
|
|||||||
|
|
||||||
if not p.rpcmessagequeue.priorityQueue.empty():
|
if not p.rpcmessagequeue.priorityQueue.empty():
|
||||||
let message = await p.rpcmessagequeue.priorityQueue.get()
|
let message = await p.rpcmessagequeue.priorityQueue.get()
|
||||||
|
when defined(libp2p_expensive_metrics):
|
||||||
|
libp2p_gossipsub_priority_queue_messages_processed.inc(labelValues = [$p.peerId])
|
||||||
await sendMsg(message)
|
await sendMsg(message)
|
||||||
elif not p.rpcmessagequeue.nonPriorityQueue.empty():
|
elif not p.rpcmessagequeue.nonPriorityQueue.empty():
|
||||||
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
|
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
|
||||||
|
when defined(libp2p_expensive_metrics):
|
||||||
|
libp2p_gossipsub_non_priority_queue_messages_processed.inc(labelValues = [$p.peerId])
|
||||||
await sendMsg(message)
|
await sendMsg(message)
|
||||||
|
|
||||||
p.rpcmessagequeue.isProcessing = false
|
p.rpcmessagequeue.isProcessing = false
|
||||||
|
Loading…
x
Reference in New Issue
Block a user