add metrics into chronosstream to identify peers agents (#458)

* add metrics into chronosstream to identify peers agents

* avoid too many agent strings

* use gauge instead of counter for stream metrics

* filter identity on /

* also track bytes traffic

* fix identity tracking closeimpl call

* add gossip rpc metrics

* fix missing metrics inclusions

* metrics fixes and additions

* add a KnownLibP2PAgents strdefine

* enforse toLowerAscii to agent names (metrics)

* incoming rpc metrics

* fix silly mistake in rpc metrics

* fix agent metrics logic

* libp2p_gossipsub_failed_publish metric

* message ids metrics

* libp2p_pubsub_broadcast_ihave metric improvement

* refactor expensive gossip metrics

* more detailed metrics

* metrics improvements

* remove generic metrics for `set` users

* small fixes, add debug counters

* fix counter and add missing subs metrics!

* agent metrics behind -d:libp2p_agents_metrics

* remove testing related code from this PR

* small rebroadcast metric fix

* fix small mistake

* add some guide to the readme in order to use new metrics

* add libp2p_gossipsub_peers_scores metric

* add protobuf metrics to understand bytes traffic precisely

* refactor gossipsub metrics

* remove unused variable

* add more metrics, refactor rebalance metrics

* avoid bad metric concurrent states

* use a stack structure for gossip mesh metrics

* refine sub metrics

* add received subs metrics fixes

* measure handlers of known topics

* sub/unsub counter

* unsubscribeAll log unknown topics

* expose a way to specify known topics at runtime
This commit is contained in:
Giovanni Petrantoni 2021-01-08 14:21:24 +09:00
parent 5e79d3ab9c
commit 5e6974d07a
7 changed files with 422 additions and 120 deletions

View File

@ -151,12 +151,24 @@ Packages that exist in the original libp2p specs and are under active developmen
### Tips and tricks
- enable expensive metrics:
#### enable expensive metrics:
```bash
nim c -d:libp2p_expensive_metrics some_file.nim
```
#### use identify metrics
```bash
nim c -d:libp2p_agents_metrics -d:KnownLibP2PAgents=nimbus,lighthouse,prysm,teku some_file.nim
```
### specify gossipsub specific topics to measure
```bash
nim c -d:KnownLibP2PTopics=topic1,topic2,topic3 some_file.nim
```
## Contribute
The libp2p implementation in Nim is a work in progress. We welcome contributors to help out! Specifically, you can:
- Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it.

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[tables, sets, options, sequtils, random, algorithm]
import std/[tables, sets, options, sequtils, strutils, random, algorithm]
import chronos, chronicles, metrics
import ./pubsub,
./floodsub,
@ -166,21 +166,40 @@ type
heartbeatEvents*: seq[AsyncEvent]
when defined(libp2p_expensive_metrics):
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh",
labels = ["topic"])
MeshMetrics = object
# scratch buffers for metrics
otherPeersPerTopicMesh: int64
otherPeersPerTopicFanout: int64
otherPeersPerTopicGossipsub: int64
underDlowTopics: int64
underDoutTopics: int64
underDhighAboveDlowTopics: int64
noPeersTopics: int64
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
"gossipsub peers per topic in fanout",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub",
labels = ["topic"])
# the following 3 metrics are updated only inside rebalanceMesh
# this is the most reliable place and rebalance anyway happens every heartbeat
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
"gossipsub peers per topic in fanout",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_mesh_sum, "pubsub peers in mesh table summed")
declareGauge(libp2p_gossipsub_peers_gossipsub_sum, "pubsub peers in gossipsub table summed")
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
when defined(libp2p_agents_metrics):
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available")
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
@ -375,10 +394,6 @@ proc replenishFanout(g: GossipSub, topic: string) =
if g.fanout.peers(topic) == g.parameters.d:
break
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
@ -397,7 +412,16 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent
procCall FloodSub(p).onPubSubPeerEvent(peer, event)
proc rebalanceMesh(g: GossipSub, topic: string) =
proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics)
libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
logScope:
topic
mesh = g.mesh.peers(topic)
@ -409,9 +433,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
var
prunes, grafts: seq[PubSubPeer]
npeers = g.mesh.peers(topic)
let npeers = g.mesh.peers(topic)
if npeers < g.parameters.dLow:
if not isNil(metrics):
inc metrics[].underDlowTopics
trace "replenishing mesh", peers = npeers
# replenish the mesh if we're below Dlo
var candidates = toSeq(
@ -437,16 +464,24 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
candidates.setLen(min(candidates.len, g.parameters.d - npeers))
trace "grafting", grafting = candidates.len
for peer in candidates:
if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
grafts &= peer
if candidates.len == 0:
if not isNil(metrics):
inc metrics[].noPeersTopics
else:
for peer in candidates:
if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
grafts &= peer
else:
var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
if meshPeers.len < g.parameters.dOut:
if not isNil(metrics):
inc metrics[].underDoutTopics
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
var candidates = toSeq(
@ -482,7 +517,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
grafts &= peer
if g.mesh.peers(topic) > g.parameters.dHigh:
# get again npeers after possible grafts
npeers = g.mesh.peers(topic)
if npeers > g.parameters.dHigh:
if not isNil(metrics):
if g.knownTopics.contains(topic):
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
else:
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
# prune peers if we've gone over Dhi
prunes = toSeq(g.mesh[topic])
# avoid pruning peers we are currently grafting in this heartbeat
@ -529,6 +572,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
trace "pruning peer on rebalance", peer, score = peer.score
g.pruned(peer, topic)
g.mesh.removePeer(topic, peer)
elif npeers > g.parameters.dLow and not isNil(metrics):
inc metrics[].underDhighAboveDlowTopics
# opportunistic grafting, by spec mesh should not be empty...
if g.mesh.peers(topic) > 1:
@ -562,15 +607,18 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
grafts &= peer
trace "opportunistic grafting", peer
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
if not isNil(metrics):
if g.knownTopics.contains(topic):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
else:
metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64
metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64
metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64
trace "mesh balanced"
@ -597,14 +645,12 @@ proc dropFanoutPeers(g: GossipSub) =
g.lastFanoutPubSub.del(topic)
trace "dropping fanout topic", topic
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers
##
libp2p_gossipsub_cache_window_size.set(0)
trace "getting gossip peers (iHave)"
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
for topic in topics:
@ -617,17 +663,21 @@ proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
continue
var midsSeq = toSeq(mids)
libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
# not in spec
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
if midsSeq.len > IHaveMaxLength:
shuffle(midsSeq)
midsSeq.setLen(IHaveMaxLength)
let ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
let mesh = g.mesh.getOrDefault(topic)
let fanout = g.fanout.getOrDefault(topic)
let gossipPeers = mesh + fanout
let
ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
mesh = g.mesh.getOrDefault(topic)
fanout = g.fanout.getOrDefault(topic)
gossipPeers = mesh + fanout
var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
allPeers.keepIf do (x: PubSubPeer) -> bool:
@ -776,6 +826,27 @@ proc updateScores(g: GossipSub) = # avoid async
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
when defined(libp2p_agents_metrics):
let agent =
block:
if peer.shortAgent.len > 0:
peer.shortAgent
else:
let connections = peer.connections.filterIt(
not isNil(it.peerInfo) and
it.peerInfo.agentVersion.len > 0
)
if connections.len > 0:
let shortAgent = connections[0].peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent
else:
peer.shortAgent = "unknown"
peer.shortAgent
else:
"unknown"
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
for peer in evicting:
g.peerStats.del(peer)
@ -804,18 +875,14 @@ proc heartbeat(g: GossipSub) {.async.} =
g.updateScores()
var
totalMeshPeers = 0
totalGossipPeers = 0
var meshMetrics = MeshMetrics()
for t in toSeq(g.topics.keys):
# prune every negative score peer
# do this before relance
# in order to avoid grafted -> pruned in the same cycle
let meshPeers = g.mesh.getOrDefault(t)
let gossipPeers = g.gossipsub.getOrDefault(t)
# this will be changed by rebalance but does not matter
totalMeshPeers += meshPeers.len
totalGossipPeers += g.gossipsub.peers(t)
var prunes: seq[PubSubPeer]
for peer in meshPeers:
if peer.score < 0.0:
@ -831,10 +898,11 @@ proc heartbeat(g: GossipSub) {.async.} =
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.rebalanceMesh(t)
# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration
g.rebalanceMesh(t, addr meshMetrics)
libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64)
libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64)
commitMetrics(meshMetrics)
g.dropFanoutPeers()
@ -844,10 +912,13 @@ proc heartbeat(g: GossipSub) {.async.} =
let peers = g.getGossipPeers()
for peer, control in peers:
g.peers.withValue(peer.peerId, pubsubPeer):
g.send(
pubsubPeer[],
RPCMsg(control: some(control)))
# only ihave from here
for ihave in control.ihave:
if g.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.mcache.shift() # shift the cache
except CancelledError as exc:
@ -882,27 +953,15 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys):
trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
g.pruned(pubSubPeer, t)
g.mesh.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
g.peerStats.withValue(pubSubPeer.peerId, stats):
g.peerStats.withValue(peer, stats):
stats[].expire = Moment.now() + g.parameters.retainScore
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0
@ -943,16 +1002,6 @@ method subscribeTopic*(g: GossipSub,
if peer.peerId in g.parameters.directPeers:
g.explicit.removePeer(topic, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
@ -1043,12 +1092,6 @@ proc handleGraft(g: GossipSub,
trace "peer grafting topic we're not interested in", topic
# gossip 1.1, we do not send a control message prune anymore
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
trace "peer pruned topic", peer, topic = prune.topicID
@ -1068,10 +1111,6 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
# another option could be to implement signed peer records
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
proc handleIHave(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant =
@ -1219,10 +1258,14 @@ method rpcHandler*(g: GossipSub,
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len,
msgId = shortLog(msgId), peer
libp2p_pubsub_messages_rebroadcasted.inc()
let sendingTo = toSeq(toSendPeers)
g.broadcast(sendingTo, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = sendingTo.len, msgId, peer
for topic in msg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"])
if rpcMsg.control.isSome:
let control = rpcMsg.control.get()
@ -1235,6 +1278,20 @@ method rpcHandler*(g: GossipSub,
if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or messages.len > 0:
# iwant and prunes from here, also messages
for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
@ -1249,7 +1306,8 @@ method subscribe*(g: GossipSub,
if topic in g.fanout:
g.fanout.del(topic)
g.rebalanceMesh(topic)
# rebalance but don't update metrics here, we do that only in the heartbeat
g.rebalanceMesh(topic, metrics = nil)
proc unsubscribe*(g: GossipSub, topic: string) =
var
@ -1341,6 +1399,8 @@ method publish*(g: GossipSub,
if peers.len == 0:
debug "No peers for topic, skipping publish"
# skipping topic as our metrics finds that heavy
libp2p_gossipsub_failed_publish.inc()
return 0
inc g.msgSeqno
@ -1363,13 +1423,12 @@ method publish*(g: GossipSub,
g.mcache.put(msgId, msg)
g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
when defined(libp2p_expensive_metrics):
if peers.len > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
let peerSeq = toSeq(peers)
g.broadcast(peerSeq, RPCMsg(messages: @[msg]))
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic])
else:
if peers.len > 0:
libp2p_pubsub_messages_published.inc()
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"])
trace "Published message to peers"

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[tables, sequtils, sets]
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import pubsubpeer,
rpc/[message, messages],
@ -16,7 +16,8 @@ import pubsubpeer,
../../stream/connection,
../../peerid,
../../peerinfo,
../../errors
../../errors,
../../utility
import metrics
import stew/results
@ -29,16 +30,42 @@ export protocol
logScope:
topics = "libp2p pubsub"
const
KnownLibP2PTopics* {.strdefine.} = ""
KnownLibP2PTopicsSeq* = KnownLibP2PTopics.toLowerAscii().split(",")
declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_subscriptions, "pubsub subscription operations")
declareCounter(libp2p_pubsub_unsubscriptions, "pubsub unsubscription operations")
declareGauge(libp2p_pubsub_topic_handlers, "pubsub subscribed topics handlers count", labels = ["topic"])
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages")
when defined(libp2p_expensive_metrics):
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
else:
declarePublicCounter(libp2p_pubsub_messages_published, "published messages")
declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages")
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_subscriptions, "pubsub broadcast subscriptions", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_unsubscriptions, "pubsub broadcast unsubscriptions", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_messages, "pubsub broadcast messages", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_subscriptions, "pubsub received subscriptions", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_unsubscriptions, "pubsub received subscriptions", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_messages, "pubsub received messages", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_iwant, "pubsub broadcast iwant")
declarePublicCounter(libp2p_pubsub_broadcast_ihave, "pubsub broadcast ihave", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_broadcast_prune, "pubsub broadcast prune", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_iwant, "pubsub broadcast iwant")
declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])
type
TopicHandler* = proc(topic: string,
@ -76,6 +103,8 @@ type
msgSeqno*: uint64
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
knownTopics*: HashSet[string]
method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
## handle peer disconnects
##
@ -98,6 +127,46 @@ proc broadcast*(
msg: RPCMsg) = # raises: [Defect]
## Attempt to send `msg` to the given peers
let npeers = sendPeers.len.int64
for sub in msg.subscriptions:
if sub.subscribe:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = [sub.topic])
else:
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = ["generic"])
else:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = [sub.topic])
else:
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = ["generic"])
for smsg in msg.messages:
for topic in smsg.topicIDs:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = ["generic"])
if msg.control.isSome():
libp2p_pubsub_broadcast_iwant.inc(npeers * msg.control.get().iwant.len.int64)
let control = msg.control.get()
for ihave in control.ihave:
if p.knownTopics.contains(ihave.topicID):
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicID])
else:
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = ["generic"])
for graft in control.graft:
if p.knownTopics.contains(graft.topicID):
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicID])
else:
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = ["generic"])
for prune in control.prune:
if p.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
trace "broadcasting messages to peers",
peers = sendPeers.len, msg = shortLog(msg)
for peer in sendPeers:
@ -109,6 +178,17 @@ proc sendSubs*(p: PubSub,
subscribe: bool) =
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe))
for topic in topics:
if subscribe:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = ["generic"])
else:
if p.knownTopics.contains(topic):
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = ["generic"])
method subscribeTopic*(p: PubSub,
topic: string,
@ -126,6 +206,45 @@ method rpcHandler*(p: PubSub,
trace "about to subscribe to topic", topicId = s.topic, peer
p.subscribeTopic(s.topic, s.subscribe, peer)
for sub in rpcMsg.subscriptions:
if sub.subscribe:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
else:
libp2p_pubsub_received_subscriptions.inc(labelValues = ["generic"])
else:
if p.knownTopics.contains(sub.topic):
libp2p_pubsub_received_unsubscriptions.inc(labelValues = [sub.topic])
else:
libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"])
for smsg in rpcMsg.messages:
for topic in smsg.topicIDs:
if p.knownTopics.contains(topic):
libp2p_pubsub_received_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_received_messages.inc(labelValues = ["generic"])
if rpcMsg.control.isSome():
libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64)
let control = rpcMsg.control.get()
for ihave in control.ihave:
if p.knownTopics.contains(ihave.topicID):
libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID])
else:
libp2p_pubsub_received_ihave.inc(labelValues = ["generic"])
for graft in control.graft:
if p.knownTopics.contains(graft.topicID):
libp2p_pubsub_received_graft.inc(labelValues = [graft.topicID])
else:
libp2p_pubsub_received_graft.inc(labelValues = ["generic"])
for prune in control.prune:
if p.knownTopics.contains(prune.topicID):
libp2p_pubsub_received_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_received_prune.inc(labelValues = ["generic"])
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} =
@ -171,7 +290,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn
# gather all futures without yielding to scheduler
var futs = p.topics[topic].handler.mapIt(it(topic, data))
try:
futs = await allFinished(futs)
except CancelledError:
@ -179,7 +298,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn
for fut in futs:
if not(fut.finished):
fut.cancel()
# check for errors in futures
for fut in futs:
if fut.failed:
@ -230,6 +349,17 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
let peer = p.getOrCreatePeer(peer, p.codecs)
peer.outbound = true # flag as outbound
proc updateTopicMetrics(p: PubSub, topic: string) =
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
if p.knownTopics.contains(topic):
libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic])
else:
libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"])
for key, val in p.topics:
if not p.knownTopics.contains(key):
libp2p_pubsub_topic_handlers.inc(val.handler.len.int64, labelValues = ["other"])
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base.} =
## unsubscribe from a list of ``topic`` strings
@ -246,7 +376,9 @@ method unsubscribe*(p: PubSub,
# no more handlers are left
p.topics.del(ttopic)
libp2p_pubsub_topics.set(p.topics.len.int64)
p.updateTopicMetrics(ttopic)
libp2p_pubsub_unsubscriptions.inc()
proc unsubscribe*(p: PubSub,
topic: string,
@ -256,8 +388,14 @@ proc unsubscribe*(p: PubSub,
p.unsubscribe(@[(topic, handler)])
method unsubscribeAll*(p: PubSub, topic: string) {.base.} =
p.topics.del(topic)
libp2p_pubsub_topics.set(p.topics.len.int64)
if topic notin p.topics:
debug "unsubscribeAll called for an unknown topic", topic
else:
p.topics.del(topic)
p.updateTopicMetrics(topic)
libp2p_pubsub_unsubscriptions.inc()
method subscribe*(p: PubSub,
topic: string,
@ -279,8 +417,9 @@ method subscribe*(p: PubSub,
for _, peer in p.peers:
p.sendSubs(peer, @[topic], true)
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
p.updateTopicMetrics(topic)
libp2p_pubsub_subscriptions.inc()
method publish*(p: PubSub,
topic: string,
@ -397,6 +536,8 @@ proc init*[PubParams: object | bool](
switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
pubsub.knownTopics = KnownLibP2PTopicsSeq.toHashSet()
pubsub.initPubSub()
return pubsub

View File

@ -59,6 +59,9 @@ type
outbound*: bool # if this is an outbound connection
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
when defined(libp2p_agents_metrics):
shortAgent*: string
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}

View File

@ -19,12 +19,21 @@ import messages,
logScope:
topics = "pubsubprotobuf"
when defined(libp2p_protobuf_metrics):
import metrics
declareCounter(libp2p_pubsub_rpc_bytes_read, "pubsub rpc bytes read", labels = ["kind"])
declareCounter(libp2p_pubsub_rpc_bytes_write, "pubsub rpc bytes write", labels = ["kind"])
proc write*(pb: var ProtoBuffer, field: int, graft: ControlGraft) =
var ipb = initProtoBuffer()
ipb.write(1, graft.topicID)
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["graft"])
proc write*(pb: var ProtoBuffer, field: int, infoMsg: PeerInfoMsg) =
var ipb = initProtoBuffer()
ipb.write(1, infoMsg.peerID)
@ -41,6 +50,9 @@ proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) =
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["prune"])
proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =
var ipb = initProtoBuffer()
ipb.write(1, ihave.topicID)
@ -49,6 +61,9 @@ proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["ihave"])
proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
var ipb = initProtoBuffer()
for mid in iwant.messageIDs:
@ -57,6 +72,9 @@ proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["iwant"])
proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
var ipb = initProtoBuffer()
for ihave in control.ihave:
@ -78,6 +96,9 @@ proc write*(pb: var ProtoBuffer, field: int, subs: SubOpts) =
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["subs"])
proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
var pb = initProtoBuffer()
if len(msg.fromPeer) > 0 and not anonymize:
@ -92,6 +113,10 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
if len(msg.key) > 0 and not anonymize:
pb.write(6, msg.key)
pb.finish()
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(pb.getLen().int64, labelValues = ["message"])
pb.buffer
proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
@ -99,6 +124,9 @@ proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {.
inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["graft"])
trace "decodeGraft: decoding message"
var control = ControlGraft()
if ? pb.getField(1, control.topicId):
@ -123,6 +151,9 @@ proc decodePeerInfoMsg*(pb: ProtoBuffer): ProtoResult[PeerInfoMsg] {.
proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.
inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["prune"])
trace "decodePrune: decoding message"
var control = ControlPrune()
if ? pb.getField(1, control.topicId):
@ -139,6 +170,9 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.
proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.
inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["ihave"])
trace "decodeIHave: decoding message"
var control = ControlIHave()
if ? pb.getField(1, control.topicId):
@ -152,6 +186,9 @@ proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.
ok(control)
proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["iwant"])
trace "decodeIWant: decoding message"
var control = ControlIWant()
if ? pb.getRepeatedField(1, control.messageIDs):
@ -192,6 +229,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
ok(none[ControlMessage]())
proc decodeSubscription*(pb: ProtoBuffer): ProtoResult[SubOpts] {.inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["subs"])
trace "decodeSubscription: decoding message"
var subflag: uint64
var sub = SubOpts()
@ -221,6 +261,9 @@ proc decodeSubscriptions*(pb: ProtoBuffer): ProtoResult[seq[SubOpts]] {.
ok(subs)
proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["message"])
trace "decodeMessage: decoding message"
var msg: Message
if ? pb.getField(1, msg.fromPeer):

