diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3619f72b1..0f0ad5f2a 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -8,7 +8,7 @@ ## those terms. import sequtils, tables, sets, strutils -import chronos, chronicles +import chronos, chronicles, metrics import pubsub, pubsubpeer, timedcache, diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5d70970ca..cbe932ccd 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -8,7 +8,7 @@ ## those terms. import tables, sets, options, sequtils, random -import chronos, chronicles +import chronos, chronicles, metrics import pubsub, floodsub, pubsubpeer, @@ -56,6 +56,10 @@ type heartbeatCancel*: Future[void] # cancellation future for heartbeat interval heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats +declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) + method init(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -79,6 +83,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} = if topic in g.gossipsub: for p in g.gossipsub[topic]: if not g.fanout[topic].containsOrIncl(p): + libp2p_gossipsub_peers_per_topic_fanout.inc(labelValues = [topic]) if g.fanout[topic].len == GossipSubD: break @@ -100,16 +105,19 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if topic in g.fanout and g.fanout[topic].len > 0: id = sample(toSeq(g.fanout[topic])) g.fanout[topic].excl(id) + libp2p_gossipsub_peers_per_topic_fanout.dec(labelValues = [topic]) trace "got fanout peer", peer = id elif topic in g.gossipsub and g.gossipsub[topic].len > 0: id = sample(toSeq(g.gossipsub[topic])) g.gossipsub[topic].excl(id) + libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic]) trace "got gossipsub peer", peer = id else: trace "no more peers" break g.mesh[topic].incl(id) + libp2p_gossipsub_peers_per_topic_mesh.inc(labelValues = [topic]) if id in g.peers: let p = g.peers[id] # send a graft message to the peer @@ -122,6 +130,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "pruning peers", peers = g.mesh[topic].len let id = toSeq(g.mesh[topic])[rand(0..