From ac04ca6e3137adcc6c3502c60a4b545681b103aa Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 11 Jun 2020 20:20:58 -0600 Subject: [PATCH] make sure keys exist and more metrics (#215) --- libp2p/protocols/pubsub/gossipsub.nim | 127 +++++++++++++++---------- libp2p/protocols/pubsub/pubsub.nim | 5 +- libp2p/protocols/pubsub/pubsubpeer.nim | 12 ++- 3 files changed, 88 insertions(+), 56 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 85c3fd4e8..995d95a03 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -78,16 +78,17 @@ proc replenishFanout(g: GossipSub, topic: string) {.async.} = if topic notin g.fanout: g.fanout[topic] = initHashSet[string]() - if g.fanout[topic].len < GossipSubDLo: - trace "replenishing fanout", peers = g.fanout[topic].len + if g.fanout.getOrDefault(topic).len < GossipSubDLo: + trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len if topic in g.gossipsub: - for p in g.gossipsub[topic]: + for p in g.gossipsub.getOrDefault(topic): if not g.fanout[topic].containsOrIncl(p): - libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic]) - if g.fanout[topic].len == GossipSubD: + if g.fanout.getOrDefault(topic).len == GossipSubD: break - trace "fanout replenished with peers", peers = g.fanout[topic].len + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: @@ -96,47 +97,68 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if topic notin g.mesh: g.mesh[topic] = initHashSet[string]() - if g.mesh[topic].len < GossipSubDlo: - trace "replenishing mesh" + if g.mesh.getOrDefault(topic).len < GossipSubDlo: + trace "replenishing mesh", topic # replenish the mesh if we're below GossipSubDlo - while g.mesh[topic].len < GossipSubD: - trace "gathering peers", peers = g.mesh[topic].len + while g.mesh.getOrDefault(topic).len < GossipSubD: + trace "gathering peers", peers = g.mesh.getOrDefault(topic).len + await sleepAsync(1.millis) # don't starve the event loop var id: string - if topic in g.fanout and g.fanout[topic].len > 0: - id = sample(toSeq(g.fanout[topic])) + if topic in g.fanout and g.fanout.getOrDefault(topic).len > 0: + trace "getting peer from fanout", topic, + peers = g.fanout.getOrDefault(topic).len + + id = sample(toSeq(g.fanout.getOrDefault(topic))) g.fanout[topic].excl(id) - libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[topic].len.int64, labelValues = [topic]) + + if id in g.fanout[topic]: + continue # we already have this peer in the mesh, try again + trace "got fanout peer", peer = id - elif topic in g.gossipsub and g.gossipsub[topic].len > 0: + elif topic in g.gossipsub and g.gossipsub.getOrDefault(topic).len > 0: + trace "getting peer from gossipsub", topic, + peers = g.gossipsub.getOrDefault(topic).len + id = sample(toSeq(g.gossipsub[topic])) g.gossipsub[topic].excl(id) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic]) + + if id in g.mesh[topic]: + continue # we already have this peer in the mesh, try again + trace "got gossipsub peer", peer = id else: trace "no more peers" break g.mesh[topic].incl(id) - libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[topic].len.int64, labelValues = [topic]) if id in g.peers: let p = g.peers[id] # send a graft message to the peer await p.sendGraft(@[topic]) # prune peers if we've gone over - if g.mesh[topic].len > GossipSubDhi: - trace "pruning mesh" - while g.mesh[topic].len > GossipSubD: + if g.mesh.getOrDefault(topic).len > GossipSubDhi: + trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len + while g.mesh.getOrDefault(topic).len > GossipSubD: trace "pruning peers", peers = g.mesh[topic].len let id = toSeq(g.mesh[topic])[rand(0.. val: dropping.add(topic) g.fanout.del(topic) + for topic in dropping: - g.lastFanoutPubSub.del(topic) + g.lastFanoutPubSub.del(topic) + + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = ## gossip iHave messages to peers let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) for topic in topics: - let mesh: HashSet[string] = - if topic in g.mesh: - g.mesh[topic] - else: - initHashSet[string]() - - let fanout: HashSet[string] = - if topic in g.fanout: - g.fanout[topic] - else: - initHashSet[string]() + let mesh: HashSet[string] = g.mesh.getOrDefault(topic) + let fanout: HashSet[string] = g.fanout.getOrDefault(topic) let gossipPeers = mesh + fanout let mids = g.mcache.window(topic) @@ -178,25 +195,27 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = continue while result.len < GossipSubD: - if not (g.gossipsub[topic].len > 0): + if g.gossipsub.getOrDefault(topic).len == 0: trace "no peers for topic, skipping", topicID = topic break - let id = toSeq(g.gossipsub[topic]).sample() + let id = toSeq(g.gossipsub.getOrDefault(topic)).sample() g.gossipsub[topic].excl(id) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic]) if id notin gossipPeers: if id notin result: result[id] = ControlMessage() result[id].ihave.add(ihave) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) + proc heartbeat(g: GossipSub) {.async.} = while true: try: await g.heartbeatLock.acquire() trace "running heartbeat" - for t in g.mesh.keys: + for t in g.topics.keys: await g.rebalanceMesh(t) await g.dropFanoutPeers() @@ -222,15 +241,21 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} = for t in g.gossipsub.keys: g.gossipsub[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[t].len.int64, labelValues = [t]) + + # mostly for metrics + await procCall PubSub(g).subscribeTopic(t, false, peer.id) for t in g.mesh.keys: g.mesh[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh[t].len.int64, labelValues = [t]) for t in g.fanout.keys: g.fanout[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_fanout.set(g.fanout[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout[t].len.int64, labelValues = [t]) method subscribeToPeer*(p: GossipSub, conn: Connection) {.async.} = @@ -250,12 +275,13 @@ method subscribeTopic*(g: GossipSub, trace "adding subscription for topic", peer = peerId, name = topic # subscribe remote peer to the topic g.gossipsub[topic].incl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic]) else: trace "removing subscription for topic", peer = peerId, name = topic # unsubscribe remote peer from the topic g.gossipsub[topic].excl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[topic].len.int64, labelValues = [topic]) + + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) if topic in g.topics: await g.rebalanceMesh(topic) @@ -271,13 +297,17 @@ proc handleGraft(g: GossipSub, if graft.topicID in g.topics: if g.mesh.len < GossipSubD: g.mesh[graft.topicID].incl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID]) else: g.gossipsub[graft.topicID].incl(peer.id) - libp2p_gossipsub_peers_per_topic_gossipsub.set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID]) else: respControl.prune.add(ControlPrune(topicID: graft.topicID)) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID]) + + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID]) + proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: trace "processing prune message", peer = peer.id, @@ -285,7 +315,8 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = if prune.topicID in g.mesh: g.mesh[prune.topicID].excl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh.set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID]) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID]) proc handleIHave(g: GossipSub, peer: PubSubPeer, @@ -406,7 +437,7 @@ method unsubscribe*(g: GossipSub, for pair in topics: let topic = pair.topic if topic in g.mesh: - let peers = g.mesh[topic] + let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) for id in peers: let p = g.peers[id] @@ -427,11 +458,11 @@ method publish*(g: GossipSub, if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh await g.rebalanceMesh(topic) - peers = g.mesh[topic] + peers = g.mesh.getOrDefault(topic) else: # send to fanout peers await g.replenishFanout(topic) if topic in g.fanout: - peers = g.fanout[topic] + peers = g.fanout.getOrDefault(topic) # set the fanout expiry time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ce1e33da3..0667af9df 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -187,6 +187,8 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: + # metrics + libp2p_pubsub_topics.dec() for i, h in p.topics[t.topic].handler: if h == t.handler: p.topics[t.topic].handler.del(i) @@ -194,9 +196,6 @@ method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub, topic: string, handler: TopicHandler): Future[void] {.base.} = - # metrics - libp2p_pubsub_topics.dec() - ## unsubscribe from a ``topic`` string p.unsubscribe(@[(topic, handler)]) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 4a6c9664f..b7401f65e 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -22,6 +22,10 @@ import rpc/[messages, message, protobuf], logScope: topics = "pubsubpeer" +declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id"]) +declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id"]) +declareCounter(libp2p_pubsub_skipped_messages, "number of skipped messages", labels = ["id"]) + type PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} @@ -41,9 +45,6 @@ type RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} -declareCounter(libp2p_pubsub_sent_messages, "number of messages sent") -declareCounter(libp2p_pubsub_received_messages, "number of messages received") - proc id*(p: PubSubPeer): string = p.peerInfo.id proc isConnected*(p: PubSubPeer): bool = @@ -77,6 +78,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = let digest = $(sha256.digest(data)) trace "read data from peer", peer = p.id, data = data.shortLog if digest in p.recvdRpcCache: + libp2p_pubsub_skipped_messages.inc(labelValues = [p.id]) trace "message already received, skipping", peer = p.id continue @@ -86,7 +88,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = p.recvObservers(msg) # metrics - libp2p_pubsub_received_messages.inc() + libp2p_pubsub_received_messages.inc(labelValues = [p.id]) await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) @@ -127,7 +129,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.sentRpcCache.put(digest) # metrics - libp2p_pubsub_sent_messages.inc() + libp2p_pubsub_sent_messages.inc(labelValues = [p.id]) except CatchableError as exc: trace "unable to send to remote", exc = exc.msg p.sendConn = nil