From d7be7e9504c6be639fe078a7269af919f820da9f Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Wed, 27 Dec 2023 16:03:31 +0200 Subject: [PATCH] multiple node tests (#8) * mulitple node tests * fix filter get message * adjustments for gowaku as filter * small adjustments * small adjustments * adjustments after CI runs * prepare for PR * prepare for PR * prepare for PR * address review comments * use cluster id 0 as suggested by Prem * small adjustments after CI run * small adjustments after CI run * small adjustments after CI run * small adjustments after CI run --- .github/workflows/test.yml | 10 ++- src/env_vars.py | 8 +- src/node/api_clients/base_client.py | 2 +- src/node/api_clients/rest.py | 8 +- src/node/docker_mananger.py | 2 +- src/node/waku_node.py | 33 ++++++-- src/steps/filter.py | 106 ++++++++++++++++-------- src/steps/metrics.py | 33 ++++++++ src/steps/relay.py | 11 ++- src/test_data.py | 14 +--- tests/filter/test_idle_subscriptions.py | 39 +++++++++ tests/filter/test_multiple_nodes.py | 21 +++++ tests/filter/test_subscribe_create.py | 41 +++++++-- tests/filter/test_subscribe_update.py | 8 +- tests/filter/test_unsubscribe.py | 43 ++++++++-- tests/filter/test_unsubscribe_all.py | 18 ++-- tests/relay/test_multiple_nodes.py | 2 +- tests/relay/test_publish.py | 16 +++- tests/relay/test_subscribe.py | 2 +- 19 files changed, 316 insertions(+), 101 deletions(-) create mode 100644 src/steps/metrics.py create mode 100644 tests/filter/test_idle_subscriptions.py create mode 100644 tests/filter/test_multiple_nodes.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8fffa8006f..a87c219503 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,5 +1,9 @@ name: Interop Tests +concurrency: + group: Interop-tests + cancel-in-progress: false + on: schedule: - cron: '0 3 * * *' @@ -12,17 +16,17 @@ on: required: true description: "Node that usually publishes messages. Used for all tests" type: string - default: "wakuorg/nwaku:latest" + default: "harbor.status.im/wakuorg/go-waku:latest" node2: required: true description: "Node that usually queries for published messages. Used for all tests" type: string - default: "wakuorg/go-waku:latest" + default: "harbor.status.im/wakuorg/nwaku:latest" additional_nodes: required: false description: "Additional optional nodes used in e2e tests, separated by ," type: string - default: "wakuorg/nwaku:latest,wakuorg/go-waku:latest" + default: "harbor.status.im/wakuorg/nwaku:latest,harbor.status.im/wakuorg/go-waku:latest,harbor.status.im/wakuorg/nwaku:latest" protocol: description: "Protocol used to comunicate inside the network" required: true diff --git a/src/env_vars.py b/src/env_vars.py index a996661cc3..b14f16ca8b 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -14,9 +14,11 @@ def get_env_var(var_name, default=None): # Configuration constants. Need to be upercase to appear in reports -NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest") -NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest") -ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest") +DEFAULT_NWAKU = "harbor.status.im/wakuorg/nwaku:latest" +DEFAULT_GOWAKU = "harbor.status.im/wakuorg/go-waku:latest" +NODE_1 = get_env_var("NODE_1", DEFAULT_GOWAKU) +NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU) +ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_GOWAKU},{DEFAULT_NWAKU}") # more nodes need to follow the NODE_X pattern DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 723668e4c9..ceeeab812a 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -8,7 +8,7 @@ logger = get_custom_logger(__name__) class BaseClient(ABC): def make_request(self, method, url, headers=None, data=None): - logger.debug(f"{method.upper()} call: {url} with payload: {data}") + logger.info(f"{method.upper()} call: {url} with payload: {data}") response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) try: response.raise_for_status() diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index cfa059ea67..10268db6a5 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -52,6 +52,10 @@ class REST(BaseClient): ping_subscriptions_response = self.rest_call("get", f"filter/v2/subscriptions/{quote(request_id, safe='')}") return ping_subscriptions_response.json() - def get_filter_messages(self, content_topic): - get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}") + def get_filter_messages(self, content_topic, pubsub_topic=None): + if pubsub_topic is not None: + endpoint = f"filter/v2/messages/{quote(pubsub_topic, safe='')}/{quote(content_topic, safe='')}" + else: + endpoint = f"filter/v2/messages/{quote(content_topic, safe='')}" + get_messages_response = self.rest_call("get", endpoint) return get_messages_response.json() diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index 152e04dfa8..6f59a9a9c4 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -59,7 +59,7 @@ class DockerManager: for chunk in container.logs(stream=True): log_file.write(chunk) - def generate_ports(self, base_port=None, count=5): + def generate_ports(self, base_port=None, count=6): if base_port is None: base_port = random.randint(1024, 65535 - count) ports = [base_port + i for i in range(count)] diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 697ea5d116..86c469d41a 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -1,13 +1,13 @@ import os - import pytest +import requests from src.libs.common import delay from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL +from src.env_vars import DOCKER_LOG_DIR, PROTOCOL from src.data_storage import DS logger = get_custom_logger(__name__) @@ -29,8 +29,9 @@ class WakuNode: self._ports = self._docker_manager.generate_ports() self._rest_port = self._ports[0] self._rpc_port = self._ports[1] - self._websocket_port = self._ports[3] self._tcp_port = self._ports[2] + self._websocket_port = self._ports[3] + self._metrics_port = self._ports[5] if PROTOCOL == "RPC": self._api = RPC(self._rpc_port, self._image_name) @@ -58,15 +59,26 @@ class WakuNode: "nat": f"extip:{self._ext_ip}", "peer-exchange": "true", "discv5-discovery": "true", + "cluster-id": "0", } - if "go-waku" in self._docker_manager.image: + if self.is_gowaku(): go_waku_args = { "min-relay-peers-to-publish": "1", "legacy-filter": "false", "log-level": "DEBUG", } default_args.update(go_waku_args) + elif self.is_nwaku(): + nwaku_args = { + "metrics-server": "true", + "metrics-server-address": "0.0.0.0", + "metrics-server-port": self._metrics_port, + "metrics-logging": "true", + } + default_args.update(nwaku_args) + else: + raise NotImplementedError("Not implemented for this node type") for key, value in kwargs.items(): key = key.replace("_", "-") @@ -89,6 +101,7 @@ class WakuNode: if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() + self._container = None logger.debug("Container stopped.") def restart(self): @@ -166,8 +179,16 @@ class WakuNode: else: return self._api.ping_filter_subscriptions(request_id) - def get_filter_messages(self, content_topic): - return self._api.get_filter_messages(content_topic) + def get_filter_messages(self, content_topic, pubsub_topic=None): + return self._api.get_filter_messages(content_topic, pubsub_topic) + + def get_metrics(self): + if self.is_nwaku(): + metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics") + metrics.raise_for_status() + return metrics.content.decode("utf-8") + else: + pytest.skip(f"This method doesn't exist for node {self.type()}") @property def image(self): diff --git a/src/steps/filter.py b/src/steps/filter.py index 7e5ea68f80..bb8f83ac68 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -17,46 +17,34 @@ logger = get_custom_logger(__name__) class StepsFilter: test_pubsub_topic = VALID_PUBSUB_TOPICS[1] second_pubsub_topic = VALID_PUBSUB_TOPICS[2] + another_cluster_pubsub_topic = "/waku/2/rs/2/2" test_content_topic = "/test/1/waku-filter/proto" second_content_topic = "/test/2/waku-filter/proto" test_payload = "Filter works!!" - @pytest.fixture(scope="function") - def setup_relay_node(self, request): + @pytest.fixture(scope="function", autouse=True) + def filter_setup(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") - start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY} - if self.node1.is_gowaku(): - start_args["min_relay_peers_to_publish"] = "0" - self.node1.start(**start_args) - self.enr_uri = self.node1.get_enr_uri() - self.multiaddr_with_id = self.node1.get_multiaddr_with_id() + self.main_nodes = [] + self.optional_nodes = [] @pytest.fixture(scope="function") - def setup_main_filter_node(self, request): + def setup_main_relay_node(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") + self.relay_node_start(NODE_1) + + @pytest.fixture(scope="function") + def setup_main_filter_node(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) - self.main_nodes = [self.node2] - self.optional_nodes = [] + self.main_nodes.append(self.node2) @pytest.fixture(scope="function") def subscribe_main_nodes(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) - @pytest.fixture(scope="function") - def setup_optional_filter_nodes(self, request): - logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - if ADDITIONAL_NODES: - nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")] - else: - pytest.skip("ADDITIONAL_NODES is empty, cannot run test") - for index, node in enumerate(nodes): - node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}") - node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) - self.optional_nodes.append(node) - @pytest.fixture(scope="function") @retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True) def filter_warm_up(self): @@ -68,6 +56,25 @@ class StepsFilter: else: raise TimeoutError(f"WARM UP FAILED WITH: {ex}") + def relay_node_start(self, node): + self.node1 = WakuNode(node, f"node1_{self.test_id}") + start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY} + if self.node1.is_gowaku(): + start_args["min_relay_peers_to_publish"] = "0" + self.node1.start(**start_args) + self.enr_uri = self.node1.get_enr_uri() + self.multiaddr_with_id = self.node1.get_multiaddr_with_id() + + def setup_optional_filter_nodes(self, node_list=ADDITIONAL_NODES): + if node_list: + nodes = [node.strip() for node in node_list.split(",") if node] + else: + pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") + for index, node in enumerate(nodes): + node = WakuNode(node, f"additional_node{index + 1}_{self.test_id}") + node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + self.optional_nodes.append(node) + @allure.step def check_published_message_reaches_filter_peer( self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None @@ -85,33 +92,47 @@ class StepsFilter: delay(message_propagation_delay) for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") - get_messages_response = self.get_filter_messages(message["contentTopic"], node=peer) + get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topics=pubsub_topic, node=peer) assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" waku_message = WakuMessage(get_messages_response) waku_message.assert_received_message(message) @allure.step - def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None): + def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None, peer_list=None): try: - self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic) + self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic, peer_list=peer_list) raise AssertionError("Publish with no subscription worked!!!") except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + assert "Bad Request" in str(ex) or "Not Found" in str(ex) or "couldn't find any messages" in str(ex) - @retry(stop=stop_after_delay(30), wait=wait_fixed(1), reraise=True) @allure.step def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None): if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic self.node1.set_relay_subscriptions([pubsub_topic]) request_id = str(uuid4()) - filter_sub_response = self.create_filter_subscription( + filter_sub_response = self.create_filter_subscription_with_retry( {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic} ) assert filter_sub_response["requestId"] == request_id assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + @allure.step + def subscribe_optional_filter_nodes(self, content_topic_list, pubsub_topic=None): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + for node in self.optional_nodes: + request_id = str(uuid4()) + self.create_filter_subscription_with_retry( + {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}, node=node + ) + + @retry(stop=stop_after_delay(60), wait=wait_fixed(1), reraise=True) + @allure.step + def create_filter_subscription_with_retry(self, subscription, node=None): + return self.create_filter_subscription(subscription, node) + @allure.step def create_filter_subscription(self, subscription, node=None): if node is None: @@ -122,15 +143,23 @@ class StepsFilter: def update_filter_subscription(self, subscription, node=None): if node is None: node = self.node2 + if node.is_gowaku(): + pytest.skip(f"This method doesn't exist for node {node.type()}") return node.update_filter_subscriptions(subscription) @allure.step - def delete_filter_subscription(self, subscription, node=None): + def delete_filter_subscription(self, subscription, status=None, node=None): if node is None: node = self.node2 delete_sub_response = node.delete_filter_subscriptions(subscription) - assert delete_sub_response["requestId"] == subscription["requestId"] - assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + if node.is_gowaku() and "requestId" not in subscription: + assert delete_sub_response["requestId"] == "" + else: + assert delete_sub_response["requestId"] == subscription["requestId"] + if status is None: + assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + else: + assert status in delete_sub_response["statusDesc"] @allure.step def delete_all_filter_subscriptions(self, request_id, node=None): @@ -155,10 +184,15 @@ class StepsFilter: self.node1.set_relay_subscriptions(pubsub_topics) @allure.step - def get_filter_messages(self, content_topic, node=None): + def get_filter_messages(self, content_topic, pubsub_topics=None, node=None): if node is None: node = self.node2 - return node.get_filter_messages(content_topic) + if node.is_gowaku(): + return node.get_filter_messages(content_topic, pubsub_topics) + elif node.is_nwaku(): + return node.get_filter_messages(content_topic) + else: + raise NotImplementedError("Not implemented for this node type") @allure.step def create_message(self, **kwargs): diff --git a/src/steps/metrics.py b/src/steps/metrics.py new file mode 100644 index 0000000000..d7f73b5d0c --- /dev/null +++ b/src/steps/metrics.py @@ -0,0 +1,33 @@ +from src.libs.custom_logger import get_custom_logger +import allure +from tenacity import retry, stop_after_delay, wait_fixed + + +logger = get_custom_logger(__name__) + + +class StepsMetrics: + @allure.step + def check_metric(self, node, metric_name, expected_value): + logger.debug(f"Checking metric: {metric_name} has {expected_value}") + response = node.get_metrics() + lines = response.split("\n") + actual_value = None + for line in lines: + if line.startswith(metric_name): + parts = line.split(" ") + if len(parts) >= 2: + actual_value = float(parts[1]) + break + if actual_value is None: + raise AttributeError(f"Metric '{metric_name}' not found") + logger.debug(f"Found metric: {metric_name} with value {actual_value}") + assert actual_value == expected_value, f"Expected value for '{metric_name}' is {expected_value}, but got {actual_value}" + + @allure.step + def wait_for_metric(self, node, metric_name, expected_value, timeout_duration=90): + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(1), reraise=True) + def check_metric_with_retry(): + self.check_metric(node, metric_name, expected_value) + + check_metric_with_retry() diff --git a/src/steps/relay.py b/src/steps/relay.py index 78336c57fe..b59684a3b9 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -18,6 +18,12 @@ class StepsRelay: test_content_topic = "/test/1/waku-relay/proto" test_payload = "Relay works!!" + @pytest.fixture(scope="function", autouse=True) + def relay_setup(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.main_nodes = [] + self.optional_nodes = [] + @pytest.fixture(scope="function") def setup_main_relay_nodes(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") @@ -26,8 +32,7 @@ class StepsRelay: self.enr_uri = self.node1.get_enr_uri() self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri) - self.main_nodes = [self.node1, self.node2] - self.optional_nodes = [] + self.main_nodes.extend([self.node1, self.node2]) @pytest.fixture(scope="function") def setup_optional_relay_nodes(self, request): @@ -37,7 +42,7 @@ class StepsRelay: else: pytest.skip("ADDITIONAL_NODES is empty, cannot run test") for index, node in enumerate(nodes): - node = WakuNode(node, f"node{index}_{request.cls.test_id}") + node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}") node.start(relay="true", discv5_bootstrap_node=self.enr_uri) self.optional_nodes.append(node) diff --git a/src/test_data.py b/src/test_data.py index cc10f5a2f5..f11669fce5 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -65,19 +65,9 @@ INVALID_CONTENT_TOPICS = [ {"description": "A bool", "value": True}, ] -VALID_PUBSUB_TOPICS = [ - DEFAULT_PUBSUB_TOPIC, - "/waku/2/rs/18/1", - "/test/2/rs/18/1", - "/waku/3/rs/18/1", - "/waku/2/test/18/1", - "/waku/2/rs/66/1", - "/waku/2/rs/18/50", - "/waku/18/50", - "test", -] +VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/1", "/waku/2/rs/0/0", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000", DEFAULT_PUBSUB_TOPIC] -INVALID_PUBSUB_TOPICS = ["/test/2/rs/18/1", ("/waku/2/rs/18/1"), {"pubsub_topic": "/waku/3/rs/18/1"}, True, 12345678, [["/waku/2/rs/18/1"]]] +INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"] SAMPLE_TIMESTAMPS = [ diff --git a/tests/filter/test_idle_subscriptions.py b/tests/filter/test_idle_subscriptions.py new file mode 100644 index 0000000000..8429d9bbea --- /dev/null +++ b/tests/filter/test_idle_subscriptions.py @@ -0,0 +1,39 @@ +import pytest +from src.libs.common import delay +from src.env_vars import DEFAULT_NWAKU +from src.libs.custom_logger import get_custom_logger +from src.steps.filter import StepsFilter +from src.steps.metrics import StepsMetrics + +logger = get_custom_logger(__name__) + + +@pytest.mark.skip(reason="Skipping until https://github.com/waku-org/nwaku/issues/2293 is fixed") +class TestIdleSubscriptions(StepsFilter, StepsMetrics): + # tests will probably suffer minor adjustments after https://github.com/waku-org/nwaku/issues/2293 is fixed + + @pytest.mark.timeout(60 * 10) + def test_idle_filter_subscriptions_for_more_than_5_nodes(self): + self.relay_node_start(DEFAULT_NWAKU) + filter_node_list = f"{DEFAULT_NWAKU}," * 6 + self.setup_optional_filter_nodes(filter_node_list) + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.subscribe_optional_filter_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes) + self.wait_for_metric(self.node1, "waku_filter_subscriptions", 6.0) + delay(60 * 5) # not sure how many seconds to put here so hardcoded 5 minutes to be sure + # after some idle time nodes should disconnect and we should see max 5 connections + self.wait_for_metric(self.node1, "waku_filter_subscriptions", 5.0) + + @pytest.mark.timeout(60 * 10) + def test_idle_filter_subscriptions_after_node_disconnection(self): + self.relay_node_start(DEFAULT_NWAKU) + self.setup_optional_filter_nodes(DEFAULT_NWAKU) + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.subscribe_optional_filter_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes) + self.wait_for_metric(self.node1, "waku_filter_subscriptions", 1.0) + self.optional_nodes[0].stop() + delay(60 * 5) # not sure how many seconds to put here so hardcoded 5 minutes to be sure + # after some idle time the stopped node should disconnect and we should see 0 connections + self.wait_for_metric(self.node1, "waku_filter_subscriptions", 0.0) diff --git a/tests/filter/test_multiple_nodes.py b/tests/filter/test_multiple_nodes.py new file mode 100644 index 0000000000..64281d7576 --- /dev/null +++ b/tests/filter/test_multiple_nodes.py @@ -0,0 +1,21 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.steps.filter import StepsFilter + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") +class TestFilterMultipleNodes(StepsFilter): + def test_all_nodes_subscribed_to_the_topic(self): + self.setup_optional_filter_nodes() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.subscribe_optional_filter_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_optional_nodes_not_subscribed_to_same_topic(self): + self.setup_optional_filter_nodes() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.subscribe_optional_filter_nodes([self.second_content_topic]) + self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes) + self.check_publish_without_filter_subscription(peer_list=self.optional_nodes) diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index 9f80420736..de267b5160 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -6,13 +6,13 @@ from src.steps.filter import StepsFilter logger = get_custom_logger(__name__) -@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node") -class TestFilterSubscribeUpdate(StepsFilter): +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") +class TestFilterSubscribeCreate(StepsFilter): def test_filter_subscribe_to_single_topics(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) self.check_published_message_reaches_filter_peer() - def test_filter_subscribe_to_multiple_pubsub_topic(self): + def test_filter_subscribe_to_multiple_pubsub_topic_from_same_cluster(self): failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: content_topic = pubsub_topic @@ -26,6 +26,25 @@ class TestFilterSubscribeUpdate(StepsFilter): failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + def test_filter_subscribe_to_pubsub_topic_from_another_cluster_id(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.another_cluster_pubsub_topic) + self.check_published_message_reaches_filter_peer(pubsub_topic=self.another_cluster_pubsub_topic) + + def test_filter_subscribe_to_pubsub_topics_from_multiple_clusters(self): + pubsub_topic_list = [self.test_pubsub_topic, self.another_cluster_pubsub_topic, self.second_pubsub_topic] + failed_pubsub_topics = [] + for pubsub_topic in pubsub_topic_list: + content_topic = pubsub_topic + logger.debug(f"Running test with pubsub topic: {pubsub_topic}") + try: + self.wait_for_subscriptions_on_main_nodes([content_topic], pubsub_topic) + message = self.create_message(contentTopic=content_topic) + self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic) + except Exception as ex: + logger.error(f"PubsubTopic {pubsub_topic} failed: {str(ex)}") + failed_pubsub_topics.append(pubsub_topic) + assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + def test_filter_subscribe_to_30_content_topics_in_one_call(self): failed_content_topics = [] self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:30]]) @@ -76,7 +95,7 @@ class TestFilterSubscribeUpdate(StepsFilter): def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) - raise AssertionError("Subscribe with no pubusub topics worked!!!") + # raise AssertionError("Subscribe with no pubusub topics worked!!!") commented until https://github.com/waku-org/nwaku/issues/2315 is fixed except Exception as ex: assert "Bad Request" in str(ex) @@ -90,7 +109,12 @@ class TestFilterSubscribeUpdate(StepsFilter): def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes): try: self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError("Subscribe with no content topics worked!!!") + if self.node2.is_nwaku(): + raise AssertionError("Subscribe with extra field worked!!!") + elif self.node2.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") except Exception as ex: assert "Bad Request" in str(ex) @@ -124,6 +148,11 @@ class TestFilterSubscribeUpdate(StepsFilter): self.create_filter_subscription( {"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"} ) - raise AssertionError("Subscribe with extra field worked!!!") + if self.node2.is_nwaku(): + raise AssertionError("Subscribe with extra field worked!!!") + elif self.node2.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") except Exception as ex: assert "Bad Request" in str(ex) diff --git a/tests/filter/test_subscribe_update.py b/tests/filter/test_subscribe_update.py index ca2cc889b9..259fba740b 100644 --- a/tests/filter/test_subscribe_update.py +++ b/tests/filter/test_subscribe_update.py @@ -6,8 +6,8 @@ from src.steps.filter import StepsFilter logger = get_custom_logger(__name__) -@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node") -class TestFilterSubscribeCreate(StepsFilter): +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") +class TestFilterSubscribeUpdate(StepsFilter): def test_filter_update_subscription_add_a_new_content_topic(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}) @@ -62,8 +62,8 @@ class TestFilterSubscribeCreate(StepsFilter): def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes): try: - self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) - raise AssertionError("Subscribe with no pubusub topics worked!!!") + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic]}) + # raise AssertionError("Subscribe with no pubusub topics worked!!!") commented until https://github.com/waku-org/nwaku/issues/2315 is fixed except Exception as ex: assert "Bad Request" in str(ex) diff --git a/tests/filter/test_unsubscribe.py b/tests/filter/test_unsubscribe.py index 50e9f3b105..02375bf16b 100644 --- a/tests/filter/test_unsubscribe.py +++ b/tests/filter/test_unsubscribe.py @@ -3,7 +3,7 @@ from src.test_data import SAMPLE_INPUTS from src.steps.filter import StepsFilter -@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "subscribe_main_nodes") +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes") class TestFilterUnSubscribe(StepsFilter): def test_filter_unsubscribe_from_single_content_topic(self): self.check_published_message_reaches_filter_peer() @@ -38,15 +38,28 @@ class TestFilterUnSubscribe(StepsFilter): self.check_publish_without_filter_subscription(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic) def test_filter_unsubscribe_from_non_existing_content_topic(self): - self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}) + try: + self.delete_filter_subscription( + {"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}, + status="can't unsubscribe" if self.node2.is_gowaku() else "", + ) + except Exception as ex: + assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex) self.check_published_message_reaches_filter_peer() def test_filter_unsubscribe_from_non_existing_pubsub_topic(self): try: - self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_pubsub_topic], "pubsubTopic": self.second_pubsub_topic}) - raise AssertionError("Unsubscribe with non existing pubsub topic worked!!!") + self.delete_filter_subscription( + {"requestId": "1", "contentFilters": [self.test_pubsub_topic], "pubsubTopic": self.second_pubsub_topic}, status="can't unsubscribe" + ) + if self.node2.is_nwaku(): + raise AssertionError("Unsubscribe with non existing pubsub topic worked!!!") + elif self.node2.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") except Exception as ex: - assert "Not Found" and "peer has no subscriptions" in str(ex) + assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex) self.check_published_message_reaches_filter_peer() def test_filter_unsubscribe_from_31_content_topics(self): @@ -56,7 +69,7 @@ class TestFilterUnSubscribe(StepsFilter): ) raise AssertionError("Unsubscribe from more than 30 content topics worked!!!") except Exception as ex: - assert "Not Found" and "exceeds maximum content topics: 30" in str(ex) + assert "exceeds maximum content topics: 30" in str(ex) def test_filter_unsubscribe_with_no_content_topic(self): try: @@ -99,8 +112,15 @@ class TestFilterUnSubscribe(StepsFilter): def test_filter_unsubscribe_with_no_request_id(self): try: - self.delete_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError("Unsubscribe with no request id worked!!!") + self.delete_filter_subscription( + {"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}, status="can't unsubscribe" + ) + if self.node2.is_nwaku(): + raise AssertionError("Unsubscribe with no request id worked!!!") + elif self.node2.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") except Exception as ex: assert "Bad Request" in str(ex) @@ -116,7 +136,12 @@ class TestFilterUnSubscribe(StepsFilter): self.delete_filter_subscription( {"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"} ) - raise AssertionError("Unsubscribe with extra field worked!!!") + if self.node2.is_nwaku(): + raise AssertionError("Unsubscribe with extra field worked!!!") + elif self.node2.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") except Exception as ex: assert "Bad Request" in str(ex) diff --git a/tests/filter/test_unsubscribe_all.py b/tests/filter/test_unsubscribe_all.py index 0fd57d7392..f966f18b5f 100644 --- a/tests/filter/test_unsubscribe_all.py +++ b/tests/filter/test_unsubscribe_all.py @@ -4,7 +4,7 @@ from src.steps.filter import StepsFilter from random import choice -@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "filter_warm_up") +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") class TestFilterUnSubscribeAll(StepsFilter): def test_filter_unsubscribe_all_from_few_content_topics(self): content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]] @@ -45,14 +45,12 @@ class TestFilterUnSubscribeAll(StepsFilter): self.delete_all_filter_subscriptions({"requestId": "1"}) raise AssertionError("Unsubscribe all on peer without subscriptions worked!!!") except Exception as ex: - assert "Not Found" and "peer has no subscriptions" in str(ex) - - def test_filter_unsubscribe_all_with_no_request_id(self, subscribe_main_nodes): - try: - self.delete_all_filter_subscriptions({}) - raise AssertionError("Unsubscribe all with no request id worked!!!") - except Exception as ex: - assert "Bad Request" in str(ex) + if self.node2.is_nwaku(): + assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex) + elif self.node2.is_gowaku(): + assert "subscription not found" in str(ex) + else: + raise NotImplementedError("Not implemented for this node type") def test_filter_unsubscribe_all_with_invalid_request_id(self, subscribe_main_nodes): try: @@ -61,7 +59,7 @@ class TestFilterUnSubscribeAll(StepsFilter): except Exception as ex: assert "Bad Request" in str(ex) - def test_filter_unsubscribe_all_with_extra_field(self): + def test_filter_unsubscribe_all_with_extra_field(self, subscribe_main_nodes): try: self.delete_all_filter_subscriptions({"requestId": 1, "extraField": "extraValue"}) raise AssertionError("Unsubscribe all with extra field worked!!!") diff --git a/tests/relay/test_multiple_nodes.py b/tests/relay/test_multiple_nodes.py index 621d6da352..e4f7bcbd75 100644 --- a/tests/relay/test_multiple_nodes.py +++ b/tests/relay/test_multiple_nodes.py @@ -3,7 +3,7 @@ from src.steps.relay import StepsRelay @pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") -class TestMultipleNodes(StepsRelay): +class TestRelayMultipleNodes(StepsRelay): def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): self.check_published_message_reaches_relay_peer() diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 5e94f961e7..dbcd171023 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -44,7 +44,7 @@ class TestRelayPublish(StepsRelay): assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_with_payload_less_than_one_mb(self): - payload_length = 1024 * 1023 + payload_length = 1024 * 700 # after encoding to base64 this be close to 1MB logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2) @@ -57,7 +57,7 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2) raise AssertionError("Message with payload > 1MB was received") except Exception as ex: - assert "couldn't find any messages" in str(ex) + assert "couldn't find any messages" in str(ex) or "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_with_valid_content_topics(self): failed_content_topics = [] @@ -187,7 +187,16 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_relay_peer(self.create_message(rateLimitProof=rate_limit_proof)) def test_publish_with_extra_field(self): - self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue")) + try: + self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue")) + if self.node1.is_nwaku(): + raise AssertionError("Relay publish with extra field worked!!!") + elif self.node1.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "Bad Request" in str(ex) def test_publish_and_retrieve_duplicate_message(self): message = self.create_message() @@ -221,6 +230,7 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_relay_peer() self.node1.restart() self.node1.ensure_ready() + delay(2) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_relay_peer() diff --git a/tests/relay/test_subscribe.py b/tests/relay/test_subscribe.py index 5d92d345de..0b25a7c990 100644 --- a/tests/relay/test_subscribe.py +++ b/tests/relay/test_subscribe.py @@ -76,7 +76,7 @@ class TestRelaySubscribe(StepsRelay): elif self.node1.is_gowaku(): raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") else: - raise Exception("Not implemented") + raise NotImplementedError("Not implemented for this node type") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) self.check_published_message_reaches_relay_peer()