simplify per-topic metric collection

This commit is contained in:
Csaba Kiraly 2022-03-17 11:03:37 +01:00
parent cba3ca3c3e
commit f1795e7761
3 changed files with 29 additions and 80 deletions

View File

@ -269,18 +269,12 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
topicMetricInc(g, topic, libp2p_pubsub_broadcast_messages)
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
topicMetricInc(g, prune.topicID, libp2p_pubsub_broadcast_prune)
trace "sending control message", msg = shortLog(respControl), peer
g.send(
@ -336,10 +330,7 @@ proc validateAndRelay(g: GossipSub,
for topic in msg.topicIDs:
if topic notin g.topics: continue
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
topicMetricInc(g, topic, libp2p_pubsub_messages_rebroadcasted, toSendPeers.len.int64)
await handleData(g, topic, msg.data)
except CatchableError as exc:
@ -546,10 +537,7 @@ method publish*(g: GossipSub,
g.broadcast(peers, RPCMsg(messages: @[msg]))
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
topicMetricInc(g, topic, libp2p_pubsub_messages_published, peers.len.int64)
trace "Published message to peers", peers=peers.len

View File

@ -347,10 +347,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
npeers = g.mesh.peers(topic)
if npeers > g.parameters.dHigh:
if not isNil(metrics):
if g.knownTopics.contains(topic):
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
else:
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
topicMetricInc(g, topic, libp2p_gossipsub_above_dhigh_condition)
# prune peers if we've gone over Dhi
prunes = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
@ -612,10 +609,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
for peer, control in peers:
# only ihave from here
for ihave in control.ihave:
if g.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
topicMetricInc(g, ihave.topicID, libp2p_pubsub_broadcast_ihave)
g.send(peer, RPCMsg(control: some(control)))
g.mcache.shift() # shift the cache

View File

@ -127,6 +127,15 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base.} =
libp2p_pubsub_peers.set(p.peers.len.int64)
func topicMetricInc*(
p: PubSub,
topic: string,
counter: Collector | type IgnoredCollector,
amount: int64|float64 = 1) =
## helper for increasing per-topic metrics
when defined(metrics) and counter is not IgnoredCollector:
counter.inc(amount, labelValues = p.knownTopics.containsOr(topic, "generic"))
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
## Attempt to send `msg` to remote peer
##
@ -143,42 +152,24 @@ proc broadcast*(
let npeers = sendPeers.len.int64
for sub in msg.subscriptions:
if sub.subscribe:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = [sub.topic])
else:
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, sub.topic, libp2p_pubsub_broadcast_subscriptions, npeers)
else:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = [sub.topic])
else:
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, sub.topic, libp2p_pubsub_broadcast_unsubscriptions, npeers)
for smsg in msg.messages:
for topic in smsg.topicIDs:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, topic, libp2p_pubsub_broadcast_messages, npeers)
if msg.control.isSome():
libp2p_pubsub_broadcast_iwant.inc(npeers * msg.control.get().iwant.len.int64)
let control = msg.control.get()
for ihave in control.ihave:
if p.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, ihave.topicID, libp2p_pubsub_broadcast_ihave, npeers)
for graft in control.graft:
if p.knownTopics.contains(graft.topicID):
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicID])
else:
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, graft.topicID, libp2p_pubsub_broadcast_graft, npeers)
for prune in control.prune:
if p.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
topicMetricInc(p, prune.topicID, libp2p_pubsub_broadcast_prune, npeers)
trace "broadcasting messages to peers",
peers = sendPeers.len, msg = shortLog(msg)
@ -201,57 +192,33 @@ proc sendSubs*(p: PubSub,
for topic in topics:
if subscribe:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = ["generic"])
topicMetricInc(p, topic, libp2p_pubsub_broadcast_subscriptions)
else:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = ["generic"])
topicMetricInc(p, topic, libp2p_pubsub_broadcast_unsubscriptions)
proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
template sub(): untyped = rpcMsg.subscriptions[i]
if sub.subscribe:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
else:
libp2p_pubsub_received_subscriptions.inc(labelValues = ["generic"])
topicMetricInc(p, sub.topic, libp2p_pubsub_received_subscriptions)
else:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_received_unsubscriptions.inc(labelValues = [sub.topic])
else:
libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"])
topicMetricInc(p, sub.topic, libp2p_pubsub_received_unsubscriptions)
for i in 0..<rpcMsg.messages.len():
template smsg: untyped = rpcMsg.messages[i]
for j in 0..<smsg.topicIDs.len():
template topic: untyped = smsg.topicIDs[j]
if p.knownTopics.contains(topic):
libp2p_pubsub_received_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_received_messages.inc(labelValues = ["generic"])
topicMetricInc(p, topic, libp2p_pubsub_received_messages)
if rpcMsg.control.isSome():
libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64)
template control: untyped = rpcMsg.control.unsafeGet()
for ihave in control.ihave:
if p.knownTopics.contains(ihave.topicID):
libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_received_ihave.inc(labelValues = ["generic"])
topicMetricInc(p, ihave.topicID, libp2p_pubsub_received_ihave)
for graft in control.graft:
if p.knownTopics.contains(graft.topicID):
libp2p_pubsub_received_graft.inc(labelValues = [graft.topicID])
else:
libp2p_pubsub_received_graft.inc(labelValues = ["generic"])
topicMetricInc(p, graft.topicID, libp2p_pubsub_received_graft)
for prune in control.prune:
if p.knownTopics.contains(prune.topicID):
libp2p_pubsub_received_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_received_prune.inc(labelValues = ["generic"])
topicMetricInc(p, prune.topicID, libp2p_pubsub_received_prune)
method rpcHandler*(p: PubSub,
peer: PubSubPeer,