From 26718c38a49b03aebb839969db67de0c4ea10c32 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Mon, 18 Dec 2023 10:02:17 +0200 Subject: [PATCH] Filter Unsubscribe (#7) * filter unsubscribe * filter unsubscribe * fix pring typo --- .github/workflows/test.yml | 2 +- src/node/api_clients/base_client.py | 5 +- src/node/api_clients/rest.py | 20 +++- src/node/api_clients/rpc.py | 10 ++ src/node/waku_node.py | 15 +++ src/steps/filter.py | 57 ++++++++++-- tests/filter/test_subscribe_create.py | 19 +++- tests/filter/test_subscribe_update.py | 25 +++-- tests/filter/test_unsubscribe.py | 128 ++++++++++++++++++++++++++ tests/filter/test_unsubscribe_all.py | 69 ++++++++++++++ tests/relay/test_publish.py | 2 +- 11 files changed, 325 insertions(+), 27 deletions(-) create mode 100644 tests/filter/test_unsubscribe.py create mode 100644 tests/filter/test_unsubscribe_all.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 64032efe98..8fffa8006f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -57,7 +57,7 @@ jobs: - run: pip install -r requirements.txt - name: Run tests - run: pytest -n 4 --reruns 2 --alluredir=allure-results + run: pytest -n 4 --reruns 1 --alluredir=allure-results - name: Get allure history if: always() diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index d40cf025e8..723668e4c9 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -1,5 +1,4 @@ import requests -from tenacity import retry, stop_after_delay, wait_fixed from abc import ABC, abstractmethod from src.env_vars import API_REQUEST_TIMEOUT from src.libs.custom_logger import get_custom_logger @@ -47,6 +46,10 @@ class BaseClient(ABC): def set_filter_subscriptions(self, subscription): pass + @abstractmethod + def delete_filter_subscriptions(self, subscription): + pass + @abstractmethod def get_filter_messages(self, content_topic): pass diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index 1434f2210e..cfa059ea67 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -36,10 +36,22 @@ class REST(BaseClient): set_subscriptions_response = self.rest_call("post", "filter/v2/subscriptions", json.dumps(subscription)) return set_subscriptions_response.json() - def get_filter_messages(self, content_topic): - get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}") - return get_messages_response.json() - def update_filter_subscriptions(self, subscription): update_subscriptions_response = self.rest_call("put", "filter/v2/subscriptions", json.dumps(subscription)) return update_subscriptions_response.json() + + def delete_filter_subscriptions(self, subscription): + delete_subscriptions_response = self.rest_call("delete", "filter/v2/subscriptions", json.dumps(subscription)) + return delete_subscriptions_response.json() + + def delete_all_filter_subscriptions(self, request_id): + delete_all_subscriptions_response = self.rest_call("delete", "filter/v2/subscriptions/all", json.dumps(request_id)) + return delete_all_subscriptions_response.json() + + def ping_filter_subscriptions(self, request_id): + ping_subscriptions_response = self.rest_call("get", f"filter/v2/subscriptions/{quote(request_id, safe='')}") + return ping_subscriptions_response.json() + + def get_filter_messages(self, content_topic): + get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}") + return get_messages_response.json() diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py index e665c11ee4..0975c82612 100644 --- a/src/node/api_clients/rpc.py +++ b/src/node/api_clients/rpc.py @@ -50,6 +50,16 @@ class RPC(BaseClient): ) return set_subscriptions_response.json()["result"] + def delete_filter_subscriptions(self, subscription): + delete_subscriptions_response = self.rpc_call( + "delete_waku_v2_filter_v1_subscription", + [ + subscription.get("contentFilters", []), + subscription.get("pubsubTopic", None), + ], + ) + return delete_subscriptions_response.json()["result"] + def get_filter_messages(self, content_topic): get_messages_response = self.rpc_call("get_waku_v2_filter_v1_messages", [content_topic]) return get_messages_response.json()["result"] diff --git a/src/node/waku_node.py b/src/node/waku_node.py index a1a27d3387..697ea5d116 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -151,6 +151,21 @@ class WakuNode: else: return self._api.update_filter_subscriptions(subscription) + def delete_filter_subscriptions(self, subscription): + return self._api.delete_filter_subscriptions(subscription) + + def delete_all_filter_subscriptions(self, request_id): + if PROTOCOL == "RPC": + pytest.skip("This method doesn't exist for RPC protocol") + else: + return self._api.delete_all_filter_subscriptions(request_id) + + def ping_filter_subscriptions(self, request_id): + if PROTOCOL == "RPC": + pytest.skip("This method doesn't exist for RPC protocol") + else: + return self._api.ping_filter_subscriptions(request_id) + def get_filter_messages(self, content_topic): return self._api.get_filter_messages(content_topic) diff --git a/src/steps/filter.py b/src/steps/filter.py index 398b66f541..7e5ea68f80 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -16,8 +16,9 @@ logger = get_custom_logger(__name__) class StepsFilter: test_pubsub_topic = VALID_PUBSUB_TOPICS[1] + second_pubsub_topic = VALID_PUBSUB_TOPICS[2] test_content_topic = "/test/1/waku-filter/proto" - second_conted_topic = "/test/2/waku-filter/proto" + second_content_topic = "/test/2/waku-filter/proto" test_payload = "Filter works!!" @pytest.fixture(scope="function") @@ -35,7 +36,7 @@ class StepsFilter: def setup_main_filter_node(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") - self.node2.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + self.node2.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) self.main_nodes = [self.node2] self.optional_nodes = [] @@ -52,10 +53,21 @@ class StepsFilter: else: pytest.skip("ADDITIONAL_NODES is empty, cannot run test") for index, node in enumerate(nodes): - node = WakuNode(node, f"node{index}_{request.cls.test_id}") - node.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}") + node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) self.optional_nodes.append(node) + @pytest.fixture(scope="function") + @retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True) + def filter_warm_up(self): + try: + self.ping_filter_subscriptions("1") + except Exception as ex: + if "peer has no subscriptions" in str(ex): + logger.info("WARM UP successful!!") + else: + raise TimeoutError(f"WARM UP FAILED WITH: {ex}") + @allure.step def check_published_message_reaches_filter_peer( self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None @@ -79,7 +91,15 @@ class StepsFilter: waku_message = WakuMessage(get_messages_response) waku_message.assert_received_message(message) - @retry(stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True) + @allure.step + def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None): + try: + self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic) + raise AssertionError("Publish with no subscription worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + + @retry(stop=stop_after_delay(30), wait=wait_fixed(1), reraise=True) @allure.step def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None): if pubsub_topic is None: @@ -90,8 +110,7 @@ class StepsFilter: {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic} ) assert filter_sub_response["requestId"] == request_id - assert filter_sub_response["statusCode"] == 0 - assert filter_sub_response["statusDesc"] == "" + assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed @allure.step def create_filter_subscription(self, subscription, node=None): @@ -105,6 +124,30 @@ class StepsFilter: node = self.node2 return node.update_filter_subscriptions(subscription) + @allure.step + def delete_filter_subscription(self, subscription, node=None): + if node is None: + node = self.node2 + delete_sub_response = node.delete_filter_subscriptions(subscription) + assert delete_sub_response["requestId"] == subscription["requestId"] + assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + + @allure.step + def delete_all_filter_subscriptions(self, request_id, node=None): + if node is None: + node = self.node2 + delete_sub_response = node.delete_all_filter_subscriptions(request_id) + assert delete_sub_response["requestId"] == request_id["requestId"] + assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + + @allure.step + def ping_filter_subscriptions(self, request_id, node=None): + if node is None: + node = self.node2 + ping_sub_response = node.ping_filter_subscriptions(request_id) + assert ping_sub_response["requestId"] == request_id + assert ping_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + @allure.step def add_new_relay_subscription(self, pubsub_topics, node=None): if node is None: diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index 87f57d0c57..9f80420736 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -76,21 +76,21 @@ class TestFilterSubscribeUpdate(StepsFilter): def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) - raise AssertionError(f"Subscribe with no pubusub topics worked!!!") + raise AssertionError("Subscribe with no pubusub topics worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_filter_subscribe_with_invalid_pubsub_topic_format(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": [self.test_pubsub_topic]}) - raise AssertionError(f"Subscribe with invalid pubusub topics worked!!!") + raise AssertionError("Subscribe with invalid pubusub topics worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with no content topics worked!!!") + raise AssertionError("Subscribe with no content topics worked!!!") except Exception as ex: assert "Bad Request" in str(ex) @@ -108,13 +108,22 @@ class TestFilterSubscribeUpdate(StepsFilter): def test_filter_subscribe_with_no_request_id(self, subscribe_main_nodes): try: self.create_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with no request id worked!!!") + raise AssertionError("Subscribe with no request id worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_filter_subscribe_with_invalid_request_id(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with invalid request id worked!!!") + raise AssertionError("Subscribe with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_with_extra_field(self, subscribe_main_nodes): + try: + self.create_filter_subscription( + {"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"} + ) + raise AssertionError("Subscribe with extra field worked!!!") except Exception as ex: assert "Bad Request" in str(ex) diff --git a/tests/filter/test_subscribe_update.py b/tests/filter/test_subscribe_update.py index 54b2041159..ca2cc889b9 100644 --- a/tests/filter/test_subscribe_update.py +++ b/tests/filter/test_subscribe_update.py @@ -10,9 +10,9 @@ logger = get_custom_logger(__name__) class TestFilterSubscribeCreate(StepsFilter): def test_filter_update_subscription_add_a_new_content_topic(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) - self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_conted_topic], "pubsubTopic": self.test_pubsub_topic}) + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}) self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic)) - self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_conted_topic)) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_content_topic)) def test_filter_update_subscription_add_30_new_content_topics(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) @@ -47,7 +47,7 @@ class TestFilterSubscribeCreate(StepsFilter): def test_filter_update_subscription_add_a_new_pubsub_topic(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.update_filter_subscription( - {"requestId": "1", "contentFilters": [self.test_content_topic, self.second_conted_topic], "pubsubTopic": VALID_PUBSUB_TOPICS[4]} + {"requestId": "1", "contentFilters": [self.test_content_topic, self.second_content_topic], "pubsubTopic": VALID_PUBSUB_TOPICS[4]} ) self.add_new_relay_subscription(VALID_PUBSUB_TOPICS[4:5]) self.check_published_message_reaches_filter_peer( @@ -57,13 +57,13 @@ class TestFilterSubscribeCreate(StepsFilter): self.create_message(contentTopic=self.test_content_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4] ) self.check_published_message_reaches_filter_peer( - self.create_message(contentTopic=self.second_conted_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4] + self.create_message(contentTopic=self.second_content_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4] ) def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes): try: self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) - raise AssertionError(f"Subscribe with no pubusub topics worked!!!") + raise AssertionError("Subscribe with no pubusub topics worked!!!") except Exception as ex: assert "Bad Request" in str(ex) @@ -77,7 +77,7 @@ class TestFilterSubscribeCreate(StepsFilter): def test_filter_update_subscription_with_no_content_topic(self, subscribe_main_nodes): try: self.update_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with no content topics worked!!!") + raise AssertionError("Subscribe with no content topics worked!!!") except Exception as ex: assert "Bad Request" in str(ex) @@ -95,13 +95,22 @@ class TestFilterSubscribeCreate(StepsFilter): def test_filter_update_subscription_with_no_request_id(self, subscribe_main_nodes): try: self.update_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with no request id worked!!!") + raise AssertionError("Subscribe with no request id worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_filter_update_subscription_with_invalid_request_id(self, subscribe_main_nodes): try: self.update_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError(f"Subscribe with invalid request id worked!!!") + raise AssertionError("Subscribe with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_with_extra_field(self, subscribe_main_nodes): + try: + self.update_filter_subscription( + {"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"} + ) + raise AssertionError("Subscribe with extra field worked!!!") except Exception as ex: assert "Bad Request" in str(ex) diff --git a/tests/filter/test_unsubscribe.py b/tests/filter/test_unsubscribe.py new file mode 100644 index 0000000000..50e9f3b105 --- /dev/null +++ b/tests/filter/test_unsubscribe.py @@ -0,0 +1,128 @@ +import pytest +from src.test_data import SAMPLE_INPUTS +from src.steps.filter import StepsFilter + + +@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "subscribe_main_nodes") +class TestFilterUnSubscribe(StepsFilter): + def test_filter_unsubscribe_from_single_content_topic(self): + self.check_published_message_reaches_filter_peer() + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_publish_without_filter_subscription() + + def test_filter_unsubscribe_from_all_subscribed_content_topics(self): + content_topics = [input["value"] for input in SAMPLE_INPUTS[:2]] + self.wait_for_subscriptions_on_main_nodes(content_topics) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topics[1])) + self.delete_filter_subscription({"requestId": "1", "contentFilters": content_topics, "pubsubTopic": self.test_pubsub_topic}) + self.check_publish_without_filter_subscription(self.create_message(contentTopic=content_topics[1])) + + def test_filter_unsubscribe_from_some_of_the_subscribed_content_topics(self): + content_topics = [input["value"] for input in SAMPLE_INPUTS[:2]] + self.wait_for_subscriptions_on_main_nodes(content_topics) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topics[1])) + self.delete_filter_subscription({"requestId": "1", "contentFilters": content_topics[:1], "pubsubTopic": self.test_pubsub_topic}) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topics[1])) + self.check_publish_without_filter_subscription(self.create_message(contentTopic=content_topics[0])) + + def test_filter_unsubscribe_from_pubsub_topics(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], self.test_pubsub_topic) + self.wait_for_subscriptions_on_main_nodes([self.second_content_topic], self.second_pubsub_topic) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic), self.test_pubsub_topic) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic) + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_publish_without_filter_subscription() + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic) + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.second_pubsub_topic}) + self.check_publish_without_filter_subscription() + self.check_publish_without_filter_subscription(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic) + + def test_filter_unsubscribe_from_non_existing_content_topic(self): + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_published_message_reaches_filter_peer() + + def test_filter_unsubscribe_from_non_existing_pubsub_topic(self): + try: + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_pubsub_topic], "pubsubTopic": self.second_pubsub_topic}) + raise AssertionError("Unsubscribe with non existing pubsub topic worked!!!") + except Exception as ex: + assert "Not Found" and "peer has no subscriptions" in str(ex) + self.check_published_message_reaches_filter_peer() + + def test_filter_unsubscribe_from_31_content_topics(self): + try: + self.delete_filter_subscription( + {"requestId": "1", "contentFilters": [input["value"] for input in SAMPLE_INPUTS[:31]], "pubsubTopic": self.test_pubsub_topic} + ) + raise AssertionError("Unsubscribe from more than 30 content topics worked!!!") + except Exception as ex: + assert "Not Found" and "exceeds maximum content topics: 30" in str(ex) + + def test_filter_unsubscribe_with_no_content_topic(self): + try: + self.delete_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Unsubscribe with no content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_content_topic_string_instead_of_list(self): + try: + self.delete_filter_subscription({"requestId": "1", "contentFilters": self.test_content_topic, "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Unsubscribe with invalid content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_no_pubsub_topic(self): + try: + self.delete_filter_subscription({"requestId": "1", "contentFilters": self.test_content_topic}) + raise AssertionError("Unsubscribe with no pubsub topic worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_pubsub_topic_string_instead_of_list(self): + try: + self.delete_filter_subscription({"requestId": "1", "contentFilters": self.test_content_topic, "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Unsubscribe with invalid pubsub topic worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_very_large_request_id(self): + self.check_published_message_reaches_filter_peer() + self.delete_filter_subscription( + { + "requestId": "12345678901234567890123456789012345678901234567890", + "contentFilters": [self.test_content_topic], + "pubsubTopic": self.test_pubsub_topic, + } + ) + self.check_publish_without_filter_subscription() + + def test_filter_unsubscribe_with_no_request_id(self): + try: + self.delete_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Unsubscribe with no request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_invalid_request_id(self): + try: + self.delete_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Unsubscribe with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_with_extra_field(self): + try: + self.delete_filter_subscription( + {"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"} + ) + raise AssertionError("Unsubscribe with extra field worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_resubscribe_to_unsubscribed_topics(self): + self.check_published_message_reaches_filter_peer() + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_publish_without_filter_subscription() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], self.test_pubsub_topic) + self.check_published_message_reaches_filter_peer() diff --git a/tests/filter/test_unsubscribe_all.py b/tests/filter/test_unsubscribe_all.py new file mode 100644 index 0000000000..0fd57d7392 --- /dev/null +++ b/tests/filter/test_unsubscribe_all.py @@ -0,0 +1,69 @@ +import pytest +from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS +from src.steps.filter import StepsFilter +from random import choice + + +@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "filter_warm_up") +class TestFilterUnSubscribeAll(StepsFilter): + def test_filter_unsubscribe_all_from_few_content_topics(self): + content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]] + self.wait_for_subscriptions_on_main_nodes(content_topics) + for content_topic in content_topics: + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topic)) + self.delete_all_filter_subscriptions({"requestId": "1"}) + for content_topic in content_topics: + self.check_publish_without_filter_subscription(self.create_message(contentTopic=content_topic)) + + def test_filter_unsubscribe_all_from_90_content_topics(self): + first_list = [str(i) for i in range(1, 31)] + second_list = [str(i) for i in range(31, 61)] + third_list = [str(i) for i in range(61, 91)] + self.wait_for_subscriptions_on_main_nodes(first_list) + self.wait_for_subscriptions_on_main_nodes(second_list) + self.wait_for_subscriptions_on_main_nodes(third_list) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=choice(first_list))) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=choice(second_list))) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=choice(third_list))) + self.delete_all_filter_subscriptions({"requestId": "1"}) + self.check_publish_without_filter_subscription(self.create_message(contentTopic=choice(first_list))) + self.check_publish_without_filter_subscription(self.create_message(contentTopic=choice(second_list))) + self.check_publish_without_filter_subscription(self.create_message(contentTopic=choice(third_list))) + + def test_filter_unsubscribe_all_from_multiple_pubsub_topics(self): + for pubsub_topic in VALID_PUBSUB_TOPICS: + content_topic = pubsub_topic + self.wait_for_subscriptions_on_main_nodes([content_topic], pubsub_topic) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topic), pubsub_topic=pubsub_topic) + self.delete_all_filter_subscriptions({"requestId": "1"}) + for pubsub_topic in VALID_PUBSUB_TOPICS: + content_topic = pubsub_topic + self.check_publish_without_filter_subscription(self.create_message(contentTopic=content_topic), pubsub_topic=pubsub_topic) + + def test_filter_unsubscribe_all_on_peer_with_no_subscription(self): + try: + self.delete_all_filter_subscriptions({"requestId": "1"}) + raise AssertionError("Unsubscribe all on peer without subscriptions worked!!!") + except Exception as ex: + assert "Not Found" and "peer has no subscriptions" in str(ex) + + def test_filter_unsubscribe_all_with_no_request_id(self, subscribe_main_nodes): + try: + self.delete_all_filter_subscriptions({}) + raise AssertionError("Unsubscribe all with no request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_all_with_invalid_request_id(self, subscribe_main_nodes): + try: + self.delete_all_filter_subscriptions({"requestId": 1}) + raise AssertionError("Unsubscribe all with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_unsubscribe_all_with_extra_field(self): + try: + self.delete_all_filter_subscriptions({"requestId": 1, "extraField": "extraValue"}) + raise AssertionError("Unsubscribe all with extra field worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 0beb52f822..5e94f961e7 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -55,7 +55,7 @@ class TestRelayPublish(StepsRelay): message = self.create_message(payload=to_base64("a" * (payload_length))) try: self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2) - raise AssertionError("Duplicate message was retrieved twice") + raise AssertionError("Message with payload > 1MB was received") except Exception as ex: assert "couldn't find any messages" in str(ex)