From 0095b5e04a5456d6f9c207345b1bc8d8fda1b994 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 2 Apr 2024 09:11:56 +0300 Subject: [PATCH] sharding tests part 1 (#24) * sharding struct * add peers * running nodes tests * cleanup sharding steps * new sharding tests * test_different_cluster_different_shard * new tests * new static sharding tests * new auto sharding tests * change structure * running nodes tests * new autosharding tests * adjust tests * new sharding tests * sharding * sharding filter * add peers --- src/node/api_clients/rest.py | 20 +- src/node/waku_node.py | 15 ++ src/steps/filter.py | 4 + src/steps/relay.py | 9 +- src/steps/sharding.py | 249 ++++++++++++++++++ src/test_data.py | 53 ++++ tests/filter/test_get_messages.py | 1 - tests/relay/test_publish.py | 1 - tests/sharding/__init__.py | 0 tests/sharding/test_filter.py | 51 ++++ tests/sharding/test_multiple_nodes.py | 42 +++ tests/sharding/test_relay_auto_sharding.py | 130 +++++++++ tests/sharding/test_relay_static_sharding.py | 134 ++++++++++ .../test_running_nodes_auto_sharding.py | 113 ++++++++ .../test_running_nodes_static_sharding.py | 83 ++++++ 15 files changed, 899 insertions(+), 6 deletions(-) create mode 100644 src/steps/sharding.py create mode 100644 tests/sharding/__init__.py create mode 100644 tests/sharding/test_filter.py create mode 100644 tests/sharding/test_multiple_nodes.py create mode 100644 tests/sharding/test_relay_auto_sharding.py create mode 100644 tests/sharding/test_relay_static_sharding.py create mode 100644 tests/sharding/test_running_nodes_auto_sharding.py create mode 100644 tests/sharding/test_running_nodes_static_sharding.py diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index e13f96dd45..8cde7fba2b 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -20,22 +20,38 @@ class REST(BaseClient): return info_response.json() def get_peers(self): - info_response = self.rest_call("get", "admin/v1/peers") - return info_response.json() + get_peers_response = self.rest_call("get", "admin/v1/peers") + return get_peers_response.json() + + def add_peers(self, peers): + return self.rest_call("post", "admin/v1/peers", json.dumps(peers)) def set_relay_subscriptions(self, pubsub_topics): return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) + def set_relay_auto_subscriptions(self, content_topics): + return self.rest_call("post", "relay/v1/auto/subscriptions", json.dumps(content_topics)) + def delete_relay_subscriptions(self, pubsub_topics): return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics)) + def delete_relay_auto_subscriptions(self, content_topics): + return self.rest_call("delete", "relay/v1/auto/subscriptions", json.dumps(content_topics)) + def send_relay_message(self, message, pubsub_topic): return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) + def send_relay_auto_message(self, message): + return self.rest_call("post", "relay/v1/auto/messages", json.dumps(message)) + def get_relay_messages(self, pubsub_topic): get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") return get_messages_response.json() + def get_relay_auto_messages(self, content_topic): + get_messages_response = self.rest_call("get", f"relay/v1/auto/messages/{quote(content_topic, safe='')}") + return get_messages_response.json() + def set_filter_subscriptions(self, subscription): set_subscriptions_response = self.rest_call("post", "filter/v2/subscriptions", json.dumps(subscription)) return set_subscriptions_response.json() diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 39c97deea4..799e887232 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -212,18 +212,33 @@ class WakuNode: def get_peers(self): return self._api.get_peers() + def add_peers(self, peers): + return self._api.add_peers(peers) + def set_relay_subscriptions(self, pubsub_topics): return self._api.set_relay_subscriptions(pubsub_topics) + def set_relay_auto_subscriptions(self, content_topics): + return self._api.set_relay_auto_subscriptions(content_topics) + def delete_relay_subscriptions(self, pubsub_topics): return self._api.delete_relay_subscriptions(pubsub_topics) + def delete_relay_auto_subscriptions(self, content_topics): + return self._api.delete_relay_auto_subscriptions(content_topics) + def send_relay_message(self, message, pubsub_topic): return self._api.send_relay_message(message, pubsub_topic) + def send_relay_auto_message(self, message): + return self._api.send_relay_auto_message(message) + def get_relay_messages(self, pubsub_topic): return self._api.get_relay_messages(pubsub_topic) + def get_relay_auto_messages(self, content_topic): + return self._api.get_relay_auto_messages(content_topic) + def set_filter_subscriptions(self, subscription): return self._api.set_filter_subscriptions(subscription) diff --git a/src/steps/filter.py b/src/steps/filter.py index 39b45cd541..dbe7cfc57c 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -38,6 +38,8 @@ class StepsFilter: logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + if self.node2.is_nwaku(): + self.node2.add_peers([self.multiaddr_with_id]) self.main_nodes.append(self.node2) @pytest.fixture(scope="function") @@ -73,6 +75,8 @@ class StepsFilter: for index, node in enumerate(nodes): node = WakuNode(node, f"node{index + 3}_{self.test_id}") node.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + if node.is_nwaku(): + node.add_peers([self.multiaddr_with_id]) self.optional_nodes.append(node) @allure.step diff --git a/src/steps/relay.py b/src/steps/relay.py index ff441c3dec..79a79f3408 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,7 +1,5 @@ import inspect import os -from uuid import uuid4 - from src.libs.custom_logger import get_custom_logger from time import time import pytest @@ -40,8 +38,11 @@ class StepsRelay: self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") self.node1.start(relay="true", nodekey=NODEKEY) self.enr_uri = self.node1.get_enr_uri() + self.multiaddr_with_id = self.node1.get_multiaddr_with_id() self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri) + if self.node2.is_nwaku(): + self.node2.add_peers([self.multiaddr_with_id]) self.main_nodes.extend([self.node1, self.node2]) @pytest.fixture(scope="function") @@ -63,6 +64,8 @@ class StepsRelay: self.node2.start( relay="true", discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", rln_relay_membership_index="1" ) + if self.node2.is_nwaku(): + self.node2.add_peers([self.multiaddr_with_id]) self.main_nodes.extend([self.node1, self.node2]) @pytest.fixture(scope="function") @@ -75,6 +78,8 @@ class StepsRelay: for index, node in enumerate(nodes): node = WakuNode(node, f"node{index + 3}_{request.cls.test_id}") node.start(relay="true", discv5_bootstrap_node=self.enr_uri) + if node.is_nwaku(): + node.add_peers([self.multiaddr_with_id]) self.optional_nodes.append(node) @pytest.fixture(scope="function") diff --git a/src/steps/sharding.py b/src/steps/sharding.py new file mode 100644 index 0000000000..98a4e2f3e2 --- /dev/null +++ b/src/steps/sharding.py @@ -0,0 +1,249 @@ +import inspect +from uuid import uuid4 +from src.libs.custom_logger import get_custom_logger +from time import time +import pytest +import allure +from src.libs.common import to_base64, delay +from src.node.waku_message import WakuMessage +from src.env_vars import ( + DEFAULT_NWAKU, + NODE_1, + NODE_2, + ADDITIONAL_NODES, + NODEKEY, +) +from src.node.waku_node import WakuNode + +logger = get_custom_logger(__name__) + + +class StepsSharding: + test_content_topic = "/myapp/1/latest/proto" + test_pubsub_topic = "/waku/2/rs/2/0" + test_payload = "Sharding works!!" + auto_cluster = 2 + + @pytest.fixture(scope="function", autouse=True) + def sharding_setup(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.main_relay_nodes = [] + self.optional_relay_nodes = [] + self.main_filter_nodes = [] + self.optional_filter_nodes = [] + + @allure.step + def setup_first_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) + self.node1.start(relay="true", filter="true", nodekey=NODEKEY, **kwargs) + self.enr_uri = self.node1.get_enr_uri() + self.multiaddr_with_id = self.node1.get_multiaddr_with_id() + self.main_relay_nodes.extend([self.node1]) + + @allure.step + def setup_second_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) + self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) + if self.node2.is_nwaku(): + self.node2.add_peers([self.multiaddr_with_id]) + self.main_relay_nodes.extend([self.node2]) + + @allure.step + def setup_second_node_as_filter(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) + self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs) + if self.node2.is_nwaku(): + self.node2.add_peers([self.multiaddr_with_id]) + self.main_filter_nodes.extend([self.node2]) + + @allure.step + def setup_main_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + self.setup_first_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs) + self.setup_second_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs) + + @allure.step + def setup_optional_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + kwargs = self._resolve_sharding_flags(cluster_id=cluster_id, pubsub_topic=pubsub_topic, content_topic=content_topic, **kwargs) + if ADDITIONAL_NODES: + nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")] + else: + pytest.skip("ADDITIONAL_NODES is empty, cannot run test") + for index, node in enumerate(nodes): + node = WakuNode(node, f"node{index + 3}_{self.test_id}") + node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) + if node.is_nwaku(): + node.add_peers([self.multiaddr_with_id]) + self.optional_relay_nodes.append(node) + + @allure.step + def setup_nwaku_relay_nodes(self, num_nodes, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) + for index in range(num_nodes): + node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}") + node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) + node.add_peers([self.multiaddr_with_id]) + self.optional_relay_nodes.append(node) + + @allure.step + def subscribe_relay_node(self, node, content_topics, pubsub_topics): + if content_topics: + node.set_relay_auto_subscriptions(content_topics) + elif pubsub_topics: + node.set_relay_subscriptions(pubsub_topics) + else: + raise AttributeError("content_topics or pubsub_topics need to be passed") + + @allure.step + def subscribe_first_relay_node(self, content_topics=None, pubsub_topics=None): + self.subscribe_relay_node(self.node1, content_topics, pubsub_topics) + + @allure.step + def subscribe_second_relay_node(self, content_topics=None, pubsub_topics=None): + self.subscribe_relay_node(self.node2, content_topics, pubsub_topics) + + @allure.step + def subscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None): + for node in self.main_relay_nodes: + self.subscribe_relay_node(node, content_topics, pubsub_topics) + + @allure.step + def subscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None): + for node in self.optional_relay_nodes: + self.subscribe_relay_node(node, content_topics, pubsub_topics) + + @allure.step + def unsubscribe_relay_node(self, node, content_topics, pubsub_topics): + if content_topics: + node.delete_relay_auto_subscriptions(content_topics) + elif pubsub_topics: + node.delete_relay_subscriptions(pubsub_topics) + else: + raise AttributeError("content_topics or pubsub_topics need to be passed") + + @allure.step + def unsubscribe_first_relay_node(self, content_topics=None, pubsub_topics=None): + self.unsubscribe_relay_node(self.node1, content_topics, pubsub_topics) + + @allure.step + def unsubscribe_second_relay_node(self, content_topics=None, pubsub_topics=None): + self.unsubscribe_relay_node(self.node2, content_topics, pubsub_topics) + + @allure.step + def unsubscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None): + for node in self.main_relay_nodes: + self.unsubscribe_relay_node(node, content_topics, pubsub_topics) + + @allure.step + def unsubscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None): + for node in self.optional_relay_nodes: + self.unsubscribe_relay_node(node, content_topics, pubsub_topics) + + @allure.step + def subscribe_filter_node(self, node, content_topics=None, pubsub_topic=None): + subscription = {"requestId": str(uuid4()), "contentFilters": content_topics, "pubsubTopic": pubsub_topic} + node.set_filter_subscriptions(subscription) + + @allure.step + def relay_message(self, node, message, pubsub_topic=None): + if pubsub_topic: + node.send_relay_message(message, pubsub_topic) + else: + node.send_relay_auto_message(message) + + @allure.step + def retrieve_relay_message(self, node, content_topic=None, pubsub_topic=None): + if content_topic: + return node.get_relay_auto_messages(content_topic) + elif pubsub_topic: + return node.get_relay_messages(pubsub_topic) + else: + raise AttributeError("content_topic or pubsub_topic needs to be passed") + + @allure.step + def check_published_message_reaches_relay_peer(self, content_topic=None, pubsub_topic=None, sender=None, peer_list=None): + message = self.create_message(contentTopic=content_topic) if content_topic else self.create_message() + if not sender: + sender = self.node1 + if not peer_list: + peer_list = self.main_relay_nodes + self.optional_relay_nodes + + self.relay_message(sender, message, pubsub_topic) + delay(0.1) + for index, peer in enumerate(peer_list): + logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") + get_messages_response = self.retrieve_relay_message(peer, content_topic, pubsub_topic) + assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(message) + + @allure.step + def get_filter_messages(self, content_topic, pubsub_topic=None, node=None): + if node is None: + node = self.node2 + if node.is_gowaku(): + return node.get_filter_messages(content_topic, pubsub_topic) + elif node.is_nwaku(): + return node.get_filter_messages(content_topic) + else: + raise NotImplementedError("Not implemented for this node type") + + @allure.step + def check_published_message_reaches_filter_peer(self, content_topic=None, pubsub_topic=None, sender=None, peer_list=None): + message = self.create_message(contentTopic=content_topic) if content_topic else self.create_message() + if not sender: + sender = self.node1 + if not peer_list: + peer_list = self.main_filter_nodes + self.optional_filter_nodes + + self.relay_message(sender, message, pubsub_topic) + delay(0.1) + for index, peer in enumerate(peer_list): + logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message") + get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) + assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(message) + + @allure.step + def check_published_message_doesnt_reach_relay_peer(self, pubsub_topic=None, content_topic=None): + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic, content_topic=content_topic) + raise AssertionError("Retrieving messages on not subscribed content topic worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + @allure.step + def check_publish_fails_on_not_subscribed_pubsub_topic(self, pubsub_topic): + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + raise AssertionError("Publishing messages on unsubscribed shard worked!!!") + except Exception as ex: + assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex) + + @allure.step + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message + + @allure.step + def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + if pubsub_topic: + kwargs["pubsub_topic"] = pubsub_topic + if not cluster_id: + try: + if isinstance(pubsub_topic, list): + pubsub_topic = pubsub_topic[0] + cluster_id = pubsub_topic.split("/")[4] + logger.debug(f"Cluster id was resolved to: {cluster_id}") + except Exception as ex: + raise Exception("Could not resolve cluster_id from pubsub_topic") + kwargs["cluster_id"] = cluster_id + if content_topic: + kwargs["content_topic"] = content_topic + return kwargs diff --git a/src/test_data.py b/src/test_data.py index 3eb1952268..5f162873bb 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -63,10 +63,63 @@ INVALID_CONTENT_TOPICS = [ {"description": "A bool", "value": True}, ] +CONTENT_TOPICS_DIFFERENT_SHARDS = [ + "/myapp/1/latest/proto", # resolves to shard 0 + "/waku/2/content/test.js", # resolves to shard 1 + "/app/22/sometopic/someencoding", # resolves to shard 2 + "/toychat/2/huilong/proto", # resolves to shard 3 + "/statusim/1/community/cbor", # resolves to shard 4 + "/app/27/sometopic/someencoding", # resolves to shard 5 + "/app/29/sometopic/someencoding", # resolves to shard 6 + "/app/20/sometopic/someencoding", # resolves to shard 7 +] + +CONTENT_TOPICS_SHARD_0 = [ + "/newsService/1.0/weekly/protobuf", + "/newsService/1.0/alerts/xml", + "/newsService/1.0/updates/json", + "/newsService/2.0/alerts/json", + "/newsService/2.0/summaries/xml", + "/newsService/2.0/highlights/yaml", + "/newsService/3.0/weekly/json", + "/newsService/3.0/summaries/xml", +] + +CONTENT_TOPICS_SHARD_7 = [ + "/newsService/2.0/alerts/yaml", + "/newsService/2.0/highlights/xml", + "/newsService/3.0/daily/protobuf", + "/newsService/3.0/alerts/xml", + "/newsService/3.0/updates/protobuf", + "/newsService/3.0/reviews/xml", + "/newsService/4.0/alerts/yaml", + "/newsService/4.0/updates/yaml", +] + VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/0", "/waku/2/rs/0/1", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000"] INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"] +PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [ + "/waku/2/rs/0/0", + "/waku/2/rs/0/1", + "/waku/2/rs/2/0", + "/waku/2/rs/2/1", + "/waku/2/rs/2/999", + "/waku/2/rs/8/0", + "/waku/2/rs/999/999", +] + +PUBSUB_TOPICS_SAME_CLUSTER = [ + "/waku/2/rs/2/0", + "/waku/2/rs/2/1", + "/waku/2/rs/2/2", + "/waku/2/rs/2/3", + "/waku/2/rs/2/4", + "/waku/2/rs/2/5", + "/waku/2/rs/2/6", + "/waku/2/rs/2/7", +] SAMPLE_TIMESTAMPS = [ {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, diff --git a/tests/filter/test_get_messages.py b/tests/filter/test_get_messages.py index 3fff711a2c..73f0fe62e9 100644 --- a/tests/filter/test_get_messages.py +++ b/tests/filter/test_get_messages.py @@ -42,7 +42,6 @@ class TestFilterGetMessages(StepsFilter): def test_filter_get_message_with_meta(self): self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload))) - @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1068") def test_filter_get_message_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 832064383e..4351a1444c 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -168,7 +168,6 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) - @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1068") def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: diff --git a/tests/sharding/__init__.py b/tests/sharding/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/sharding/test_filter.py b/tests/sharding/test_filter.py new file mode 100644 index 0000000000..f3e08eb4a3 --- /dev/null +++ b/tests/sharding/test_filter.py @@ -0,0 +1,51 @@ +import pytest +from src.env_vars import NODE_1, NODE_2 +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding +from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER + +logger = get_custom_logger(__name__) + + +class TestFilterStaticSharding(StepsSharding): + def test_filter_works_with_static_sharding(self): + self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_second_node_as_filter(pubsub_topic=self.test_pubsub_topic) + self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic) + + def test_filter_static_sharding_multiple_shards(self): + self.setup_first_relay_node(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.setup_second_node_as_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.subscribe_first_relay_node(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): + self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic) + for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): + self.check_published_message_reaches_filter_peer(content_topic=content_topic, pubsub_topic=pubsub_topic) + + +@pytest.mark.skipif( + "go-waku" in NODE_1 or "go-waku" in NODE_2, + reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061", +) +class TestFilterAutoSharding(StepsSharding): + def test_filter_works_with_auto_sharding(self): + self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) + self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) + self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) + self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic) + + def test_filter_auto_sharding_multiple_content_topics(self): + self.setup_first_relay_node( + cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER + ) + self.setup_second_node_as_filter( + cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic + ) + self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): + self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_filter_peer(content_topic=content_topic) diff --git a/tests/sharding/test_multiple_nodes.py b/tests/sharding/test_multiple_nodes.py new file mode 100644 index 0000000000..3ae81363b5 --- /dev/null +++ b/tests/sharding/test_multiple_nodes.py @@ -0,0 +1,42 @@ +import pytest +from src.env_vars import ADDITIONAL_NODES, NODE_2 +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding + +logger = get_custom_logger(__name__) + + +class TestMultipleNodes(StepsSharding): + def test_static_shard_relay(self): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) + self.setup_optional_relay_nodes(pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + + @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku") + def test_static_shard_relay_10_nwaku_nodes(self): + self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_nwaku_relay_nodes(num_nodes=9, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + + @pytest.mark.skipif( + "go-waku" in NODE_2 or "go-waku" in ADDITIONAL_NODES, + reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061", + ) + def test_auto_shard_relay(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.setup_optional_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + + @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku") + def test_auto_shard_relay_10_nwaku_nodes(self): + self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) diff --git a/tests/sharding/test_relay_auto_sharding.py b/tests/sharding/test_relay_auto_sharding.py new file mode 100644 index 0000000000..af53ac547a --- /dev/null +++ b/tests/sharding/test_relay_auto_sharding.py @@ -0,0 +1,130 @@ +import pytest +from src.env_vars import NODE_1, NODE_2 +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding +from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0 + + +logger = get_custom_logger(__name__) + + +@pytest.mark.skipif( + "go-waku" in NODE_1 or "go-waku" in NODE_2, + reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061", +) +class TestRelayAutosharding(StepsSharding): + def test_publish_without_subscribing_via_api_works(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + for node in self.main_relay_nodes: + self.relay_message(node, self.create_message(contentTopic=self.test_content_topic)) + + def test_retrieve_messages_without_subscribing_via_api(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + try: + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + if self.node2.is_nwaku(): + pass + else: + raise AssertionError("Retrieving messages without subscribing worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_subscribe_and_publish_on_another_content_topic_from_same_shard(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=["/newsService/1.0/weekly/protobuf"]) + self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf") + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + + def test_subscribe_and_publish_on_another_content_topic_from_another_shard(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=["/toychat/2/huilong/proto"]) + self.check_published_message_reaches_relay_peer(content_topic="/toychat/2/huilong/proto") + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + + def test_publish_on_not_subscribed_content_topic_works(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + for node in self.main_relay_nodes: + self.relay_message(node, self.create_message(contentTopic="/toychat/2/huilong/proto")) + + def test_cant_retrieve_messages_on_not_subscribed_content_topic(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + self.check_published_message_doesnt_reach_relay_peer(content_topic="/toychat/2/huilong/proto") + + @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS]) + def test_subscribe_via_api_to_new_content_topics(self, content_topic_list): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list[:1]) + self.subscribe_main_relay_nodes(content_topics=content_topic_list[1:]) + for content_topic in content_topic_list[1:]: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_subscribe_one_by_one_to_different_content_topics_and_send_messages(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS + CONTENT_TOPICS_SHARD_0: + self.subscribe_main_relay_nodes(content_topics=[content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS]) + def test_unsubscribe_from_some_content_topics(self, content_topic_list): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=content_topic_list) + for content_topic in content_topic_list: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + self.unsubscribe_main_relay_nodes(content_topics=content_topic_list[:3]) + for content_topic in content_topic_list[:3]: + self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic) + for content_topic in content_topic_list[3:]: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_unsubscribe_from_all_content_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic) + + def test_unsubscribe_from_all_content_topics_one_by_one(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.unsubscribe_main_relay_nodes(content_topics=[content_topic]) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic) + + def test_resubscribe_to_unsubscribed_content_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic) + self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_unsubscribe_from_non_subscribed_content_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic) + + @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS]) + def test_publish_on_multiple_content_topics_and_only_after_fetch_them(self, content_topic_list): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=content_topic_list) + for content_topic in content_topic_list: + self.relay_message(self.node1, self.create_message(payload=to_base64(content_topic), contentTopic=content_topic)) + delay(0.1) + for content_topic in content_topic_list: + get_messages_response = self.retrieve_relay_message(self.node2, content_topic=content_topic) + assert get_messages_response, f"Peer NODE_2 couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + assert get_messages_response[0]["contentTopic"] == content_topic + assert get_messages_response[0]["payload"] == to_base64(content_topic) diff --git a/tests/sharding/test_relay_static_sharding.py b/tests/sharding/test_relay_static_sharding.py new file mode 100644 index 0000000000..13adc12367 --- /dev/null +++ b/tests/sharding/test_relay_static_sharding.py @@ -0,0 +1,134 @@ +import pytest +from src.env_vars import NODE_2 +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding +from src.test_data import PUBSUB_TOPICS_SAME_CLUSTER + +logger = get_custom_logger(__name__) + + +class TestRelayStaticSharding(StepsSharding): + def test_publish_without_subscribing_via_api_works(self): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) + for node in self.main_relay_nodes: + self.relay_message(node, self.create_message(), self.test_pubsub_topic) + + def test_retrieve_messages_without_subscribing_via_api(self): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + if self.node2.is_nwaku(): + pass + else: + raise AssertionError("Retrieving messages without subscribing worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_subscribe_and_publish_on_another_shard(self): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=["/waku/2/rs/2/1"]) + self.check_published_message_reaches_relay_peer(pubsub_topic="/waku/2/rs/2/1") + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + if self.node2.is_nwaku(): + pass + else: + raise AssertionError("Retrieving messages without subscribing worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_cant_publish_on_not_subscribed_shard(self): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.check_publish_fails_on_not_subscribed_pubsub_topic("/waku/2/rs/2/1") + + def test_subscribe_via_api_to_new_pubsub_topics(self): + self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER[:1]) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER[1:]) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[1:]: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_subscribe_via_api_to_new_pubsub_topics_on_other_cluster(self): + topics = ["/waku/2/rs/2/0", "/waku/2/rs/3/0", "/waku/2/rs/4/0"] + self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=topics[0]) + self.subscribe_first_relay_node(pubsub_topics=topics) + self.subscribe_second_relay_node(pubsub_topics=topics) + for pubsub_topic in topics: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_subscribe_one_by_one_to_different_pubsub_topics_and_send_messages(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.subscribe_main_relay_nodes(pubsub_topics=[pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_unsubscribe_from_some_pubsub_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER[:3]) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[:3]: + self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[3:]: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034") + def test_unsubscribe_from_all_pubsub_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034") + def test_unsubscribe_from_all_pubsub_topics_one_by_one(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.unsubscribe_main_relay_nodes(pubsub_topics=[pubsub_topic]) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034") + def test_resubscribe_to_unsubscribed_pubsub_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_unsubscribe_from_non_subscribed_pubsub_topics(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + try: + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + if self.node1.is_nwaku(): + pass + else: + raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) + + def test_publish_on_multiple_pubsub_topics_and_only_after_fetch_them(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.relay_message(self.node1, self.create_message(payload=to_base64(pubsub_topic)), pubsub_topic=pubsub_topic) + delay(0.1) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + get_messages_response = self.retrieve_relay_message(self.node2, pubsub_topic=pubsub_topic) + assert get_messages_response, f"Peer NODE_2 couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + assert get_messages_response[0]["payload"] == to_base64(pubsub_topic) diff --git a/tests/sharding/test_running_nodes_auto_sharding.py b/tests/sharding/test_running_nodes_auto_sharding.py new file mode 100644 index 0000000000..7963a31d02 --- /dev/null +++ b/tests/sharding/test_running_nodes_auto_sharding.py @@ -0,0 +1,113 @@ +import pytest +from src.env_vars import NODE_1, NODE_2 +from src.libs.common import delay +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding +from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7, PUBSUB_TOPICS_SAME_CLUSTER + + +logger = get_custom_logger(__name__) + + +@pytest.mark.skipif( + "go-waku" in NODE_1 or "go-waku" in NODE_2, + reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061", +) +class TestRunningNodesAutosharding(StepsSharding): + @pytest.mark.parametrize("content_topic", CONTENT_TOPICS_DIFFERENT_SHARDS) + def test_single_content_topic(self, content_topic): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic) + self.subscribe_first_relay_node(content_topics=[content_topic]) + self.subscribe_second_relay_node(content_topics=[content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7]) + def test_multiple_content_topics_same_shard(self, content_topic_list): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list) + self.subscribe_first_relay_node(content_topics=content_topic_list) + self.subscribe_second_relay_node(content_topics=content_topic_list) + for content_topic in content_topic_list: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_multiple_content_topics_different_shard(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_second_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_2_nodes_different_content_topic_same_shard(self): + self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf") + self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml") + self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"]) + self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"]) + try: + self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf") + raise AssertionError("Publish on different content topic worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_2_nodes_different_content_topic_different_shard(self): + self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto") + self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js") + self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"]) + self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"]) + try: + self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto") + raise AssertionError("Publish on different content topic worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + @pytest.mark.parametrize("pubsub_topic", ["/waku/2/rs/2/0", "/waku/2/rs/2/1"]) + def test_pubsub_topic_also_in_docker_flags(self, pubsub_topic): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=pubsub_topic, content_topic=self.test_content_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + + def test_content_topic_not_in_docker_flags(self): + self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + + def test_content_topic_and_pubsub_topic_not_in_docker_flags(self): + self.setup_main_relay_nodes(cluster_id=2) + self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) + try: + self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) + except Exception as ex: + assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex) + + def test_multiple_content_topics_and_multiple_pubsub_topics(self): + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER + ) + self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + self.check_published_message_reaches_relay_peer(content_topic=content_topic) + + def test_node_uses_both_auto_and_regular_apis(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_main_relay_nodes(content_topics=["/toychat/2/huilong/proto"]) + self.check_published_message_reaches_relay_peer(content_topic="/toychat/2/huilong/proto") + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + + def test_sender_uses_auto_api_receiver_uses_regular_api(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) + self.subscribe_second_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic)) + delay(0.1) + get_messages_response = self.retrieve_relay_message(self.node2, pubsub_topic=self.test_pubsub_topic) + assert get_messages_response, f"Peer NODE_2 couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + + def test_sender_uses_regular_api_receiver_uses_auto_api(self): + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_second_relay_node(content_topics=[self.test_content_topic]) + self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic) + delay(0.1) + get_messages_response = self.retrieve_relay_message(self.node2, content_topic=self.test_content_topic) + assert get_messages_response, f"Peer NODE_2 couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" diff --git a/tests/sharding/test_running_nodes_static_sharding.py b/tests/sharding/test_running_nodes_static_sharding.py new file mode 100644 index 0000000000..b6e3a105f8 --- /dev/null +++ b/tests/sharding/test_running_nodes_static_sharding.py @@ -0,0 +1,83 @@ +import pytest +from src.env_vars import NODE_2 +from src.libs.custom_logger import get_custom_logger +from src.steps.sharding import StepsSharding +from src.test_data import PUBSUB_TOPICS_DIFFERENT_CLUSTERS, PUBSUB_TOPICS_SAME_CLUSTER + +logger = get_custom_logger(__name__) + + +class TestRunningNodesStaticSharding(StepsSharding): + @pytest.mark.parametrize("pubsub_topic", PUBSUB_TOPICS_DIFFERENT_CLUSTERS) + def test_single_pubsub_topic(self, pubsub_topic): + self.setup_main_relay_nodes(pubsub_topic=pubsub_topic) + self.subscribe_main_relay_nodes(pubsub_topics=[pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_multiple_pubsub_topics_same_cluster(self): + self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_multiple_pubsub_topics_different_clusters(self): + self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_DIFFERENT_CLUSTERS) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_DIFFERENT_CLUSTERS) + for pubsub_topic in PUBSUB_TOPICS_DIFFERENT_CLUSTERS: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) + + def test_2_nodes_same_cluster_different_shards(self): + self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_second_relay_node(pubsub_topic="/waku/2/rs/2/1") + self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/2/1"]) + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + raise AssertionError("Publish on different shard worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_2_nodes_different_cluster_same_shard(self): + self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/0") + self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/0"]) + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + raise AssertionError("Publish on different cluster worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + def test_2_nodes_different_cluster_different_shard(self): + self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/1") + self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) + self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/1"]) + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + raise AssertionError("Publish on different cluster worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) + + @pytest.mark.parametrize("content_topic", ["/toychat/2/huilong/proto", "/aaaaa/3/bbbbb/proto"]) + @pytest.mark.skipif("go-waku" in NODE_2, reason="Test doesn't work on go-waku") + def test_content_topic_also_in_docker_flags(self, content_topic): + self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic, content_topic=content_topic) + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + + # Bug reported: https://github.com/waku-org/go-waku/issues/1034#issuecomment-2011350765 + def test_pubsub_topic_not_in_docker_flags(self): + self.setup_main_relay_nodes(cluster_id=2) + self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) + try: + self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) + except Exception as ex: + assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex) + + def test_start_node_with_50_pubsub_topics(self): + topics = ["/waku/2/rs/2/" + str(i) for i in range(50)] + self.setup_main_relay_nodes(pubsub_topic=topics) + self.subscribe_main_relay_nodes(pubsub_topics=topics) + for pubsub_topic in topics: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)