From 44dd7d1c3c143f32174aeb125076f52f26a3b621 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Mon, 21 Oct 2024 16:35:17 +0530 Subject: [PATCH] added updated logic to use common handler and create peers nodes as real instead of simulations --- tests/pubsub/testgossipmembership.nim | 124 +++++++++++++++++++------- 1 file changed, 92 insertions(+), 32 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 315f0ea14..c42388df8 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -22,13 +22,17 @@ import ../../libp2p/muxers/muxer import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos - +import chronicles import ../helpers proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = discard +proc voidTopicHandler(topic: string, data: seq[byte]) {.async.} = + discard + const MsgIdSuccess = "msg id gen success" +let DURATION_TIMEOUT = 500.milliseconds suite "GossipSub Topic Membership Tests": teardown: @@ -78,6 +82,26 @@ suite "GossipSub Topic Membership Tests": for topic in topics: gossipSub.unsubscribeAll(topic) + proc commonSubscribe( + nodes: seq[TestGossipSub], + topic: string, + handler: proc(topic: string, data: seq[byte]) {.async.}, + ) = + # Subscribe all nodes to the topic + for node in nodes: + node.subscribe(topic, handler) + echo "Subscribed all nodes to the topic: ", topic + + proc commonUnsubscribe( + nodes: seq[TestGossipSub], + topic: string, + handler: proc(topic: string, data: seq[byte]) {.async.}, + ) = + # Unsubscribe all nodes from the topic + for node in nodes: + node.unsubscribe(topic, handler) + echo "Unsubscribed all nodes from the topic: ", topic + # Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures asyncTest "handle SUBSCRIBE to the topic": let topic = "test-topic" @@ -115,7 +139,6 @@ suite "GossipSub Topic Membership Tests": check topic in gossipSub.gossipsub # The topic should remain in gossipsub - await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -182,8 +205,8 @@ suite "GossipSub Topic Membership Tests": # Initialize the GossipSub system and simulate peer connections let (gossipSub, conns) = setupGossipSub(topic, 5) - # Simulate peer joining the topic - subscribeToTopics(gossipSub, @[topic]) + # Simulate peer joining the topic using commonSubscribe + commonSubscribe(@[gossipSub], topic, voidTopicHandler) # Check that peers are added to the mesh and the topic is tracked check gossipSub.mesh[topic].len == 5 @@ -199,11 +222,11 @@ suite "GossipSub Topic Membership Tests": # Initialize the GossipSub system and simulate peer connections let (gossipSub, conns) = setupGossipSub(topic, 5) - # Simulate peer joining the topic first - subscribeToTopics(gossipSub, @[topic]) + # Simulate peer joining the topic using commonSubscribe + commonSubscribe(@[gossipSub], topic, voidTopicHandler) - # Now simulate peer leaving the topic - unsubscribeFromTopics(gossipSub, @[topic]) + # Now simulate peer leaving the topic using commonUnsubscribe + commonUnsubscribe(@[gossipSub], topic, voidTopicHandler) # Check that peers are removed from the mesh but the topic remains in gossipsub check topic notin gossipSub.mesh @@ -214,39 +237,76 @@ suite "GossipSub Topic Membership Tests": # Test the behavior when multiple peers join and leave a topic simultaneously. asyncTest "multiple peers join and leave topic simultaneously": - let topic = "test-multi-join-leave" + let + numberOfNodes = 6 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) - # Initialize the GossipSub system and simulate peer connections for 6 peers - let (gossipSub, conns) = setupGossipSub(@[topic], 6) + # Subscribe all nodes to the topic + await subscribeNodes(nodes) + for node in nodes: + node.subscribe(topic, voidTopicHandler) - # Ensure the topic is correctly initialized in mesh and gossipsub - doAssert gossipSub.mesh.contains(topic), "Topic not found in mesh" - doAssert gossipSub.gossipsub.contains(topic), "Topic not found in gossipsub" + # Allow time for subscription propagation + await sleepAsync(2 * DURATION_TIMEOUT) - # Simulate 6 peers joining the topic - subscribeToTopics(gossipSub, @[topic]) + # Ensure each node is subscribed by checking the gossipsub field + for i in 0 ..< numberOfNodes: + let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip' + doAssert currentGossip.gossipsub.hasKey(topic), + "Node is not subscribed to the topic" - # Assert that 6 peers have joined the mesh - doAssert gossipSub.mesh[topic].len == 6, "Expected 6 peers to join the mesh" + # Print mesh status before connecting nodes + echo "Initial mesh size:" + for i in 0 ..< numberOfNodes: + let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip' + echo "Node ", i, " mesh size: ", currentGossip.mesh.getOrDefault(topic).len - # Define a simple handler for unsubscribing the peers - proc dummyHandler(topic: string, data: seq[byte]): Future[void] {.async.} = - discard + # Connect all nodes to each other and ensure subscription propagation + for x in 0 ..< numberOfNodes: + for y in 0 ..< numberOfNodes: + if x != y: + await waitSub(nodes[x], nodes[y], topic) - # Simulate 3 peers leaving the topic by unsubscribing them - var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2] + # Allow time for mesh stabilization + await sleepAsync(2 * DURATION_TIMEOUT) + + # Print mesh status after stabilization + echo "Mesh size after connecting:" + for i in 0 ..< numberOfNodes: + let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip' + echo "Node ", i, " mesh size: ", currentGossip.mesh.getOrDefault(topic).len + + # Expected number of peers in the mesh + let expectedNumberOfPeers = numberOfNodes - 1 + for i in 0 ..< numberOfNodes: + let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip' + check: + currentGossip.gossipsub[topic].len == expectedNumberOfPeers + currentGossip.mesh[topic].len == expectedNumberOfPeers + currentGossip.fanout.len == 0 + + # Simulate unsubscription of 3 peers + let firstNodeGossip = GossipSub(nodes[0]) # Initialize for the first node + let peersToUnsubscribe = firstNodeGossip.mesh[topic].toSeq()[0 .. 2] for peer in peersToUnsubscribe: + firstNodeGossip.unsubscribe(topic, voidTopicHandler) echo "Unsubscribing peer: ", peer.peerId - gossipSub.unsubscribe(topic, dummyHandler) - # Now assert that 6 peers still remain in the mesh because the mesh retains peers - doAssert gossipSub.mesh[topic].len == 6, - "Expected 6 peers to still be in mesh after unsubscription" + # Allow time for heartbeat to adjust the mesh + await sleepAsync(3 * DURATION_TIMEOUT) # Increased delay for mesh stabilization - # Assert that unsubscribed peers should remain in the mesh but should no longer receive messages + # Check mesh status again, expecting remaining peers + echo "Mesh size after unsubscription: ", + firstNodeGossip.mesh.getOrDefault(topic).len + doAssert firstNodeGossip.mesh.getOrDefault(topic).len == 3, + "Expected 3 peers to remain in the mesh" + + # Assert that unsubscribed peers are no longer in the mesh for peer in peersToUnsubscribe: - doAssert gossipSub.mesh[topic].contains(peer), - "Peer should still be in mesh even after unsubscription" + doAssert not firstNodeGossip.mesh[topic].contains(peer), + "Unsubscribed peer should not be in the mesh" - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + # Cleanup: stop all nodes + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))