diff --git a/tests/pubsub/testgossipinternal2.nim b/tests/pubsub/testgossipinternal2.nim new file mode 100644 index 000000000..11b0459fc --- /dev/null +++ b/tests/pubsub/testgossipinternal2.nim @@ -0,0 +1,161 @@ +# Nim-LibP2P +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.used.} + +import std/[options, deques, sequtils, enumerate, algorithm, sets] +import stew/byteutils +import ../../libp2p/builders +import ../../libp2p/errors +import ../../libp2p/crypto/crypto +import ../../libp2p/stream/bufferstream +import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] +import ../../libp2p/protocols/pubsub/rpc/[message, messages] +import ../../libp2p/switch +import ../../libp2p/muxers/muxer +import ../../libp2p/protocols/pubsub/rpc/protobuf +import utils +import chronos + +import ../helpers + +proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = + discard + +const MsgIdSuccess = "msg id gen success" + +suite "GossipSub internal2": + teardown: + checkTrackers() + + # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d + + # Simulate the `SUBSCRIBE` event and check proper handling in the mesh and gossipsub structures + asyncTest "handle SUBSCRIBE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + # Ensure topic is correctly initialized + let topic = "test-topic" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + + var conns = newSeq[Connection]() + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub + + # Subscribe to the topic + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + check gossipSub.topics.contains(topic) # Check if the topic is in topics + check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test will simulate an UNSUBSCRIBE event and check if the topic is removed from the relevant data structures but remains in gossipsub + asyncTest "handle UNSUBSCRIBE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + # Ensure topic is initialized properly in all relevant data structures + let topic = "test-topic" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + + var conns = newSeq[Connection]() + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) # Ensure peers are added to gossipsub for the topic + + # Subscribe to the topic first + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + # Now unsubscribe from the topic + gossipSub.PubSub.unsubscribeAll(topic) + + # Verify the topic is removed from relevant structures + check topic notin gossipSub.topics # The topic should not be in topics + check topic notin gossipSub.mesh # The topic should be removed from the mesh + check topic in gossipSub.gossipsub # The topic should remain in gossipsub (for fanout) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test ensures that multiple topics can be subscribed to and unsubscribed from, with proper initialization of the topic structures. + asyncTest "handle multiple SUBSCRIBE and UNSUBSCRIBE events": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topics = ["topic1", "topic2", "topic3"] + + var conns = newSeq[Connection]() + for topic in topics: + # Initialize all relevant structures before subscribing + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for each topic + + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + # Verify that all topics are added to the topics and gossipsub + check gossipSub.topics.len == 3 + for topic in topics: + check gossipSub.gossipsub[topic].len() >= 0 + + # Now unsubscribe from all topics + for topic in topics: + gossipSub.PubSub.unsubscribeAll(topic) + + # Ensure topics are removed from topics and mesh, but still present in gossipsub + for topic in topics: + check topic notin gossipSub.topics + check topic notin gossipSub.mesh + check topic in gossipSub.gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test ensures that the number of subscriptions does not exceed the limit set in the GossipSub parameters + asyncTest "subscription limit test": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions + + var conns = newSeq[Connection]() + for i in 0 .. gossipSub.topicsHigh + 5: + let topic = "topic" & $i + # Ensure all topics are properly initialized before subscribing + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + if gossipSub.topics.len < gossipSub.topicsHigh: + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + else: + # Prevent subscription beyond the limit and log the error + echo "Subscription limit reached for topic: ", topic + + # Ensure that the number of subscribed topics does not exceed the limit + check gossipSub.topics.len <= gossipSub.topicsHigh + check gossipSub.topics.len == gossipSub.topicsHigh + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 18582028a..ffc5c24e4 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -7,12 +7,6 @@ # This file may not be copied, modified, or distributed except according to # those terms. - ) - await GossipSub(nodes[1]).addDirectPeer( - nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs - ) - await GossipSub(nodes[2]).addDirectPeer( - nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs {.used.} import sequtils, options, tables, sets @@ -184,6 +178,12 @@ suite "GossipSub": await GossipSub(nodes[1]).addDirectPeer( nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs ) + await GossipSub(nodes[1]).addDirectPeer( + nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs + ) + await GossipSub(nodes[2]).addDirectPeer( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs + ) var handlerFut = newFuture[void]() proc handler(topic: string, data: seq[byte]) {.async.} =