From df674d586a89609c50256d19abdbf6df95d77a2d Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 24 Oct 2024 19:16:13 +0530 Subject: [PATCH] another set of review comment fix --- tests/pubsub/testgossipmembership.nim | 213 +++++++++++++------------- 1 file changed, 105 insertions(+), 108 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index d158c47b4..0a6ddf4e0 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -37,128 +37,118 @@ let DURATION_TIMEOUT = 500.milliseconds suite "GossipSub Topic Membership Tests": teardown: checkTrackers() - # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d - # Generalized setup function to initialize one or more topics - proc setupGossipSub( - topics: seq[string], numPeers: int - ): (TestGossipSub, seq[Connection]) = - let gossipSub = TestGossipSub.init(newStandardSwitch()) - var conns = newSeq[Connection]() - - for topic in topics: - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - - for i in 0 ..< numPeers: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) - - return (gossipSub, conns) - - # Wrapper function to initialize a single topic by converting it into a seq - proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) = - setupGossipSub(@[topic], numPeers) - - # Helper function to subscribe to topics - proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) = - for topic in topics: - gossipSub.subscribe( - topic, - proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard - , - ) - - # Helper function to unsubscribe to topics - proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) = - for topic in topics: - gossipSub.unsubscribeAll(topic) - - proc commonSubscribe( - nodes: seq[TestGossipSub], - topic: string, - handler: proc(topic: string, data: seq[byte]) {.async.}, - ) = - for node in nodes: - node.subscribe(topic, handler) - echo "Subscribed all nodes to the topic: ", topic - - proc commonUnsubscribe( - nodes: seq[TestGossipSub], - topic: string, - handler: proc(topic: string, data: seq[byte]) {.async.}, - ) = - for node in nodes: - node.unsubscribe(topic, handler) - echo "Unsubscribed all nodes from the topic: ", topic - # Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures asyncTest "handle SUBSCRIBE to the topic": - let topic = "test-topic" - let (gossipSub, conns) = setupGossipSub(topic, 5) + let + numberOfNodes = 5 + topic = "test-topic" + nodes = generateNodes(numberOfNodes, gossip = true) - subscribeToTopics(gossipSub, @[topic]) + await allFuturesThrowing(nodes.mapIt(it.switch.start())) - check gossipSub.topics.contains(topic) + await subscribeNodes(nodes) + for node in nodes: + node.subscribe(topic, voidTopicHandler) - check gossipSub.gossipsub[topic].len() == 5 + await sleepAsync(2 * DURATION_TIMEOUT) - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + for node in nodes: + let currentGossip = GossipSub(node) - # Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub + check currentGossip.topics.contains(topic) + check currentGossip.gossipsub[topic].len() == numberOfNodes - 1 + check currentGossip.mesh[topic].len() == numberOfNodes - 1 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + # Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from all relevant data structures asyncTest "handle UNSUBSCRIBE to the topic": - let topic = "test-topic" - let (gossipSub, conns) = setupGossipSub(topic, 5) + let + numberOfNodes = 5 + topic = "test-topic" + nodes = generateNodes(numberOfNodes, gossip = true) - subscribeToTopics(gossipSub, @[topic]) + await allFuturesThrowing(nodes.mapIt(it.switch.start())) - unsubscribeFromTopics(gossipSub, @[topic]) + await subscribeNodes(nodes) + for node in nodes: + node.subscribe(topic, voidTopicHandler) - check topic notin gossipSub.topics - check topic notin gossipSub.mesh - check topic in gossipSub.gossipsub + await sleepAsync(2 * DURATION_TIMEOUT) - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + for node in nodes: + node.unsubscribe(topic, voidTopicHandler) + + await sleepAsync(2 * DURATION_TIMEOUT) + + for node in nodes: + let currentGossip = GossipSub(node) + + check topic notin currentGossip.topics + + if topic in currentGossip.mesh: + check currentGossip.mesh[topic].len == 0 + else: + check topic notin currentGossip.mesh + + if topic in currentGossip.gossipsub: + check currentGossip.gossipsub[topic].len == 0 + else: + check topic notin currentGossip.gossipsub + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) # Test subscribing and unsubscribing multiple topics asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics": - let topics = ["topic1", "topic2", "topic3"].toSeq() - let (gossipSub, conns) = setupGossipSub(topics, 5) + let + numberOfNodes = 5 + topics = ["topic1", "topic2", "topic3"].toSeq() + nodes = generateNodes(numberOfNodes, gossip = true) - subscribeToTopics(gossipSub, topics) + await allFuturesThrowing(nodes.mapIt(it.switch.start())) - check gossipSub.topics.len == 3 - for topic in topics: - check gossipSub.gossipsub[topic].len() == 5 + await subscribeNodes(nodes) + for node in nodes: + for topic in topics: + node.subscribe(topic, voidTopicHandler) - unsubscribeFromTopics(gossipSub, topics) + await sleepAsync(2 * DURATION_TIMEOUT) - for topic in topics: - check topic notin gossipSub.topics - check topic notin gossipSub.mesh - check topic in gossipSub.gossipsub + for node in nodes: + let currentGossip = GossipSub(node) + check currentGossip.topics.len == topics.len + for topic in topics: + check currentGossip.gossipsub[topic].len == numberOfNodes - 1 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + for node in nodes: + for topic in topics: + node.unsubscribe(topic, voidTopicHandler) + + await sleepAsync(2 * DURATION_TIMEOUT) + + for node in nodes: + let currentGossip = GossipSub(node) + for topic in topics: + check topic notin currentGossip.topics + check topic notin currentGossip.mesh + check topic notin currentGossip.gossipsub + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) # Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters asyncTest "subscription limit test": - let topicCount = 15 - let gossipSubParams = 10 - let topicNames = toSeq(mapIt(0 .. topicCount - 1, "topic" & $it)) + let + topicCount = 15 + gossipSubParams = 10 + topicNames = toSeq(mapIt(0 .. topicCount - 1, "topic" & $it)) + numberOfNodes = 1 + nodes = generateNodes(numberOfNodes, gossip = true) - let (gossipSub, conns) = setupGossipSub(topicNames, 0) + await allFuturesThrowing(nodes.mapIt(it.switch.start())) + let gossipSub = GossipSub(nodes[0]) gossipSub.topicsHigh = gossipSubParams for topic in topicNames: @@ -172,25 +162,31 @@ suite "GossipSub Topic Membership Tests": else: check gossipSub.topics.len == gossipSub.topicsHigh - check gossipSub.topics.len <= gossipSub.topicsHigh check gossipSub.topics.len == gossipSub.topicsHigh - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) # Test for verifying peers joining a topic using `JOIN(topic)` asyncTest "handle JOIN topic and mesh is updated": - let topic = "test-join-topic" + let + topic = "test-join-topic" + numberOfNodes = 5 + nodes = generateNodes(numberOfNodes, gossip = true) - let (gossipSub, conns) = setupGossipSub(topic, 5) + await allFuturesThrowing(nodes.mapIt(it.switch.start())) - commonSubscribe(@[gossipSub], topic, voidTopicHandler) + await subscribeNodes(nodes) + for node in nodes: + node.subscribe(topic, voidTopicHandler) - check gossipSub.mesh[topic].len == 5 - check gossipSub.topics.contains(topic) + await sleepAsync(2 * DURATION_TIMEOUT) - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + for node in nodes: + let currentGossip = GossipSub(node) + check currentGossip.mesh[topic].len == numberOfNodes - 1 + check currentGossip.topics.contains(topic) + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) # Test the behavior when multiple peers join and leave a topic simultaneously. asyncTest "multiple peers join and leave topic simultaneously": @@ -209,9 +205,8 @@ suite "GossipSub Topic Membership Tests": for i in 0 ..< numberOfNodes: let currentGossip = GossipSub(nodes[i]) check currentGossip.gossipsub.hasKey(topic) - - for i in 0 ..< numberOfNodes: - let currentGossip = GossipSub(nodes[i]) + check currentGossip.mesh.hasKey(topic) + check currentGossip.topics.contains(topic) for x in 0 ..< numberOfNodes: for y in 0 ..< numberOfNodes: @@ -236,5 +231,7 @@ suite "GossipSub Topic Membership Tests": await sleepAsync(3 * DURATION_TIMEOUT) check firstNodeGossip.mesh.getOrDefault(topic).len == 3 + check firstNodeGossip.gossipsub[topic].len == 3 + check topic in firstNodeGossip.topics await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))