mirror of https://github.com/vacp2p/nim-libp2p.git
add metrics into chronosstream to identify peers agents (#458)
* add metrics into chronosstream to identify peers agents * avoid too many agent strings * use gauge instead of counter for stream metrics * filter identity on / * also track bytes traffic * fix identity tracking closeimpl call * add gossip rpc metrics * fix missing metrics inclusions * metrics fixes and additions * add a KnownLibP2PAgents strdefine * enforse toLowerAscii to agent names (metrics) * incoming rpc metrics * fix silly mistake in rpc metrics * fix agent metrics logic * libp2p_gossipsub_failed_publish metric * message ids metrics * libp2p_pubsub_broadcast_ihave metric improvement * refactor expensive gossip metrics * more detailed metrics * metrics improvements * remove generic metrics for `set` users * small fixes, add debug counters * fix counter and add missing subs metrics! * agent metrics behind -d:libp2p_agents_metrics * remove testing related code from this PR * small rebroadcast metric fix * fix small mistake * add some guide to the readme in order to use new metrics * add libp2p_gossipsub_peers_scores metric * add protobuf metrics to understand bytes traffic precisely * refactor gossipsub metrics * remove unused variable * add more metrics, refactor rebalance metrics * avoid bad metric concurrent states * use a stack structure for gossip mesh metrics * refine sub metrics * add received subs metrics fixes * measure handlers of known topics * sub/unsub counter * unsubscribeAll log unknown topics * expose a way to specify known topics at runtime
This commit is contained in:
parent
8e57746f3a
commit
b902c030a0
14
README.md
14
README.md
|
@ -151,12 +151,24 @@ Packages that exist in the original libp2p specs and are under active developmen
|
|||
|
||||
### Tips and tricks
|
||||
|
||||
- enable expensive metrics:
|
||||
#### enable expensive metrics:
|
||||
|
||||
```bash
|
||||
nim c -d:libp2p_expensive_metrics some_file.nim
|
||||
```
|
||||
|
||||
#### use identify metrics
|
||||
|
||||
```bash
|
||||
nim c -d:libp2p_agents_metrics -d:KnownLibP2PAgents=nimbus,lighthouse,prysm,teku some_file.nim
|
||||
```
|
||||
|
||||
### specify gossipsub specific topics to measure
|
||||
|
||||
```bash
|
||||
nim c -d:KnownLibP2PTopics=topic1,topic2,topic3 some_file.nim
|
||||
```
|
||||
|
||||
## Contribute
|
||||
The libp2p implementation in Nim is a work in progress. We welcome contributors to help out! Specifically, you can:
|
||||
- Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it.
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[tables, sets, options, sequtils, random, algorithm]
|
||||
import std/[tables, sets, options, sequtils, strutils, random, algorithm]
|
||||
import chronos, chronicles, metrics
|
||||
import ./pubsub,
|
||||
./floodsub,
|
||||
|
@ -166,21 +166,40 @@ type
|
|||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
|
||||
"gossipsub peers per topic in mesh",
|
||||
labels = ["topic"])
|
||||
MeshMetrics = object
|
||||
# scratch buffers for metrics
|
||||
otherPeersPerTopicMesh: int64
|
||||
otherPeersPerTopicFanout: int64
|
||||
otherPeersPerTopicGossipsub: int64
|
||||
underDlowTopics: int64
|
||||
underDoutTopics: int64
|
||||
underDhighAboveDlowTopics: int64
|
||||
noPeersTopics: int64
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
|
||||
"gossipsub peers per topic in fanout",
|
||||
labels = ["topic"])
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
|
||||
"gossipsub peers per topic in gossipsub",
|
||||
labels = ["topic"])
|
||||
# the following 3 metrics are updated only inside rebalanceMesh
|
||||
# this is the most reliable place and rebalance anyway happens every heartbeat
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
|
||||
"gossipsub peers per topic in mesh",
|
||||
labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
|
||||
"gossipsub peers per topic in fanout",
|
||||
labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
|
||||
"gossipsub peers per topic in gossipsub",
|
||||
labels = ["topic"])
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_mesh_sum, "pubsub peers in mesh table summed")
|
||||
declareGauge(libp2p_gossipsub_peers_gossipsub_sum, "pubsub peers in gossipsub table summed")
|
||||
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
|
||||
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
|
||||
when defined(libp2p_agents_metrics):
|
||||
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
|
||||
|
||||
declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
|
||||
declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
|
||||
declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
|
||||
declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available")
|
||||
|
||||
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
|
||||
|
||||
proc init*(_: type[GossipSubParams]): GossipSubParams =
|
||||
GossipSubParams(
|
||||
|
@ -375,10 +394,6 @@ proc replenishFanout(g: GossipSub, topic: string) =
|
|||
if g.fanout.peers(topic) == g.parameters.d:
|
||||
break
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
|
||||
|
||||
method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
||||
|
@ -397,7 +412,16 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent
|
|||
|
||||
procCall FloodSub(p).onPubSubPeerEvent(peer, event)
|
||||
|
||||
proc rebalanceMesh(g: GossipSub, topic: string) =
|
||||
proc commitMetrics(metrics: var MeshMetrics) =
|
||||
libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics)
|
||||
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
|
||||
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
|
||||
libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics)
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = ["other"])
|
||||
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
|
||||
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
|
||||
|
||||
proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
|
||||
logScope:
|
||||
topic
|
||||
mesh = g.mesh.peers(topic)
|
||||
|
@ -409,9 +433,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
|
|||
|
||||
var
|
||||
prunes, grafts: seq[PubSubPeer]
|
||||
npeers = g.mesh.peers(topic)
|
||||
|
||||
let npeers = g.mesh.peers(topic)
|
||||
if npeers < g.parameters.dLow:
|
||||
if not isNil(metrics):
|
||||
inc metrics[].underDlowTopics
|
||||
|
||||
trace "replenishing mesh", peers = npeers
|
||||
# replenish the mesh if we're below Dlo
|
||||
var candidates = toSeq(
|
||||
|
@ -437,16 +464,24 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
|
|||
candidates.setLen(min(candidates.len, g.parameters.d - npeers))
|
||||
|
||||
trace "grafting", grafting = candidates.len
|
||||
for peer in candidates:
|
||||
if g.mesh.addPeer(topic, peer):
|
||||
g.grafted(peer, topic)
|
||||
g.fanout.removePeer(topic, peer)
|
||||
grafts &= peer
|
||||
|
||||
if candidates.len == 0:
|
||||
if not isNil(metrics):
|
||||
inc metrics[].noPeersTopics
|
||||
else:
|
||||
for peer in candidates:
|
||||
if g.mesh.addPeer(topic, peer):
|
||||
g.grafted(peer, topic)
|
||||
g.fanout.removePeer(topic, peer)
|
||||
grafts &= peer
|
||||
|
||||
else:
|
||||
var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
|
||||
meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
|
||||
if meshPeers.len < g.parameters.dOut:
|
||||
if not isNil(metrics):
|
||||
inc metrics[].underDoutTopics
|
||||
|
||||
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
|
||||
|
||||
var candidates = toSeq(
|
||||
|
@ -482,7 +517,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
|
|||
grafts &= peer
|
||||
|
||||
|
||||
if g.mesh.peers(topic) > g.parameters.dHigh:
|
||||
# get again npeers after possible grafts
|
||||
npeers = g.mesh.peers(topic)
|
||||
if npeers > g.parameters.dHigh:
|
||||
if not isNil(metrics):
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
|
||||
else:
|
||||
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
|
||||
|
||||
# prune peers if we've gone over Dhi
|
||||
prunes = toSeq(g.mesh[topic])
|
||||
# avoid pruning peers we are currently grafting in this heartbeat
|
||||
|
@ -529,6 +572,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
|
|||
trace "pruning peer on rebalance", peer, score = peer.score
|
||||
g.pruned(peer, topic)
|
||||
g.mesh.removePeer(topic, peer)
|
||||
elif npeers > g.parameters.dLow and not isNil(metrics):
|
||||
inc metrics[].underDhighAboveDlowTopics
|
||||
|
||||
# opportunistic grafting, by spec mesh should not be empty...
|
||||
if g.mesh.peers(topic) > 1:
|
||||
|
@ -562,15 +607,18 @@ proc rebalanceMesh(g: GossipSub, topic: string) =
|
|||
grafts &= peer
|
||||
trace "opportunistic grafting", peer
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||
if not isNil(metrics):
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||
else:
|
||||
metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64
|
||||
metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64
|
||||
metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64
|
||||
|
||||
trace "mesh balanced"
|
||||
|
||||
|
@ -597,14 +645,12 @@ proc dropFanoutPeers(g: GossipSub) =
|
|||
g.lastFanoutPubSub.del(topic)
|
||||
trace "dropping fanout topic", topic
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
|
||||
## gossip iHave messages to peers
|
||||
##
|
||||
|
||||
libp2p_gossipsub_cache_window_size.set(0)
|
||||
|
||||
trace "getting gossip peers (iHave)"
|
||||
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
|
||||
for topic in topics:
|
||||
|
@ -617,17 +663,21 @@ proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.}
|
|||
continue
|
||||
|
||||
var midsSeq = toSeq(mids)
|
||||
|
||||
libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
|
||||
|
||||
# not in spec
|
||||
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
|
||||
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
|
||||
if midsSeq.len > IHaveMaxLength:
|
||||
shuffle(midsSeq)
|
||||
midsSeq.setLen(IHaveMaxLength)
|
||||
let ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
|
||||
|
||||
let mesh = g.mesh.getOrDefault(topic)
|
||||
let fanout = g.fanout.getOrDefault(topic)
|
||||
let gossipPeers = mesh + fanout
|
||||
let
|
||||
ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
|
||||
mesh = g.mesh.getOrDefault(topic)
|
||||
fanout = g.fanout.getOrDefault(topic)
|
||||
gossipPeers = mesh + fanout
|
||||
var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
|
||||
|
||||
allPeers.keepIf do (x: PubSubPeer) -> bool:
|
||||
|
@ -776,6 +826,27 @@ proc updateScores(g: GossipSub) = # avoid async
|
|||
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
|
||||
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
let agent =
|
||||
block:
|
||||
if peer.shortAgent.len > 0:
|
||||
peer.shortAgent
|
||||
else:
|
||||
let connections = peer.connections.filterIt(
|
||||
not isNil(it.peerInfo) and
|
||||
it.peerInfo.agentVersion.len > 0
|
||||
)
|
||||
if connections.len > 0:
|
||||
let shortAgent = connections[0].peerInfo.agentVersion.split("/")[0].toLowerAscii()
|
||||
if KnownLibP2PAgentsSeq.contains(shortAgent):
|
||||
peer.shortAgent = shortAgent
|
||||
else:
|
||||
peer.shortAgent = "unknown"
|
||||
peer.shortAgent
|
||||
else:
|
||||
"unknown"
|
||||
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
|
||||
|
||||
for peer in evicting:
|
||||
g.peerStats.del(peer)
|
||||
|
||||
|
@ -804,18 +875,14 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||
|
||||
g.updateScores()
|
||||
|
||||
var
|
||||
totalMeshPeers = 0
|
||||
totalGossipPeers = 0
|
||||
var meshMetrics = MeshMetrics()
|
||||
|
||||
for t in toSeq(g.topics.keys):
|
||||
# prune every negative score peer
|
||||
# do this before relance
|
||||
# in order to avoid grafted -> pruned in the same cycle
|
||||
let meshPeers = g.mesh.getOrDefault(t)
|
||||
let gossipPeers = g.gossipsub.getOrDefault(t)
|
||||
# this will be changed by rebalance but does not matter
|
||||
totalMeshPeers += meshPeers.len
|
||||
totalGossipPeers += g.gossipsub.peers(t)
|
||||
var prunes: seq[PubSubPeer]
|
||||
for peer in meshPeers:
|
||||
if peer.score < 0.0:
|
||||
|
@ -831,10 +898,11 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
|
||||
g.broadcast(prunes, prune)
|
||||
|
||||
g.rebalanceMesh(t)
|
||||
# pass by ptr in order to both signal we want to update metrics
|
||||
# and as well update the struct for each topic during this iteration
|
||||
g.rebalanceMesh(t, addr meshMetrics)
|
||||
|
||||
libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64)
|
||||
libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64)
|
||||
commitMetrics(meshMetrics)
|
||||
|
||||
g.dropFanoutPeers()
|
||||
|
||||
|
@ -844,10 +912,13 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||
|
||||
let peers = g.getGossipPeers()
|
||||
for peer, control in peers:
|
||||
g.peers.withValue(peer.peerId, pubsubPeer):
|
||||
g.send(
|
||||
pubsubPeer[],
|
||||
RPCMsg(control: some(control)))
|
||||
# only ihave from here
|
||||
for ihave in control.ihave:
|
||||
if g.knownTopics.contains(ihave.topicID):
|
||||
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
|
||||
g.send(peer, RPCMsg(control: some(control)))
|
||||
|
||||
g.mcache.shift() # shift the cache
|
||||
except CancelledError as exc:
|
||||
|
@ -882,27 +953,15 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
|
|||
# also try to remove from explicit table here
|
||||
g.explicit.removePeer(t, pubSubPeer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(t).int64, labelValues = [t])
|
||||
|
||||
for t in toSeq(g.mesh.keys):
|
||||
trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
|
||||
g.pruned(pubSubPeer, t)
|
||||
g.mesh.removePeer(t, pubSubPeer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(t).int64, labelValues = [t])
|
||||
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.fanout.removePeer(t, pubSubPeer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(t).int64, labelValues = [t])
|
||||
|
||||
g.peerStats.withValue(pubSubPeer.peerId, stats):
|
||||
g.peerStats.withValue(peer, stats):
|
||||
stats[].expire = Moment.now() + g.parameters.retainScore
|
||||
for topic, info in stats[].topicInfos.mpairs:
|
||||
info.firstMessageDeliveries = 0
|
||||
|
@ -943,16 +1002,6 @@ method subscribeTopic*(g: GossipSub,
|
|||
if peer.peerId in g.parameters.directPeers:
|
||||
g.explicit.removePeer(topic, peer)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
||||
|
||||
proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
|
||||
|
@ -1043,12 +1092,6 @@ proc handleGraft(g: GossipSub,
|
|||
trace "peer grafting topic we're not interested in", topic
|
||||
# gossip 1.1, we do not send a control message prune anymore
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||
for prune in prunes:
|
||||
trace "peer pruned topic", peer, topic = prune.topicID
|
||||
|
@ -1068,10 +1111,6 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
|||
# another option could be to implement signed peer records
|
||||
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
|
||||
|
||||
proc handleIHave(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
ihaves: seq[ControlIHave]): ControlIWant =
|
||||
|
@ -1219,10 +1258,14 @@ method rpcHandler*(g: GossipSub,
|
|||
|
||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||
# also have to be careful to only include validated messages
|
||||
g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
|
||||
trace "forwared message to peers", peers = toSendPeers.len,
|
||||
msgId = shortLog(msgId), peer
|
||||
libp2p_pubsub_messages_rebroadcasted.inc()
|
||||
let sendingTo = toSeq(toSendPeers)
|
||||
g.broadcast(sendingTo, RPCMsg(messages: @[msg]))
|
||||
trace "forwared message to peers", peers = sendingTo.len, msgId, peer
|
||||
for topic in msg.topicIDs:
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"])
|
||||
|
||||
if rpcMsg.control.isSome:
|
||||
let control = rpcMsg.control.get()
|
||||
|
@ -1235,6 +1278,20 @@ method rpcHandler*(g: GossipSub,
|
|||
|
||||
if respControl.graft.len > 0 or respControl.prune.len > 0 or
|
||||
respControl.ihave.len > 0 or messages.len > 0:
|
||||
# iwant and prunes from here, also messages
|
||||
|
||||
for smsg in messages:
|
||||
for topic in smsg.topicIDs:
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
|
||||
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
|
||||
for prune in respControl.prune:
|
||||
if g.knownTopics.contains(prune.topicID):
|
||||
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
|
||||
trace "sending control message", msg = shortLog(respControl), peer
|
||||
g.send(
|
||||
peer,
|
||||
|
@ -1249,7 +1306,8 @@ method subscribe*(g: GossipSub,
|
|||
if topic in g.fanout:
|
||||
g.fanout.del(topic)
|
||||
|
||||
g.rebalanceMesh(topic)
|
||||
# rebalance but don't update metrics here, we do that only in the heartbeat
|
||||
g.rebalanceMesh(topic, metrics = nil)
|
||||
|
||||
proc unsubscribe*(g: GossipSub, topic: string) =
|
||||
var
|
||||
|
@ -1341,6 +1399,8 @@ method publish*(g: GossipSub,
|
|||
|
||||
if peers.len == 0:
|
||||
debug "No peers for topic, skipping publish"
|
||||
# skipping topic as our metrics finds that heavy
|
||||
libp2p_gossipsub_failed_publish.inc()
|
||||
return 0
|
||||
|
||||
inc g.msgSeqno
|
||||
|
@ -1363,13 +1423,12 @@ method publish*(g: GossipSub,
|
|||
|
||||
g.mcache.put(msgId, msg)
|
||||
|
||||
g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
|
||||
when defined(libp2p_expensive_metrics):
|
||||
if peers.len > 0:
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
let peerSeq = toSeq(peers)
|
||||
g.broadcast(peerSeq, RPCMsg(messages: @[msg]))
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic])
|
||||
else:
|
||||
if peers.len > 0:
|
||||
libp2p_pubsub_messages_published.inc()
|
||||
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"])
|
||||
|
||||
trace "Published message to peers"
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[tables, sequtils, sets]
|
||||
import std/[tables, sequtils, sets, strutils]
|
||||
import chronos, chronicles, metrics
|
||||
import pubsubpeer,
|
||||
rpc/[message, messages],
|
||||
|
@ -16,7 +16,8 @@ import pubsubpeer,
|
|||
../../stream/connection,
|
||||
../../peerid,
|
||||
../../peerinfo,
|
||||
../../errors
|
||||
../../errors,
|
||||
../../utility
|
||||
|
||||
import metrics
|
||||
import stew/results
|
||||
|
@ -29,16 +30,42 @@ export protocol
|
|||
logScope:
|
||||
topics = "libp2p pubsub"
|
||||
|
||||
const
|
||||
KnownLibP2PTopics* {.strdefine.} = ""
|
||||
KnownLibP2PTopicsSeq* = KnownLibP2PTopics.toLowerAscii().split(",")
|
||||
|
||||
declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
|
||||
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
|
||||
declareCounter(libp2p_pubsub_subscriptions, "pubsub subscription operations")
|
||||
declareCounter(libp2p_pubsub_unsubscriptions, "pubsub unsubscription operations")
|
||||
declareGauge(libp2p_pubsub_topic_handlers, "pubsub subscribed topics handlers count", labels = ["topic"])
|
||||
|
||||
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
|
||||
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
|
||||
declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages")
|
||||
when defined(libp2p_expensive_metrics):
|
||||
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
|
||||
else:
|
||||
declarePublicCounter(libp2p_pubsub_messages_published, "published messages")
|
||||
declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages")
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages", labels = ["topic"])
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_subscriptions, "pubsub broadcast subscriptions", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_unsubscriptions, "pubsub broadcast unsubscriptions", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_messages, "pubsub broadcast messages", labels = ["topic"])
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_received_subscriptions, "pubsub received subscriptions", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_received_unsubscriptions, "pubsub received subscriptions", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_received_messages, "pubsub received messages", labels = ["topic"])
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_iwant, "pubsub broadcast iwant")
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_ihave, "pubsub broadcast ihave", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_graft, "pubsub broadcast graft", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_broadcast_prune, "pubsub broadcast prune", labels = ["topic"])
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_received_iwant, "pubsub broadcast iwant")
|
||||
|
||||
declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
|
||||
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])
|
||||
|
||||
type
|
||||
TopicHandler* = proc(topic: string,
|
||||
|
@ -76,6 +103,8 @@ type
|
|||
msgSeqno*: uint64
|
||||
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
|
||||
|
||||
knownTopics*: HashSet[string]
|
||||
|
||||
method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
|
||||
## handle peer disconnects
|
||||
##
|
||||
|
@ -98,6 +127,46 @@ proc broadcast*(
|
|||
msg: RPCMsg) = # raises: [Defect]
|
||||
## Attempt to send `msg` to the given peers
|
||||
|
||||
let npeers = sendPeers.len.int64
|
||||
for sub in msg.subscriptions:
|
||||
if sub.subscribe:
|
||||
if p.knownTopics.contains(sub.topic):
|
||||
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = [sub.topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = ["generic"])
|
||||
else:
|
||||
if p.knownTopics.contains(sub.topic):
|
||||
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = [sub.topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = ["generic"])
|
||||
|
||||
for smsg in msg.messages:
|
||||
for topic in smsg.topicIDs:
|
||||
if p.knownTopics.contains(topic):
|
||||
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = ["generic"])
|
||||
|
||||
if msg.control.isSome():
|
||||
libp2p_pubsub_broadcast_iwant.inc(npeers * msg.control.get().iwant.len.int64)
|
||||
|
||||
let control = msg.control.get()
|
||||
for ihave in control.ihave:
|
||||
if p.knownTopics.contains(ihave.topicID):
|
||||
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicID])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = ["generic"])
|
||||
for graft in control.graft:
|
||||
if p.knownTopics.contains(graft.topicID):
|
||||
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicID])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = ["generic"])
|
||||
for prune in control.prune:
|
||||
if p.knownTopics.contains(prune.topicID):
|
||||
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicID])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
|
||||
|
||||
trace "broadcasting messages to peers",
|
||||
peers = sendPeers.len, msg = shortLog(msg)
|
||||
for peer in sendPeers:
|
||||
|
@ -109,6 +178,17 @@ proc sendSubs*(p: PubSub,
|
|||
subscribe: bool) =
|
||||
## send subscriptions to remote peer
|
||||
p.send(peer, RPCMsg.withSubs(topics, subscribe))
|
||||
for topic in topics:
|
||||
if subscribe:
|
||||
if p.knownTopics.contains(topic):
|
||||
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_subscriptions.inc(labelValues = ["generic"])
|
||||
else:
|
||||
if p.knownTopics.contains(topic):
|
||||
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = ["generic"])
|
||||
|
||||
method subscribeTopic*(p: PubSub,
|
||||
topic: string,
|
||||
|
@ -126,6 +206,45 @@ method rpcHandler*(p: PubSub,
|
|||
trace "about to subscribe to topic", topicId = s.topic, peer
|
||||
p.subscribeTopic(s.topic, s.subscribe, peer)
|
||||
|
||||
for sub in rpcMsg.subscriptions:
|
||||
if sub.subscribe:
|
||||
if p.knownTopics.contains(sub.topic):
|
||||
libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic])
|
||||
else:
|
||||
libp2p_pubsub_received_subscriptions.inc(labelValues = ["generic"])
|
||||
else:
|
||||
if p.knownTopics.contains(sub.topic):
|
||||
libp2p_pubsub_received_unsubscriptions.inc(labelValues = [sub.topic])
|
||||
else:
|
||||
libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"])
|
||||
|
||||
for smsg in rpcMsg.messages:
|
||||
for topic in smsg.topicIDs:
|
||||
if p.knownTopics.contains(topic):
|
||||
libp2p_pubsub_received_messages.inc(labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_received_messages.inc(labelValues = ["generic"])
|
||||
|
||||
if rpcMsg.control.isSome():
|
||||
libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64)
|
||||
|
||||
let control = rpcMsg.control.get()
|
||||
for ihave in control.ihave:
|
||||
if p.knownTopics.contains(ihave.topicID):
|
||||
libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID])
|
||||
else:
|
||||
libp2p_pubsub_received_ihave.inc(labelValues = ["generic"])
|
||||
for graft in control.graft:
|
||||
if p.knownTopics.contains(graft.topicID):
|
||||
libp2p_pubsub_received_graft.inc(labelValues = [graft.topicID])
|
||||
else:
|
||||
libp2p_pubsub_received_graft.inc(labelValues = ["generic"])
|
||||
for prune in control.prune:
|
||||
if p.knownTopics.contains(prune.topicID):
|
||||
libp2p_pubsub_received_prune.inc(labelValues = [prune.topicID])
|
||||
else:
|
||||
libp2p_pubsub_received_prune.inc(labelValues = ["generic"])
|
||||
|
||||
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
|
||||
|
||||
method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} =
|
||||
|
@ -171,7 +290,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn
|
|||
|
||||
# gather all futures without yielding to scheduler
|
||||
var futs = p.topics[topic].handler.mapIt(it(topic, data))
|
||||
|
||||
|
||||
try:
|
||||
futs = await allFinished(futs)
|
||||
except CancelledError:
|
||||
|
@ -179,7 +298,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn
|
|||
for fut in futs:
|
||||
if not(fut.finished):
|
||||
fut.cancel()
|
||||
|
||||
|
||||
# check for errors in futures
|
||||
for fut in futs:
|
||||
if fut.failed:
|
||||
|
@ -230,6 +349,17 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
|
|||
let peer = p.getOrCreatePeer(peer, p.codecs)
|
||||
peer.outbound = true # flag as outbound
|
||||
|
||||
proc updateTopicMetrics(p: PubSub, topic: string) =
|
||||
# metrics
|
||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||
if p.knownTopics.contains(topic):
|
||||
libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic])
|
||||
else:
|
||||
libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"])
|
||||
for key, val in p.topics:
|
||||
if not p.knownTopics.contains(key):
|
||||
libp2p_pubsub_topic_handlers.inc(val.handler.len.int64, labelValues = ["other"])
|
||||
|
||||
method unsubscribe*(p: PubSub,
|
||||
topics: seq[TopicPair]) {.base.} =
|
||||
## unsubscribe from a list of ``topic`` strings
|
||||
|
@ -246,7 +376,9 @@ method unsubscribe*(p: PubSub,
|
|||
# no more handlers are left
|
||||
p.topics.del(ttopic)
|
||||
|
||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||
p.updateTopicMetrics(ttopic)
|
||||
|
||||
libp2p_pubsub_unsubscriptions.inc()
|
||||
|
||||
proc unsubscribe*(p: PubSub,
|
||||
topic: string,
|
||||
|
@ -256,8 +388,14 @@ proc unsubscribe*(p: PubSub,
|
|||
p.unsubscribe(@[(topic, handler)])
|
||||
|
||||
method unsubscribeAll*(p: PubSub, topic: string) {.base.} =
|
||||
p.topics.del(topic)
|
||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||
if topic notin p.topics:
|
||||
debug "unsubscribeAll called for an unknown topic", topic
|
||||
else:
|
||||
p.topics.del(topic)
|
||||
|
||||
p.updateTopicMetrics(topic)
|
||||
|
||||
libp2p_pubsub_unsubscriptions.inc()
|
||||
|
||||
method subscribe*(p: PubSub,
|
||||
topic: string,
|
||||
|
@ -279,8 +417,9 @@ method subscribe*(p: PubSub,
|
|||
for _, peer in p.peers:
|
||||
p.sendSubs(peer, @[topic], true)
|
||||
|
||||
# metrics
|
||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||
p.updateTopicMetrics(topic)
|
||||
|
||||
libp2p_pubsub_subscriptions.inc()
|
||||
|
||||
method publish*(p: PubSub,
|
||||
topic: string,
|
||||
|
@ -397,6 +536,8 @@ proc init*[PubParams: object | bool](
|
|||
switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
|
||||
switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
|
||||
|
||||
pubsub.knownTopics = KnownLibP2PTopicsSeq.toHashSet()
|
||||
|
||||
pubsub.initPubSub()
|
||||
|
||||
return pubsub
|
||||
|
|
|
@ -59,6 +59,9 @@ type
|
|||
outbound*: bool # if this is an outbound connection
|
||||
appScore*: float64 # application specific score
|
||||
behaviourPenalty*: float64 # the eventual penalty score
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
shortAgent*: string
|
||||
|
||||
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
|
||||
|
||||
|
|
|
@ -19,12 +19,21 @@ import messages,
|
|||
logScope:
|
||||
topics = "pubsubprotobuf"
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
import metrics
|
||||
|
||||
declareCounter(libp2p_pubsub_rpc_bytes_read, "pubsub rpc bytes read", labels = ["kind"])
|
||||
declareCounter(libp2p_pubsub_rpc_bytes_write, "pubsub rpc bytes write", labels = ["kind"])
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, graft: ControlGraft) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, graft.topicID)
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["graft"])
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, infoMsg: PeerInfoMsg) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, infoMsg.peerID)
|
||||
|
@ -41,6 +50,9 @@ proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) =
|
|||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["prune"])
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, ihave.topicID)
|
||||
|
@ -49,6 +61,9 @@ proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =
|
|||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["ihave"])
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
|
||||
var ipb = initProtoBuffer()
|
||||
for mid in iwant.messageIDs:
|
||||
|
@ -57,6 +72,9 @@ proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
|
|||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["iwant"])
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
|
||||
var ipb = initProtoBuffer()
|
||||
for ihave in control.ihave:
|
||||
|
@ -78,6 +96,9 @@ proc write*(pb: var ProtoBuffer, field: int, subs: SubOpts) =
|
|||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["subs"])
|
||||
|
||||
proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
|
||||
var pb = initProtoBuffer()
|
||||
if len(msg.fromPeer) > 0 and not anonymize:
|
||||
|
@ -92,6 +113,10 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
|
|||
if len(msg.key) > 0 and not anonymize:
|
||||
pb.write(6, msg.key)
|
||||
pb.finish()
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_write.inc(pb.getLen().int64, labelValues = ["message"])
|
||||
|
||||
pb.buffer
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
|
||||
|
@ -99,6 +124,9 @@ proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
|
|||
|
||||
proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {.
|
||||
inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["graft"])
|
||||
|
||||
trace "decodeGraft: decoding message"
|
||||
var control = ControlGraft()
|
||||
if ? pb.getField(1, control.topicId):
|
||||
|
@ -123,6 +151,9 @@ proc decodePeerInfoMsg*(pb: ProtoBuffer): ProtoResult[PeerInfoMsg] {.
|
|||
|
||||
proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.
|
||||
inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["prune"])
|
||||
|
||||
trace "decodePrune: decoding message"
|
||||
var control = ControlPrune()
|
||||
if ? pb.getField(1, control.topicId):
|
||||
|
@ -139,6 +170,9 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.
|
|||
|
||||
proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.
|
||||
inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["ihave"])
|
||||
|
||||
trace "decodeIHave: decoding message"
|
||||
var control = ControlIHave()
|
||||
if ? pb.getField(1, control.topicId):
|
||||
|
@ -152,6 +186,9 @@ proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.
|
|||
ok(control)
|
||||
|
||||
proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["iwant"])
|
||||
|
||||
trace "decodeIWant: decoding message"
|
||||
var control = ControlIWant()
|
||||
if ? pb.getRepeatedField(1, control.messageIDs):
|
||||
|
@ -192,6 +229,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
|
|||
ok(none[ControlMessage]())
|
||||
|
||||
proc decodeSubscription*(pb: ProtoBuffer): ProtoResult[SubOpts] {.inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["subs"])
|
||||
|
||||
trace "decodeSubscription: decoding message"
|
||||
var subflag: uint64
|
||||
var sub = SubOpts()
|
||||
|
@ -221,6 +261,9 @@ proc decodeSubscriptions*(pb: ProtoBuffer): ProtoResult[seq[SubOpts]] {.
|
|||
ok(subs)
|
||||
|
||||
proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["message"])
|
||||
|
||||
trace "decodeMessage: decoding message"
|
||||
var msg: Message
|
||||
if ? pb.getField(1, msg.fromPeer):
|
||||
|
|
|
@ -7,9 +7,10 @@
|
|||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[oids, strformat]
|
||||
import chronos, chronicles
|
||||
import std/[oids, strformat, strutils]
|
||||
import chronos, chronicles, metrics
|
||||
import connection
|
||||
import ../utility
|
||||
|
||||
logScope:
|
||||
topics = "libp2p chronosstream"
|
||||
|
@ -21,6 +22,14 @@ const
|
|||
type
|
||||
ChronosStream* = ref object of Connection
|
||||
client: StreamTransport
|
||||
when defined(libp2p_agents_metrics):
|
||||
tracked: bool
|
||||
shortAgent: string
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"])
|
||||
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
|
||||
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
|
||||
|
||||
func shortLog*(conn: ChronosStream): string =
|
||||
if conn.isNil: "ChronosStream(nil)"
|
||||
|
@ -65,6 +74,24 @@ template withExceptions(body: untyped) =
|
|||
# TODO https://github.com/status-im/nim-chronos/pull/99
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
proc trackPeerIdentity(s: ChronosStream) =
|
||||
if not s.tracked:
|
||||
if not isNil(s.peerInfo) and s.peerInfo.agentVersion.len > 0:
|
||||
# / seems a weak "standard" so for now it's reliable
|
||||
let shortAgent = s.peerInfo.agentVersion.split("/")[0].toLowerAscii()
|
||||
if KnownLibP2PAgentsSeq.contains(shortAgent):
|
||||
s.shortAgent = shortAgent
|
||||
else:
|
||||
s.shortAgent = "unknown"
|
||||
libp2p_peers_identity.inc(labelValues = [s.shortAgent])
|
||||
s.tracked = true
|
||||
|
||||
proc untrackPeerIdentity(s: ChronosStream) =
|
||||
if s.tracked:
|
||||
libp2p_peers_identity.dec(labelValues = [s.shortAgent])
|
||||
s.tracked = false
|
||||
|
||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
if s.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
@ -72,6 +99,10 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
|
|||
withExceptions:
|
||||
result = await s.client.readOnce(pbytes, nbytes)
|
||||
s.activity = true # reset activity flag
|
||||
when defined(libp2p_agents_metrics):
|
||||
s.trackPeerIdentity()
|
||||
if s.tracked:
|
||||
libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent])
|
||||
|
||||
method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
|
||||
if s.closed:
|
||||
|
@ -90,6 +121,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
|
|||
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
|
||||
|
||||
s.activity = true # reset activity flag
|
||||
when defined(libp2p_agents_metrics):
|
||||
s.trackPeerIdentity()
|
||||
if s.tracked:
|
||||
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
|
||||
|
||||
method closed*(s: ChronosStream): bool {.inline.} =
|
||||
result = s.client.closed
|
||||
|
@ -99,17 +134,20 @@ method atEof*(s: ChronosStream): bool {.inline.} =
|
|||
|
||||
method closeImpl*(s: ChronosStream) {.async.} =
|
||||
try:
|
||||
trace "Shutting down chronos stream", address = $s.client.remoteAddress(),
|
||||
s
|
||||
trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s
|
||||
|
||||
if not s.client.closed():
|
||||
await s.client.closeWait()
|
||||
|
||||
trace "Shutdown chronos stream", address = $s.client.remoteAddress(),
|
||||
s
|
||||
trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
|
||||
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "Error closing chronosstream", s, msg = exc.msg
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
# do this after closing!
|
||||
s.untrackPeerIdentity()
|
||||
|
||||
await procCall Connection(s).closeImpl()
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
## those terms.
|
||||
|
||||
import stew/byteutils
|
||||
import strutils
|
||||
|
||||
const
|
||||
ShortDumpMax = 12
|
||||
|
@ -35,3 +36,8 @@ func shortLog*(item: string): string =
|
|||
result &= item[0..<split]
|
||||
result &= "..."
|
||||
result &= item[(item.len - split)..item.high]
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
const
|
||||
KnownLibP2PAgents* {.strdefine.} = ""
|
||||
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.toLowerAscii().split(",")
|
||||
|
|
Loading…
Reference in New Issue