From 9d9f793b4f4674b95b524e175509ea6402744f68 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 15 Jun 2020 17:39:03 -0600 Subject: [PATCH] add metrics for sent messages by topic and peer (#220) --- libp2p/protocols/pubsub/pubsubpeer.nim | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b7401f6..855e0fe 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -22,8 +22,8 @@ import rpc/[messages, message, protobuf], logScope: topics = "pubsubpeer" -declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id"]) -declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id"]) +declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"]) +declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"]) declareCounter(libp2p_pubsub_skipped_messages, "number of skipped messages", labels = ["id"]) type @@ -87,8 +87,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = # trigger hooks p.recvObservers(msg) - # metrics - libp2p_pubsub_received_messages.inc(labelValues = [p.id]) + for m in msg.messages: + for t in m.topicIDs: + # metrics + libp2p_pubsub_received_messages.inc(labelValues = [p.id, t]) await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) @@ -128,8 +130,12 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = await p.sendConn.writeLp(encoded.buffer) p.sentRpcCache.put(digest) - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id]) + for m in msgs: + for mm in m.messages: + for t in mm.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + except CatchableError as exc: trace "unable to send to remote", exc = exc.msg p.sendConn = nil