Start adding some metrics to pubsub (#192)

* Start adding some metrics to pubsub

In order to visualize it's functionality
Still WIP

* more metrics

* add per topic metrics

* finishup with requested metrics

* add a metrisServer define to start local server

* PR fixes and cleanup
This commit is contained in:
Giovanni Petrantoni 2020-06-07 16:15:21 +09:00 committed by GitHub
parent 130c64f33a
commit a6a2a81711
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 65 additions and 4 deletions

View File

@ -8,7 +8,7 @@
## those terms.
import sequtils, tables, sets, strutils
import chronos, chronicles
import chronos, chronicles, metrics
import pubsub,
pubsubpeer,
timedcache,

View File

@ -8,7 +8,7 @@
## those terms.
import tables, sets, options, sequtils, random
import chronos, chronicles
import chronos, chronicles, metrics
import pubsub,
floodsub,
pubsubpeer,
@ -56,6 +56,10 @@ type
heartbeatCancel*: Future[void] # cancellation future for heartbeat interval
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"])
method init(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -79,6 +83,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} =
if topic in g.gossipsub:
for p in g.gossipsub[topic]:
if not g.fanout[topic].containsOrIncl(p):
libp2p_gossipsub_peers_per_topic_fanout.inc(labelValues = [topic])
if g.fanout[topic].len == GossipSubD:
break
@ -100,16 +105,19 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
if topic in g.fanout and g.fanout[topic].len > 0:
id = sample(toSeq(g.fanout[topic]))
g.fanout[topic].excl(id)
libp2p_gossipsub_peers_per_topic_fanout.dec(labelValues = [topic])
trace "got fanout peer", peer = id
elif topic in g.gossipsub and g.gossipsub[topic].len > 0:
id = sample(toSeq(g.gossipsub[topic]))
g.gossipsub[topic].excl(id)
libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
trace "got gossipsub peer", peer = id
else:
trace "no more peers"
break
g.mesh[topic].incl(id)
libp2p_gossipsub_peers_per_topic_mesh.inc(labelValues = [topic])
if id in g.peers:
let p = g.peers[id]
# send a graft message to the peer
@ -122,6 +130,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "pruning peers", peers = g.mesh[topic].len
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
g.mesh[topic].excl(id)
libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [topic])
let p = g.peers[id]
# send a graft message to the peer
@ -175,6 +184,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
let id = toSeq(g.gossipsub[topic]).sample()
g.gossipsub[topic].excl(id)
libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
@ -212,12 +222,15 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
for t in g.gossipsub.keys:
g.gossipsub[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [t])
for t in g.mesh.keys:
g.mesh[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [t])
for t in g.fanout.keys:
g.fanout[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_fanout.dec(labelValues = [t])
method subscribeToPeer*(p: GossipSub,
conn: Connection) {.async.} =
@ -237,10 +250,12 @@ method subscribeTopic*(g: GossipSub,
trace "adding subscription for topic", peer = peerId, name = topic
# subscribe remote peer to the topic
g.gossipsub[topic].incl(peerId)
libp2p_gossipsub_peers_per_topic_gossipsub.inc(labelValues = [topic])
else:
trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe remote peer from the topic
g.gossipsub[topic].excl(peerId)
libp2p_gossipsub_peers_per_topic_gossipsub.dec(labelValues = [topic])
if topic in g.topics:
await g.rebalanceMesh(topic)
@ -256,8 +271,10 @@ proc handleGraft(g: GossipSub,
if graft.topicID in g.topics:
if g.mesh.len < GossipSubD:
g.mesh[graft.topicID].incl(peer.id)
libp2p_gossipsub_peers_per_topic_mesh.inc(labelValues = [graft.topicID])
else:
g.gossipsub[graft.topicID].incl(peer.id)
libp2p_gossipsub_peers_per_topic_gossipsub.inc(labelValues = [graft.topicID])
else:
respControl.prune.add(ControlPrune(topicID: graft.topicID))
@ -268,6 +285,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
if prune.topicID in g.mesh:
g.mesh[prune.topicID].excl(peer.id)
libp2p_gossipsub_peers_per_topic_mesh.dec(labelValues = [prune.topicID])
proc handleIHave(g: GossipSub,
peer: PubSubPeer,

View File

@ -14,6 +14,7 @@ import pubsubpeer,
../protocol,
../../connection,
../../peerinfo
import metrics
export PubSubPeer
export PubSubObserver
@ -21,6 +22,12 @@ export PubSubObserver
logScope:
topic = "PubSub"
declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareGauge(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareGauge(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"])
type
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.}
@ -67,7 +74,10 @@ method subscribeTopic*(p: PubSub,
topic: string,
subscribe: bool,
peerId: string) {.base, async.} =
discard
if subscribe:
libp2p_pubsub_peers_per_topic.inc(labelValues = [topic])
else:
libp2p_pubsub_peers_per_topic.dec(labelValues = [topic])
method rpcHandler*(p: PubSub,
peer: PubSubPeer,
@ -86,6 +96,8 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
## handle peer disconnects
if peer.id in p.peers:
p.peers.del(peer.id)
# metrics
libp2p_pubsub_peers.dec()
proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
await p.cleanupLock.acquire()
@ -105,6 +117,8 @@ proc getPeer(p: PubSub,
# create new pubsub peer
let peer = newPubSubPeer(peerInfo, proto)
trace "created new pubsub peer", peerId = peer.id
# metrics
libp2p_pubsub_peers.inc()
p.peers[peer.id] = peer
peer.refs.inc # increment reference cound
@ -177,8 +191,11 @@ method unsubscribe*(p: PubSub,
method unsubscribe*(p: PubSub,
topic: string,
handler: TopicHandler): Future[void] {.base.} =
# metrics
libp2p_pubsub_topics.dec()
## unsubscribe from a ``topic`` string
result = p.unsubscribe(@[(topic, handler)])
p.unsubscribe(@[(topic, handler)])
method subscribe*(p: PubSub,
topic: string,
@ -200,6 +217,9 @@ method subscribe*(p: PubSub,
for peer in p.peers.values:
await p.sendSubs(peer, @[topic], true)
# metrics
libp2p_pubsub_topics.inc()
method publish*(p: PubSub,
topic: string,
data: seq[byte]) {.base, async.} =
@ -260,6 +280,10 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
let futs = await allFinished(pending)
result = futs.allIt(not it.failed and it.read())
if result:
libp2p_pubsub_validation_success.inc()
else:
libp2p_pubsub_validation_failure.inc()
proc newPubSub*(P: typedesc[PubSub],
peerInfo: PeerInfo,

View File

@ -18,6 +18,7 @@ import rpc/[messages, message, protobuf],
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
import metrics
logScope:
topic = "PubSubPeer"
@ -41,6 +42,9 @@ type
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
declareGauge(libp2p_pubsub_encoded_messages, "number of messages encoded")
declareGauge(libp2p_pubsub_decoded_messages, "number of messages decoded")
proc id*(p: PubSubPeer): string = p.peerInfo.id
proc isConnected*(p: PubSubPeer): bool =
@ -84,6 +88,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
await p.handler(p, @[msg])
p.recvdRpcCache.put(digest)
# metrics
libp2p_pubsub_decoded_messages.inc()
finally:
trace "exiting pubsub peer read loop", peer = p.id
await conn.close()
@ -99,6 +106,9 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
var mm = m # hooks can modify the message
p.sendObservers(mm)
# metrics
libp2p_pubsub_encoded_messages.inc()
let encoded = encodeRpcMsg(mm)
if encoded.buffer.len <= 0:
trace "empty message, skipping", peer = p.id

View File

@ -9,6 +9,7 @@
import options
import chronicles, stew/byteutils
import metrics
import nimcrypto/sysrand
import messages, protobuf,
../../../peer,
@ -21,6 +22,9 @@ logScope:
const PubSubPrefix = "libp2p-pubsub:"
declareGauge(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
declareGauge(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
proc msgIdProvider(m: Message): string =
## default msg id provider
crypto.toHex(m.seqno) & PeerID.init(m.fromPeer).pretty
@ -59,6 +63,11 @@ proc verify*(m: Message, p: PeerInfo): bool =
trace "verifying signature", remoteSignature = remote
result = remote.verify(PubSubPrefix.toBytes() & buff.buffer, key)
if result:
libp2p_pubsub_sig_verify_success.inc()
else:
libp2p_pubsub_sig_verify_failure.inc()
proc newMessage*(p: PeerInfo,
data: seq[byte],
topic: string,