From 5e6974d07afd10a140ccb671bcb81ba02ed42761 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Fri, 8 Jan 2021 14:21:24 +0900 Subject: [PATCH] add metrics into chronosstream to identify peers agents (#458) * add metrics into chronosstream to identify peers agents * avoid too many agent strings * use gauge instead of counter for stream metrics * filter identity on / * also track bytes traffic * fix identity tracking closeimpl call * add gossip rpc metrics * fix missing metrics inclusions * metrics fixes and additions * add a KnownLibP2PAgents strdefine * enforse toLowerAscii to agent names (metrics) * incoming rpc metrics * fix silly mistake in rpc metrics * fix agent metrics logic * libp2p_gossipsub_failed_publish metric * message ids metrics * libp2p_pubsub_broadcast_ihave metric improvement * refactor expensive gossip metrics * more detailed metrics * metrics improvements * remove generic metrics for `set` users * small fixes, add debug counters * fix counter and add missing subs metrics! * agent metrics behind -d:libp2p_agents_metrics * remove testing related code from this PR * small rebroadcast metric fix * fix small mistake * add some guide to the readme in order to use new metrics * add libp2p_gossipsub_peers_scores metric * add protobuf metrics to understand bytes traffic precisely * refactor gossipsub metrics * remove unused variable * add more metrics, refactor rebalance metrics * avoid bad metric concurrent states * use a stack structure for gossip mesh metrics * refine sub metrics * add received subs metrics fixes * measure handlers of known topics * sub/unsub counter * unsubscribeAll log unknown topics * expose a way to specify known topics at runtime --- README.md | 14 +- libp2p/protocols/pubsub/gossipsub.nim | 257 ++++++++++++++--------- libp2p/protocols/pubsub/pubsub.nim | 169 +++++++++++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 3 + libp2p/protocols/pubsub/rpc/protobuf.nim | 43 ++++ libp2p/stream/chronosstream.nim | 50 ++++- libp2p/utility.nim | 6 + 7 files changed, 422 insertions(+), 120 deletions(-) diff --git a/README.md b/README.md index 16f7964e5..d1d9b146e 100644 --- a/README.md +++ b/README.md @@ -151,12 +151,24 @@ Packages that exist in the original libp2p specs and are under active developmen ### Tips and tricks -- enable expensive metrics: +#### enable expensive metrics: ```bash nim c -d:libp2p_expensive_metrics some_file.nim ``` +#### use identify metrics + +```bash +nim c -d:libp2p_agents_metrics -d:KnownLibP2PAgents=nimbus,lighthouse,prysm,teku some_file.nim +``` + +### specify gossipsub specific topics to measure + +```bash +nim c -d:KnownLibP2PTopics=topic1,topic2,topic3 some_file.nim +``` + ## Contribute The libp2p implementation in Nim is a work in progress. We welcome contributors to help out! Specifically, you can: - Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it. diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5f74879c7..daec486e0 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import std/[tables, sets, options, sequtils, random, algorithm] +import std/[tables, sets, options, sequtils, strutils, random, algorithm] import chronos, chronicles, metrics import ./pubsub, ./floodsub, @@ -166,21 +166,40 @@ type heartbeatEvents*: seq[AsyncEvent] -when defined(libp2p_expensive_metrics): - declareGauge(libp2p_gossipsub_peers_per_topic_mesh, - "gossipsub peers per topic in mesh", - labels = ["topic"]) + MeshMetrics = object + # scratch buffers for metrics + otherPeersPerTopicMesh: int64 + otherPeersPerTopicFanout: int64 + otherPeersPerTopicGossipsub: int64 + underDlowTopics: int64 + underDoutTopics: int64 + underDhighAboveDlowTopics: int64 + noPeersTopics: int64 - declareGauge(libp2p_gossipsub_peers_per_topic_fanout, - "gossipsub peers per topic in fanout", - labels = ["topic"]) - declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, - "gossipsub peers per topic in gossipsub", - labels = ["topic"]) +# the following 3 metrics are updated only inside rebalanceMesh +# this is the most reliable place and rebalance anyway happens every heartbeat +declareGauge(libp2p_gossipsub_peers_per_topic_mesh, + "gossipsub peers per topic in mesh", + labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_fanout, + "gossipsub peers per topic in fanout", + labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, + "gossipsub peers per topic in gossipsub", + labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_mesh_sum, "pubsub peers in mesh table summed") -declareGauge(libp2p_gossipsub_peers_gossipsub_sum, "pubsub peers in gossipsub table summed") +declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") +declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") +when defined(libp2p_agents_metrics): + declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) + +declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow") +declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout") +declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow") +declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available") + +declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"]) proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( @@ -375,10 +394,6 @@ proc replenishFanout(g: GossipSub, topic: string) = if g.fanout.peers(topic) == g.parameters.d: break - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - trace "fanout replenished with peers", peers = g.fanout.peers(topic) method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} = @@ -397,7 +412,16 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent procCall FloodSub(p).onPubSubPeerEvent(peer, event) -proc rebalanceMesh(g: GossipSub, topic: string) = +proc commitMetrics(metrics: var MeshMetrics) = + libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics) + libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) + libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics) + libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics) + libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = 
["other"]) + libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"]) + libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"]) + +proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) = logScope: topic mesh = g.mesh.peers(topic) @@ -409,9 +433,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) = var prunes, grafts: seq[PubSubPeer] + npeers = g.mesh.peers(topic) - let npeers = g.mesh.peers(topic) if npeers < g.parameters.dLow: + if not isNil(metrics): + inc metrics[].underDlowTopics + trace "replenishing mesh", peers = npeers # replenish the mesh if we're below Dlo var candidates = toSeq( @@ -437,16 +464,24 @@ proc rebalanceMesh(g: GossipSub, topic: string) = candidates.setLen(min(candidates.len, g.parameters.d - npeers)) trace "grafting", grafting = candidates.len - for peer in candidates: - if g.mesh.addPeer(topic, peer): - g.grafted(peer, topic) - g.fanout.removePeer(topic, peer) - grafts &= peer + + if candidates.len == 0: + if not isNil(metrics): + inc metrics[].noPeersTopics + else: + for peer in candidates: + if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) + g.fanout.removePeer(topic, peer) + grafts &= peer else: var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())) meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound if meshPeers.len < g.parameters.dOut: + if not isNil(metrics): + inc metrics[].underDoutTopics + trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) var candidates = toSeq( @@ -482,7 +517,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) = grafts &= peer - if g.mesh.peers(topic) > g.parameters.dHigh: + # get again npeers after possible grafts + npeers = g.mesh.peers(topic) + if npeers > g.parameters.dHigh: + if not isNil(metrics): + if g.knownTopics.contains(topic): + libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic]) + else: + libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"]) + # prune peers if we've gone over Dhi prunes = toSeq(g.mesh[topic]) # avoid pruning peers we are currently grafting in this heartbeat @@ -529,6 +572,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) = trace "pruning peer on rebalance", peer, score = peer.score g.pruned(peer, topic) g.mesh.removePeer(topic, peer) + elif npeers > g.parameters.dLow and not isNil(metrics): + inc metrics[].underDhighAboveDlowTopics # opportunistic grafting, by spec mesh should not be empty... 
if g.mesh.peers(topic) > 1: @@ -562,15 +607,18 @@ proc rebalanceMesh(g: GossipSub, topic: string) = grafts &= peer trace "opportunistic grafting", peer - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) - - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) + if not isNil(metrics): + if g.knownTopics.contains(topic): + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(topic).int64, labelValues = [topic]) + else: + metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64 + metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64 + metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64 trace "mesh balanced" @@ -597,14 +645,12 @@ proc dropFanoutPeers(g: GossipSub) = g.lastFanoutPubSub.del(topic) trace "dropping fanout topic", topic - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} = ## gossip iHave messages to peers ## + libp2p_gossipsub_cache_window_size.set(0) + trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) for topic in topics: @@ -617,17 +663,21 @@ proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} continue var midsSeq = toSeq(mids) + + libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64) + # not in spec # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582 if midsSeq.len > IHaveMaxLength: shuffle(midsSeq) midsSeq.setLen(IHaveMaxLength) - let ihave = ControlIHave(topicID: topic, messageIDs: midsSeq) - let mesh = g.mesh.getOrDefault(topic) - let fanout = g.fanout.getOrDefault(topic) - let gossipPeers = mesh + fanout + let + ihave = ControlIHave(topicID: topic, messageIDs: midsSeq) + mesh = g.mesh.getOrDefault(topic) + fanout = g.fanout.getOrDefault(topic) + gossipPeers = mesh + fanout var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) allPeers.keepIf do (x: PubSubPeer) -> bool: @@ -776,6 +826,27 @@ proc updateScores(g: GossipSub) = # avoid async assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted + when defined(libp2p_agents_metrics): + let agent = + block: + if peer.shortAgent.len > 0: + peer.shortAgent + else: + let connections = peer.connections.filterIt( + not isNil(it.peerInfo) and + it.peerInfo.agentVersion.len > 0 + ) + if connections.len > 0: + let shortAgent = connections[0].peerInfo.agentVersion.split("/")[0].toLowerAscii() + if KnownLibP2PAgentsSeq.contains(shortAgent): + peer.shortAgent = shortAgent + else: + peer.shortAgent = "unknown" + peer.shortAgent + else: + "unknown" + libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) + for peer in evicting: g.peerStats.del(peer) @@ -804,18 +875,14 @@ proc 
heartbeat(g: GossipSub) {.async.} = g.updateScores() - var - totalMeshPeers = 0 - totalGossipPeers = 0 + var meshMetrics = MeshMetrics() + for t in toSeq(g.topics.keys): # prune every negative score peer # do this before relance # in order to avoid grafted -> pruned in the same cycle let meshPeers = g.mesh.getOrDefault(t) let gossipPeers = g.gossipsub.getOrDefault(t) - # this will be changed by rebalance but does not matter - totalMeshPeers += meshPeers.len - totalGossipPeers += g.gossipsub.peers(t) var prunes: seq[PubSubPeer] for peer in meshPeers: if peer.score < 0.0: @@ -831,10 +898,11 @@ proc heartbeat(g: GossipSub) {.async.} = backoff: g.parameters.pruneBackoff.seconds.uint64)]))) g.broadcast(prunes, prune) - g.rebalanceMesh(t) + # pass by ptr in order to both signal we want to update metrics + # and as well update the struct for each topic during this iteration + g.rebalanceMesh(t, addr meshMetrics) - libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64) - libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64) + commitMetrics(meshMetrics) g.dropFanoutPeers() @@ -844,10 +912,13 @@ proc heartbeat(g: GossipSub) {.async.} = let peers = g.getGossipPeers() for peer, control in peers: - g.peers.withValue(peer.peerId, pubsubPeer): - g.send( - pubsubPeer[], - RPCMsg(control: some(control))) + # only ihave from here + for ihave in control.ihave: + if g.knownTopics.contains(ihave.topicID): + libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID]) + else: + libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"]) + g.send(peer, RPCMsg(control: some(control))) g.mcache.shift() # shift the cache except CancelledError as exc: @@ -882,27 +953,15 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = # also try to remove from explicit table here g.explicit.removePeer(t, pubSubPeer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(t).int64, labelValues = [t]) - for t in toSeq(g.mesh.keys): trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score g.pruned(pubSubPeer, t) g.mesh.removePeer(t, pubSubPeer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(t).int64, labelValues = [t]) - for t in toSeq(g.fanout.keys): g.fanout.removePeer(t, pubSubPeer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(t).int64, labelValues = [t]) - - g.peerStats.withValue(pubSubPeer.peerId, stats): + g.peerStats.withValue(peer, stats): stats[].expire = Moment.now() + g.parameters.retainScore for topic, info in stats[].topicInfos.mpairs: info.firstMessageDeliveries = 0 @@ -943,16 +1002,6 @@ method subscribeTopic*(g: GossipSub, if peer.peerId in g.parameters.directPeers: g.explicit.removePeer(topic, peer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) - trace "gossip peers", peers = g.gossipsub.peers(topic), topic proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = @@ -1043,12 +1092,6 @@ proc handleGraft(g: GossipSub, trace "peer grafting topic we're not interested in", topic # gossip 1.1, we do not send a control message prune anymore - when 
defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: trace "peer pruned topic", peer, topic = prune.topicID @@ -1068,10 +1111,6 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = # another option could be to implement signed peer records ## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) - proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = @@ -1219,10 +1258,14 @@ method rpcHandler*(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len, - msgId = shortLog(msgId), peer - libp2p_pubsub_messages_rebroadcasted.inc() + let sendingTo = toSeq(toSendPeers) + g.broadcast(sendingTo, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = sendingTo.len, msgId, peer + for topic in msg.topicIDs: + if g.knownTopics.contains(topic): + libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic]) + else: + libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"]) if rpcMsg.control.isSome: let control = rpcMsg.control.get() @@ -1235,6 +1278,20 @@ method rpcHandler*(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or messages.len > 0: + # iwant and prunes from here, also messages + + for smsg in messages: + for topic in smsg.topicIDs: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicID): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) trace "sending control message", msg = shortLog(respControl), peer g.send( peer, @@ -1249,7 +1306,8 @@ method subscribe*(g: GossipSub, if topic in g.fanout: g.fanout.del(topic) - g.rebalanceMesh(topic) + # rebalance but don't update metrics here, we do that only in the heartbeat + g.rebalanceMesh(topic, metrics = nil) proc unsubscribe*(g: GossipSub, topic: string) = var @@ -1341,6 +1399,8 @@ method publish*(g: GossipSub, if peers.len == 0: debug "No peers for topic, skipping publish" + # skipping topic as our metrics finds that heavy + libp2p_gossipsub_failed_publish.inc() return 0 inc g.msgSeqno @@ -1363,13 +1423,12 @@ method publish*(g: GossipSub, g.mcache.put(msgId, msg) - g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) - when defined(libp2p_expensive_metrics): - if peers.len > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) + let peerSeq = toSeq(peers) + g.broadcast(peerSeq, RPCMsg(messages: @[msg])) + if g.knownTopics.contains(topic): + libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic]) else: - if peers.len > 0: - 
libp2p_pubsub_messages_published.inc() + libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"]) trace "Published message to peers" diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 030b67990..740b6eef4 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[tables, sequtils, sets] +import std/[tables, sequtils, sets, strutils] import chronos, chronicles, metrics import pubsubpeer, rpc/[message, messages], @@ -16,7 +16,8 @@ import pubsubpeer, ../../stream/connection, ../../peerid, ../../peerinfo, - ../../errors + ../../errors, + ../../utility import metrics import stew/results @@ -29,16 +30,42 @@ export protocol logScope: topics = "libp2p pubsub" +const + KnownLibP2PTopics* {.strdefine.} = "" + KnownLibP2PTopicsSeq* = KnownLibP2PTopics.toLowerAscii().split(",") + declareGauge(libp2p_pubsub_peers, "pubsub peer instances") declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics") +declareCounter(libp2p_pubsub_subscriptions, "pubsub subscription operations") +declareCounter(libp2p_pubsub_unsubscriptions, "pubsub unsubscription operations") +declareGauge(libp2p_pubsub_topic_handlers, "pubsub subscribed topics handlers count", labels = ["topic"]) + declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages") declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages") -when defined(libp2p_expensive_metrics): - declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) -else: - declarePublicCounter(libp2p_pubsub_messages_published, "published messages") -declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages") + +declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages", labels = ["topic"]) + +declarePublicCounter(libp2p_pubsub_broadcast_subscriptions, "pubsub broadcast subscriptions", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_broadcast_unsubscriptions, "pubsub broadcast unsubscriptions", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_broadcast_messages, "pubsub broadcast messages", labels = ["topic"]) + +declarePublicCounter(libp2p_pubsub_received_subscriptions, "pubsub received subscriptions", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_received_unsubscriptions, "pubsub received subscriptions", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_received_messages, "pubsub received messages", labels = ["topic"]) + +declarePublicCounter(libp2p_pubsub_broadcast_iwant, "pubsub broadcast iwant") + +declarePublicCounter(libp2p_pubsub_broadcast_ihave, "pubsub broadcast ihave", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_broadcast_graft, "pubsub broadcast graft", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_broadcast_prune, "pubsub broadcast prune", labels = ["topic"]) + +declarePublicCounter(libp2p_pubsub_received_iwant, "pubsub broadcast iwant") + +declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"]) +declarePublicCounter(libp2p_pubsub_received_prune, 
"pubsub broadcast prune", labels = ["topic"]) type TopicHandler* = proc(topic: string, @@ -76,6 +103,8 @@ type msgSeqno*: uint64 anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send + knownTopics*: HashSet[string] + method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} = ## handle peer disconnects ## @@ -98,6 +127,46 @@ proc broadcast*( msg: RPCMsg) = # raises: [Defect] ## Attempt to send `msg` to the given peers + let npeers = sendPeers.len.int64 + for sub in msg.subscriptions: + if sub.subscribe: + if p.knownTopics.contains(sub.topic): + libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = [sub.topic]) + else: + libp2p_pubsub_broadcast_subscriptions.inc(npeers, labelValues = ["generic"]) + else: + if p.knownTopics.contains(sub.topic): + libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = [sub.topic]) + else: + libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = ["generic"]) + + for smsg in msg.messages: + for topic in smsg.topicIDs: + if p.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = ["generic"]) + + if msg.control.isSome(): + libp2p_pubsub_broadcast_iwant.inc(npeers * msg.control.get().iwant.len.int64) + + let control = msg.control.get() + for ihave in control.ihave: + if p.knownTopics.contains(ihave.topicID): + libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicID]) + else: + libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = ["generic"]) + for graft in control.graft: + if p.knownTopics.contains(graft.topicID): + libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicID]) + else: + libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = ["generic"]) + for prune in control.prune: + if p.knownTopics.contains(prune.topicID): + libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicID]) + else: + libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"]) + trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg) for peer in sendPeers: @@ -109,6 +178,17 @@ proc sendSubs*(p: PubSub, subscribe: bool) = ## send subscriptions to remote peer p.send(peer, RPCMsg.withSubs(topics, subscribe)) + for topic in topics: + if subscribe: + if p.knownTopics.contains(topic): + libp2p_pubsub_broadcast_subscriptions.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_subscriptions.inc(labelValues = ["generic"]) + else: + if p.knownTopics.contains(topic): + libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = ["generic"]) method subscribeTopic*(p: PubSub, topic: string, @@ -126,6 +206,45 @@ method rpcHandler*(p: PubSub, trace "about to subscribe to topic", topicId = s.topic, peer p.subscribeTopic(s.topic, s.subscribe, peer) + for sub in rpcMsg.subscriptions: + if sub.subscribe: + if p.knownTopics.contains(sub.topic): + libp2p_pubsub_received_subscriptions.inc(labelValues = [sub.topic]) + else: + libp2p_pubsub_received_subscriptions.inc(labelValues = ["generic"]) + else: + if p.knownTopics.contains(sub.topic): + libp2p_pubsub_received_unsubscriptions.inc(labelValues = [sub.topic]) + else: + libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"]) + + for smsg in rpcMsg.messages: + for topic in smsg.topicIDs: + if p.knownTopics.contains(topic): + libp2p_pubsub_received_messages.inc(labelValues = [topic]) + else: + 
libp2p_pubsub_received_messages.inc(labelValues = ["generic"]) + + if rpcMsg.control.isSome(): + libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64) + + let control = rpcMsg.control.get() + for ihave in control.ihave: + if p.knownTopics.contains(ihave.topicID): + libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID]) + else: + libp2p_pubsub_received_ihave.inc(labelValues = ["generic"]) + for graft in control.graft: + if p.knownTopics.contains(graft.topicID): + libp2p_pubsub_received_graft.inc(labelValues = [graft.topicID]) + else: + libp2p_pubsub_received_graft.inc(labelValues = ["generic"]) + for prune in control.prune: + if p.knownTopics.contains(prune.topicID): + libp2p_pubsub_received_prune.inc(labelValues = [prune.topicID]) + else: + libp2p_pubsub_received_prune.inc(labelValues = ["generic"]) + method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} = @@ -171,7 +290,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn # gather all futures without yielding to scheduler var futs = p.topics[topic].handler.mapIt(it(topic, data)) - + try: futs = await allFinished(futs) except CancelledError: @@ -179,7 +298,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn for fut in futs: if not(fut.finished): fut.cancel() - + # check for errors in futures for fut in futs: if fut.failed: @@ -230,6 +349,17 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = let peer = p.getOrCreatePeer(peer, p.codecs) peer.outbound = true # flag as outbound +proc updateTopicMetrics(p: PubSub, topic: string) = + # metrics + libp2p_pubsub_topics.set(p.topics.len.int64) + if p.knownTopics.contains(topic): + libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic]) + else: + libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"]) + for key, val in p.topics: + if not p.knownTopics.contains(key): + libp2p_pubsub_topic_handlers.inc(val.handler.len.int64, labelValues = ["other"]) + method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base.} = ## unsubscribe from a list of ``topic`` strings @@ -246,7 +376,9 @@ method unsubscribe*(p: PubSub, # no more handlers are left p.topics.del(ttopic) - libp2p_pubsub_topics.set(p.topics.len.int64) + p.updateTopicMetrics(ttopic) + + libp2p_pubsub_unsubscriptions.inc() proc unsubscribe*(p: PubSub, topic: string, @@ -256,8 +388,14 @@ proc unsubscribe*(p: PubSub, p.unsubscribe(@[(topic, handler)]) method unsubscribeAll*(p: PubSub, topic: string) {.base.} = - p.topics.del(topic) - libp2p_pubsub_topics.set(p.topics.len.int64) + if topic notin p.topics: + debug "unsubscribeAll called for an unknown topic", topic + else: + p.topics.del(topic) + + p.updateTopicMetrics(topic) + + libp2p_pubsub_unsubscriptions.inc() method subscribe*(p: PubSub, topic: string, @@ -279,8 +417,9 @@ method subscribe*(p: PubSub, for _, peer in p.peers: p.sendSubs(peer, @[topic], true) - # metrics - libp2p_pubsub_topics.set(p.topics.len.int64) + p.updateTopicMetrics(topic) + + libp2p_pubsub_subscriptions.inc() method publish*(p: PubSub, topic: string, @@ -397,6 +536,8 @@ proc init*[PubParams: object | bool]( switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + pubsub.knownTopics = KnownLibP2PTopicsSeq.toHashSet() + pubsub.initPubSub() return pubsub diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim 
b/libp2p/protocols/pubsub/pubsubpeer.nim index 9c5fa5033..b1c925ebd 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -59,6 +59,9 @@ type outbound*: bool # if this is an outbound connection appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score + + when defined(libp2p_agents_metrics): + shortAgent*: string RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.} diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 4733d4d21..68794ea9a 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -19,12 +19,21 @@ import messages, logScope: topics = "pubsubprotobuf" +when defined(libp2p_protobuf_metrics): + import metrics + + declareCounter(libp2p_pubsub_rpc_bytes_read, "pubsub rpc bytes read", labels = ["kind"]) + declareCounter(libp2p_pubsub_rpc_bytes_write, "pubsub rpc bytes write", labels = ["kind"]) + proc write*(pb: var ProtoBuffer, field: int, graft: ControlGraft) = var ipb = initProtoBuffer() ipb.write(1, graft.topicID) ipb.finish() pb.write(field, ipb) + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["graft"]) + proc write*(pb: var ProtoBuffer, field: int, infoMsg: PeerInfoMsg) = var ipb = initProtoBuffer() ipb.write(1, infoMsg.peerID) @@ -41,6 +50,9 @@ proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) = ipb.finish() pb.write(field, ipb) + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["prune"]) + proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) = var ipb = initProtoBuffer() ipb.write(1, ihave.topicID) @@ -49,6 +61,9 @@ proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) = ipb.finish() pb.write(field, ipb) + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["ihave"]) + proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) = var ipb = initProtoBuffer() for mid in iwant.messageIDs: @@ -57,6 +72,9 @@ proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) = ipb.finish() pb.write(field, ipb) + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["iwant"]) + proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = var ipb = initProtoBuffer() for ihave in control.ihave: @@ -78,6 +96,9 @@ proc write*(pb: var ProtoBuffer, field: int, subs: SubOpts) = ipb.finish() pb.write(field, ipb) + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["subs"]) + proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] = var pb = initProtoBuffer() if len(msg.fromPeer) > 0 and not anonymize: @@ -92,6 +113,10 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] = if len(msg.key) > 0 and not anonymize: pb.write(6, msg.key) pb.finish() + + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_write.inc(pb.getLen().int64, labelValues = ["message"]) + pb.buffer proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) = @@ -99,6 +124,9 @@ proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) = proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {. 
inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["graft"]) + trace "decodeGraft: decoding message" var control = ControlGraft() if ? pb.getField(1, control.topicId): @@ -123,6 +151,9 @@ proc decodePeerInfoMsg*(pb: ProtoBuffer): ProtoResult[PeerInfoMsg] {. proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {. inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["prune"]) + trace "decodePrune: decoding message" var control = ControlPrune() if ? pb.getField(1, control.topicId): @@ -139,6 +170,9 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {. proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {. inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["ihave"]) + trace "decodeIHave: decoding message" var control = ControlIHave() if ? pb.getField(1, control.topicId): @@ -152,6 +186,9 @@ proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {. ok(control) proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["iwant"]) + trace "decodeIWant: decoding message" var control = ControlIWant() if ? pb.getRepeatedField(1, control.messageIDs): @@ -192,6 +229,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. ok(none[ControlMessage]()) proc decodeSubscription*(pb: ProtoBuffer): ProtoResult[SubOpts] {.inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["subs"]) + trace "decodeSubscription: decoding message" var subflag: uint64 var sub = SubOpts() @@ -221,6 +261,9 @@ proc decodeSubscriptions*(pb: ProtoBuffer): ProtoResult[seq[SubOpts]] {. ok(subs) proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} = + when defined(libp2p_protobuf_metrics): + libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["message"]) + trace "decodeMessage: decoding message" var msg: Message if ? pb.getField(1, msg.fromPeer): diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 1daff0433..aef0936ed 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,9 +7,10 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
-import std/[oids, strformat] -import chronos, chronicles +import std/[oids, strformat, strutils] +import chronos, chronicles, metrics import connection +import ../utility logScope: topics = "libp2p chronosstream" @@ -21,6 +22,14 @@ const type ChronosStream* = ref object of Connection client: StreamTransport + when defined(libp2p_agents_metrics): + tracked: bool + shortAgent: string + +when defined(libp2p_agents_metrics): + declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"]) + declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"]) + declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"]) func shortLog*(conn: ChronosStream): string = if conn.isNil: "ChronosStream(nil)" @@ -65,6 +74,24 @@ template withExceptions(body: untyped) = # TODO https://github.com/status-im/nim-chronos/pull/99 raise newLPStreamEOFError() +when defined(libp2p_agents_metrics): + proc trackPeerIdentity(s: ChronosStream) = + if not s.tracked: + if not isNil(s.peerInfo) and s.peerInfo.agentVersion.len > 0: + # / seems a weak "standard" so for now it's reliable + let shortAgent = s.peerInfo.agentVersion.split("/")[0].toLowerAscii() + if KnownLibP2PAgentsSeq.contains(shortAgent): + s.shortAgent = shortAgent + else: + s.shortAgent = "unknown" + libp2p_peers_identity.inc(labelValues = [s.shortAgent]) + s.tracked = true + + proc untrackPeerIdentity(s: ChronosStream) = + if s.tracked: + libp2p_peers_identity.dec(labelValues = [s.shortAgent]) + s.tracked = false + method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = if s.atEof: raise newLPStreamEOFError() @@ -72,6 +99,10 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {. withExceptions: result = await s.client.readOnce(pbytes, nbytes) s.activity = true # reset activity flag + when defined(libp2p_agents_metrics): + s.trackPeerIdentity() + if s.tracked: + libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent]) method write*(s: ChronosStream, msg: seq[byte]) {.async.} = if s.closed: @@ -90,6 +121,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing") s.activity = true # reset activity flag + when defined(libp2p_agents_metrics): + s.trackPeerIdentity() + if s.tracked: + libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent]) method closed*(s: ChronosStream): bool {.inline.} = result = s.client.closed @@ -99,17 +134,20 @@ method atEof*(s: ChronosStream): bool {.inline.} = method closeImpl*(s: ChronosStream) {.async.} = try: - trace "Shutting down chronos stream", address = $s.client.remoteAddress(), - s + trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s + if not s.client.closed(): await s.client.closeWait() - trace "Shutdown chronos stream", address = $s.client.remoteAddress(), - s + trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s except CancelledError as exc: raise exc except CatchableError as exc: trace "Error closing chronosstream", s, msg = exc.msg + + when defined(libp2p_agents_metrics): + # do this after closing! + s.untrackPeerIdentity() await procCall Connection(s).closeImpl() diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 94307d139..9f3081a64 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -8,6 +8,7 @@ ## those terms. 
import stew/byteutils +import strutils const ShortDumpMax = 12 @@ -35,3 +36,8 @@ func shortLog*(item: string): string = result &= item[0..
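
The runtime counterpart to the `-d:KnownLibP2PTopics` compile-time define shown in the README hunk above is the exported `knownTopics` set this patch adds to `PubSub` (initialized from `KnownLibP2PTopicsSeq` in `init`). Below is a minimal sketch of populating it after construction; the helper proc and topic names are illustrative assumptions, not part of the patch.

```nim
import std/sets
import libp2p/protocols/pubsub/[pubsub, gossipsub]

# Editor's sketch (not from the patch): register application topics at runtime.
# Topics present in `knownTopics` get their own label on the per-topic gauges
# and counters declared in this patch; everything else is aggregated under the
# "other"/"generic" label. The topic names below are placeholders.
proc registerAppTopics(g: GossipSub) =
  g.knownTopics.incl("/my-app/blocks/1.0.0")
  g.knownTopics.incl("/my-app/attestations/1.0.0")
```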