diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 777b47978..78161deca 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -33,46 +33,6 @@ suite "GossipSub internal": teardown: checkTrackers() - asyncTest "subscribe/unsubscribeAll": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} = - discard - - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) - - # test via dynamic dispatch - gossipSub.PubSub.subscribe(topic, handler) - - check: - gossipSub.topics.contains(topic) - gossipSub.gossipsub[topic].len() > 0 - gossipSub.mesh[topic].len() > 0 - - # test via dynamic dispatch - gossipSub.PubSub.unsubscribeAll(topic) - - check: - topic notin gossipSub.topics # not in local topics - topic notin gossipSub.mesh # not in mesh - topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out) - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "topic params": let params = TopicParams.init() params.validateParameters().tryGet() diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 8835bf416..eeb7169cc 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -15,10 +15,9 @@ import ../../libp2p/builders import ../../libp2p/errors import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream -import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, peertable] -import ../../libp2p/protocols/pubsub/rpc/[message, messages, protobuf] import ../../libp2p/switch import ../../libp2p/muxers/muxer +import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos import unittest2 @@ -41,97 +40,36 @@ suite "GossipSub Topic Membership Tests": # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d - # Simulate the `SUBSCRIBE` event and check proper handling in the mesh and gossipsub structures - asyncTest "handle SUBSCRIBE event": + # Generalized setup function to initialize one or more topics + proc setupGossipSub( + topics: seq[string], numPeers: int + ): (TestGossipSub, seq[Connection]) = let gossipSub = TestGossipSub.init(newStandardSwitch()) - - # Ensure topic is correctly initialized - let topic = "test-topic" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - # Initialize gossipsub for the topic - var conns = newSeq[Connection]() - for i in 0 ..< 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub - # Subscribe to the topic - gossipSub.PubSub.subscribe( - topic, - proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard - , - ) - - check gossipSub.topics.contains(topic) # Check if the topic is in topics - check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - - # This test will simulate an UNSUBSCRIBE event and check if the topic is removed from the relevant data structures but remains in gossipsub - asyncTest "handle UNSUBSCRIBE event": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - # Ensure topic is initialized properly in all relevant data structures - let topic = "test-topic" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - # Initialize gossipsub for the topic - - var conns = newSeq[Connection]() - for i in 0 ..< 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) - # Ensure peers are added to gossipsub for the topic - - # Subscribe to the topic first - gossipSub.PubSub.subscribe( - topic, - proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard - , - ) - - # Now unsubscribe from the topic - gossipSub.PubSub.unsubscribeAll(topic) - - # Verify the topic is removed from relevant structures - check topic notin gossipSub.topics # The topic should not be in topics - check topic notin gossipSub.mesh # The topic should be removed from the mesh - check topic in gossipSub.gossipsub - # The topic should remain in gossipsub (for fanout) - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - - # This test ensures that multiple topics can be subscribed to and unsubscribed from, with proper initialization of the topic structures. - asyncTest "handle multiple SUBSCRIBE and UNSUBSCRIBE events": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - let topics = ["topic1", "topic2", "topic3"] - - var conns = newSeq[Connection]() for topic in topics: - # Initialize all relevant structures before subscribing gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.topicParams[topic] = TopicParams.init() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - # Initialize gossipsub for each topic + for i in 0 ..< numPeers: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + return (gossipSub, conns) + + # Wrapper function to initialize a single topic by converting it into a seq + proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) = + setupGossipSub(@[topic], numPeers) + + # Helper function to subscribe to topics + proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) = + for topic in topics: gossipSub.PubSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = @@ -139,14 +77,74 @@ suite "GossipSub Topic Membership Tests": , ) + # Helper function to unsubscribe to topics + proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) = + for topic in topics: + gossipSub.PubSub.unsubscribeAll(topic) + + # Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures + asyncTest "handle SUBSCRIBE to the topic": + let topic = "test-topic" + let (gossipSub, conns) = setupGossipSub(topic, 5) + + # Check if the topic is added to gossipsub and the peers list is not empty + check gossipSub.gossipsub[topic].len() > 0 + + # Subscribe to the topic + subscribeToTopics(gossipSub, @[topic]) + + # Check if the topic is present in the list of subscribed topics + check gossipSub.topics.contains(topic) + + # Close all peer connections and verify that they are properly cleaned up + await allFuturesThrowing(conns.mapIt(it.close())) + + # Stop the gossipSub switch and wait for it to stop completely + await gossipSub.switch.stop() + + # Verify that connections have been closed and cleaned up after shutdown + for peer in gossipSub.peers.values: + check peer.sendConn == nil or peer.sendConn.closed() + + # Ensure that the topic is removed from the mesh after stopping + check gossipSub.mesh[topic].len() == 0 + + # Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub + asyncTest "handle UNSUBSCRIBE to the topic": + let topic = "test-topic" + let (gossipSub, conns) = setupGossipSub(topic, 5) + + # Subscribe to the topic first + subscribeToTopics(gossipSub, @[topic]) + + # Now unsubscribe from the topic + unsubscribeFromTopics(gossipSub, @[topic]) + + # Verify the topic is removed from relevant structures + check topic notin gossipSub.topics + check topic notin gossipSub.mesh + check topic in gossipSub.gossipsub + + # The topic should remain in gossipsub (for fanout) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Test subscribing and unsubscribing multiple topics + asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics": + let topics = ["topic1", "topic2", "topic3"].toSeq() + let (gossipSub, conns) = setupGossipSub(topics, 5) + + # Subscribe to multiple topics + subscribeToTopics(gossipSub, topics) + # Verify that all topics are added to the topics and gossipsub check gossipSub.topics.len == 3 for topic in topics: check gossipSub.gossipsub[topic].len() >= 0 - # Now unsubscribe from all topics - for topic in topics: - gossipSub.PubSub.unsubscribeAll(topic) + # Unsubscribe from all topics + unsubscribeFromTopics(gossipSub, topics) # Ensure topics are removed from topics and mesh, but still present in gossipsub for topic in topics: @@ -157,10 +155,10 @@ suite "GossipSub Topic Membership Tests": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - # This test ensures that the number of subscriptions does not exceed the limit set in the GossipSub parameters + # Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters asyncTest "subscription limit test": let gossipSub = TestGossipSub.init(newStandardSwitch()) - gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions + gossipSub.topicsHigh = 10 var conns = newSeq[Connection]() for i in 0 .. gossipSub.topicsHigh + 5: @@ -189,76 +187,40 @@ suite "GossipSub Topic Membership Tests": await gossipSub.switch.stop() # Test for verifying peers joining a topic using `JOIN(topic)` - asyncTest "handle JOIN event": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - + asyncTest "handle JOIN topic and mesh is updated": let topic = "test-join-topic" - # Initialize relevant data structures - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + # Initialize the GossipSub system and simulate peer connections + let (gossipSub, conns) = setupGossipSub(topic, 5) - var conns = newSeq[Connection]() + # Simulate peer joining the topic + subscribeToTopics(gossipSub, @[topic]) - for i in 0 ..< 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) - - # Simulate the peer joining the topic - gossipSub.PubSub.subscribe( - topic, - proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard - , - ) - - check gossipSub.mesh[topic].len > 0 # Ensure the peer is added to the mesh - check gossipSub.topics.contains(topic) # Ensure the topic is in `topics` + # Check that peers are added to the mesh and the topic is tracked + check gossipSub.mesh[topic].len > 0 + check gossipSub.topics.contains(topic) + # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() # Test for verifying peers leaving a topic using `LEAVE(topic)` - asyncTest "handle LEAVE event": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - + asyncTest "handle LEAVE topic and mesh is updated": let topic = "test-leave-topic" - # Initialize relevant data structures - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - - var conns = newSeq[Connection]() - - for i in 0 ..< 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) + # Initialize the GossipSub system and simulate peer connections + let (gossipSub, conns) = setupGossipSub(topic, 5) # Simulate peer joining the topic first - gossipSub.PubSub.subscribe( - topic, - proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard - , - ) + subscribeToTopics(gossipSub, @[topic]) # Now simulate peer leaving the topic - gossipSub.PubSub.unsubscribeAll(topic) + unsubscribeFromTopics(gossipSub, @[topic]) - check topic notin gossipSub.mesh # Ensure the peer is removed from the mesh - check topic in gossipSub.gossipsub # Ensure the topic remains in `gossipsub` + # Check that peers are removed from the mesh but the topic remains in gossipsub + check topic notin gossipSub.mesh + check topic in gossipSub.gossipsub + # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop()