diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b7401f65e..855e0fe1d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -22,8 +22,8 @@ import rpc/[messages, message, protobuf], logScope: topics = "pubsubpeer" -declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id"]) -declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id"]) +declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"]) +declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"]) declareCounter(libp2p_pubsub_skipped_messages, "number of skipped messages", labels = ["id"]) type @@ -87,8 +87,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = # trigger hooks p.recvObservers(msg) - # metrics - libp2p_pubsub_received_messages.inc(labelValues = [p.id]) + for m in msg.messages: + for t in m.topicIDs: + # metrics + libp2p_pubsub_received_messages.inc(labelValues = [p.id, t]) await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) @@ -128,8 +130,12 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = await p.sendConn.writeLp(encoded.buffer) p.sentRpcCache.put(digest) - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id]) + for m in msgs: + for mm in m.messages: + for t in mm.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + except CatchableError as exc: trace "unable to send to remote", exc = exc.msg p.sendConn = nil