From 4858e0ab1586c393af7eb51516ce3aed6b40ec85 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Sun, 20 Dec 2020 00:45:34 +0900 Subject: [PATCH] Gossipsub refactor pt2 (#495) * add sub/unsub test * remove unused variable from gossip --- libp2p/protocols/pubsub/gossipsub.nim | 10 ++----- tests/pubsub/testgossipinternal.nim | 42 +++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index fc078c617..e8431940c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -166,9 +166,6 @@ type heartbeatEvents*: seq[AsyncEvent] - when not defined(release): - prunedPeers: HashSet[PubSubPeer] - when defined(libp2p_expensive_metrics): declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", @@ -339,9 +336,6 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = g.peerStats.withValue(p.peerId, stats): - when not defined(release): - g.prunedPeers.incl(p) - if topic in stats.topicInfos: var info = stats.topicInfos[topic] let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) @@ -1270,12 +1264,12 @@ method unsubscribeAll*(g: GossipSub, topic: string) = # send to peers NOT in mesh first g.broadcast(toSeq(gpeers), msg) - g.mesh.del(topic) - for peer in mpeers: trace "pruning unsubscribeAll call peer", peer, score = peer.score g.pruned(peer, topic) + g.mesh.del(topic) + msg.control = some(ControlMessage(prune: @[ControlPrune(topicID: topic, diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 5805d72ed..273afaf0e 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -30,6 +30,48 @@ suite "GossipSub internal": teardown: checkTrackers() + asyncTest "subscribe/unsubscribeAll": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} = + discard + + let topic = "foobar" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + + var conns = newSeq[Connection]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + for i in 0..<15: + let conn = newBufferStream(noop) + conns &= conn + let peerInfo = randomPeerInfo() + conn.peerInfo = peerInfo + let peer = gossipSub.getPubSubPeer(peerInfo.peerId) + peer.sendConn = conn + gossipSub.onNewPeer(peer) + gossipSub.peers[peerInfo.peerId] = peer + gossipSub.gossipsub[topic].incl(peer) + + # test via dynamic dispatch + gossipSub.PubSub.subscribe(topic, handler) + + check: + gossipSub.topics.contains(topic) + gossipSub.gossipsub[topic].len() > 0 + gossipSub.mesh[topic].len() > 0 + + # test via dynamic dispatch + gossipSub.PubSub.unsubscribeAll(topic) + + check: + topic notin gossipSub.topics # not in local topics + topic notin gossipSub.mesh # not in mesh + topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + asyncTest "topic params": let params = TopicParams.init() params.validateParameters().tryGet()