View File

@ -7,9 +7,10 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[oids, strformat]
import chronos, chronicles
import std/[oids, strformat, strutils]
import chronos, chronicles, metrics
import connection
import ../utility
logScope:
topics = "libp2p chronosstream"
@ -21,6 +22,14 @@ const
type
ChronosStream* = ref object of Connection
client: StreamTransport
when defined(libp2p_agents_metrics):
tracked: bool
shortAgent: string
when defined(libp2p_agents_metrics):
declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"])
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
func shortLog*(conn: ChronosStream): string =
if conn.isNil: "ChronosStream(nil)"
@ -65,6 +74,24 @@ template withExceptions(body: untyped) =
# TODO https://github.com/status-im/nim-chronos/pull/99
raise newLPStreamEOFError()
when defined(libp2p_agents_metrics):
proc trackPeerIdentity(s: ChronosStream) =
if not s.tracked:
if not isNil(s.peerInfo) and s.peerInfo.agentVersion.len > 0:
# / seems a weak "standard" so for now it's reliable
let shortAgent = s.peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
s.shortAgent = shortAgent
else:
s.shortAgent = "unknown"
libp2p_peers_identity.inc(labelValues = [s.shortAgent])
s.tracked = true
proc untrackPeerIdentity(s: ChronosStream) =
if s.tracked:
libp2p_peers_identity.dec(labelValues = [s.shortAgent])
s.tracked = false
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
if s.atEof:
raise newLPStreamEOFError()
@ -72,6 +99,10 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions:
result = await s.client.readOnce(pbytes, nbytes)
s.activity = true # reset activity flag
when defined(libp2p_agents_metrics):
s.trackPeerIdentity()
if s.tracked:
libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent])
method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.closed:
@ -90,6 +121,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
s.activity = true # reset activity flag
when defined(libp2p_agents_metrics):
s.trackPeerIdentity()
if s.tracked:
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
method closed*(s: ChronosStream): bool {.inline.} =
result = s.client.closed
@ -99,17 +134,20 @@ method atEof*(s: ChronosStream): bool {.inline.} =
method closeImpl*(s: ChronosStream) {.async.} =
try:
trace "Shutting down chronos stream", address = $s.client.remoteAddress(),
s
trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s
if not s.client.closed():
await s.client.closeWait()
trace "Shutdown chronos stream", address = $s.client.remoteAddress(),
s
trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Error closing chronosstream", s, msg = exc.msg
when defined(libp2p_agents_metrics):
# do this after closing!
s.untrackPeerIdentity()
await procCall Connection(s).closeImpl()

View File

@ -8,6 +8,7 @@
## those terms.
import stew/byteutils
import strutils
const
ShortDumpMax = 12
@ -35,3 +36,8 @@ func shortLog*(item: string): string =
result &= item[0..<split]
result &= "..."
result &= item[(item.len - split)..item.high]
when defined(libp2p_agents_metrics):
const
KnownLibP2PAgents* {.strdefine.} = ""
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.toLowerAscii().split(",")