diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index fb6a2667..3128bc4c 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -12,6 +12,13 @@ If this is a bug fix, please describe why the old behavior was problematic. +## Diffs + + + ## Notes diff --git a/src/data_classes.py b/src/data_classes.py deleted file mode 100644 index 6527250e..00000000 --- a/src/data_classes.py +++ /dev/null @@ -1,18 +0,0 @@ -from dataclasses import dataclass, field -from marshmallow_dataclass import class_schema -from typing import Optional, Union - - -@dataclass -class MessageRpcResponse: - payload: str - contentTopic: str - version: Optional[int] - timestamp: Optional[int] - ephemeral: Optional[bool] - meta: Optional[str] - rateLimitProof: Optional[Union[dict, str]] = field(default_factory=dict) - rate_limit_proof: Optional[dict] = field(default_factory=dict) - - -message_rpc_response_schema = class_schema(MessageRpcResponse)() diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 7709f034..d40cf025 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -28,17 +28,25 @@ class BaseClient(ABC): pass @abstractmethod - def set_subscriptions(self, pubsub_topics): + def set_relay_subscriptions(self, pubsub_topics): pass @abstractmethod - def delete_subscriptions(self, pubsub_topics): + def delete_relay_subscriptions(self, pubsub_topics): pass @abstractmethod - def send_message(self, message, pubsub_topic): + def send_relay_message(self, message, pubsub_topic): pass @abstractmethod - def get_messages(self, pubsub_topic): + def get_relay_messages(self, pubsub_topic): + pass + + @abstractmethod + def set_filter_subscriptions(self, subscription): + pass + + @abstractmethod + def get_filter_messages(self, content_topic): pass diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index a35f2be4..1434f221 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -1,6 +1,5 @@ from src.libs.custom_logger import get_custom_logger import json -from dataclasses import asdict from urllib.parse import quote from src.node.api_clients.base_client import BaseClient @@ -20,15 +19,27 @@ class REST(BaseClient): info_response = self.rest_call("get", "debug/v1/info") return info_response.json() - def set_subscriptions(self, pubsub_topics): + def set_relay_subscriptions(self, pubsub_topics): return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) - def delete_subscriptions(self, pubsub_topics): + def delete_relay_subscriptions(self, pubsub_topics): return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics)) - def send_message(self, message, pubsub_topic): + def send_relay_message(self, message, pubsub_topic): return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) - def get_messages(self, pubsub_topic): + def get_relay_messages(self, pubsub_topic): get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") return get_messages_response.json() + + def set_filter_subscriptions(self, subscription): + set_subscriptions_response = self.rest_call("post", "filter/v2/subscriptions", json.dumps(subscription)) + return set_subscriptions_response.json() + + def get_filter_messages(self, content_topic): + get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}") + return get_messages_response.json() + + def update_filter_subscriptions(self, subscription): + update_subscriptions_response = self.rest_call("put", "filter/v2/subscriptions", json.dumps(subscription)) + return update_subscriptions_response.json() diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py index efdf3742..e665c11e 100644 --- a/src/node/api_clients/rpc.py +++ b/src/node/api_clients/rpc.py @@ -21,21 +21,35 @@ class RPC(BaseClient): info_response = self.rpc_call("get_waku_v2_debug_v1_info", []) return info_response.json()["result"] - def set_subscriptions(self, pubsub_topics): + def set_relay_subscriptions(self, pubsub_topics): if "nwaku" in self._image_name: return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics]) else: return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) - def delete_subscriptions(self, pubsub_topics): + def delete_relay_subscriptions(self, pubsub_topics): if "nwaku" in self._image_name: return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics]) else: return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics]) - def send_message(self, message, pubsub_topic): + def send_relay_message(self, message, pubsub_topic): return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message]) - def get_messages(self, pubsub_topic): + def get_relay_messages(self, pubsub_topic): get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic]) return get_messages_response.json()["result"] + + def set_filter_subscriptions(self, subscription): + set_subscriptions_response = self.rpc_call( + "post_waku_v2_filter_v1_subscription", + [ + subscription.get("contentFilters", []), + subscription.get("pubsubTopic", None), + ], + ) + return set_subscriptions_response.json()["result"] + + def get_filter_messages(self, content_topic): + get_messages_response = self.rpc_call("get_waku_v2_filter_v1_messages", [content_topic]) + return get_messages_response.json()["result"] diff --git a/src/node/waku_message.py b/src/node/waku_message.py new file mode 100644 index 00000000..c3b6f15f --- /dev/null +++ b/src/node/waku_message.py @@ -0,0 +1,48 @@ +from dataclasses import dataclass, field +from marshmallow_dataclass import class_schema +from typing import Optional, Union +import math +import allure + + +@dataclass +class MessageRpcResponse: + payload: str + contentTopic: str + version: Optional[int] + timestamp: Optional[int] + ephemeral: Optional[bool] + meta: Optional[str] + rateLimitProof: Optional[Union[dict, str]] = field(default_factory=dict) + rate_limit_proof: Optional[dict] = field(default_factory=dict) + + +message_rpc_response_schema = class_schema(MessageRpcResponse)() + + +class WakuMessage: + def __init__(self, message_response): + self.received_messages = message_response + + @allure.step + def assert_received_message(self, sent_message, index=0): + message = message_rpc_response_schema.load(self.received_messages[index]) + + def assert_fail_message(field_name): + return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(message, field_name)}" + + assert message.payload == sent_message["payload"], assert_fail_message("payload") + assert message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") + if sent_message.get("timestamp") is not None: + if isinstance(sent_message["timestamp"], float): + assert math.isclose(float(message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") + else: + assert str(message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp") + if "version" in sent_message: + assert str(message.version) == str(sent_message["version"]), assert_fail_message("version") + if "meta" in sent_message: + assert str(message.meta) == str(sent_message["meta"]), assert_fail_message("meta") + if "ephemeral" in sent_message: + assert str(message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") + if "rateLimitProof" in sent_message: + assert str(message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index e9a56987..a1a27d33 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -1,4 +1,6 @@ import os + +import pytest from src.libs.common import delay from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed @@ -27,7 +29,8 @@ class WakuNode: self._ports = self._docker_manager.generate_ports() self._rest_port = self._ports[0] self._rpc_port = self._ports[1] - self._websocket_port = self._ports[2] + self._websocket_port = self._ports[3] + self._tcp_port = self._ports[2] if PROTOCOL == "RPC": self._api = RPC(self._rpc_port, self._image_name) @@ -53,6 +56,8 @@ class WakuNode: "rpc-address": "0.0.0.0", "rest-address": "0.0.0.0", "nat": f"extip:{self._ext_ip}", + "peer-exchange": "true", + "discv5-discovery": "true", } if "go-waku" in self._docker_manager.image: @@ -69,7 +74,7 @@ class WakuNode: self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip) logger.debug( - f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port}" + f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port} TCP: {self._tcp_port}" ) DS.waku_nodes.append(self) delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly @@ -103,23 +108,51 @@ class WakuNode: @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): - self.info() + self.info_response = self.info() logger.info(f"{PROTOCOL} service is ready !!") + def get_enr_uri(self): + try: + return self.info_response["enrUri"] + except Exception as ex: + raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}") + + def get_multiaddr_with_id(self): + addresses = self.info_response.get("listenAddresses", []) + ws_address = next((addr for addr in addresses if "/ws" not in addr), None) + if ws_address: + identifier = ws_address.split("/p2p/")[-1] + new_address = f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}/p2p/{identifier}" + return new_address + else: + raise AttributeError("No '/ws' address found") + def info(self): return self._api.info() - def set_subscriptions(self, pubsub_topics): - return self._api.set_subscriptions(pubsub_topics) + def set_relay_subscriptions(self, pubsub_topics): + return self._api.set_relay_subscriptions(pubsub_topics) - def delete_subscriptions(self, pubsub_topics): - return self._api.delete_subscriptions(pubsub_topics) + def delete_relay_subscriptions(self, pubsub_topics): + return self._api.delete_relay_subscriptions(pubsub_topics) - def send_message(self, message, pubsub_topic): - return self._api.send_message(message, pubsub_topic) + def send_relay_message(self, message, pubsub_topic): + return self._api.send_relay_message(message, pubsub_topic) - def get_messages(self, pubsub_topic): - return self._api.get_messages(pubsub_topic) + def get_relay_messages(self, pubsub_topic): + return self._api.get_relay_messages(pubsub_topic) + + def set_filter_subscriptions(self, subscription): + return self._api.set_filter_subscriptions(subscription) + + def update_filter_subscriptions(self, subscription): + if PROTOCOL == "RPC": + pytest.skip("This method doesn't exist for RPC protocol") + else: + return self._api.update_filter_subscriptions(subscription) + + def get_filter_messages(self, content_topic): + return self._api.get_filter_messages(content_topic) @property def image(self): diff --git a/src/steps/filter.py b/src/steps/filter.py new file mode 100644 index 00000000..398b66f5 --- /dev/null +++ b/src/steps/filter.py @@ -0,0 +1,124 @@ +import inspect +from uuid import uuid4 +from src.libs.custom_logger import get_custom_logger +from time import time +import pytest +import allure +from src.libs.common import to_base64, delay +from src.node.waku_message import WakuMessage +from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY +from src.node.waku_node import WakuNode +from tenacity import retry, stop_after_delay, wait_fixed +from src.test_data import VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +class StepsFilter: + test_pubsub_topic = VALID_PUBSUB_TOPICS[1] + test_content_topic = "/test/1/waku-filter/proto" + second_conted_topic = "/test/2/waku-filter/proto" + test_payload = "Filter works!!" + + @pytest.fixture(scope="function") + def setup_relay_node(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") + start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY} + if self.node1.is_gowaku(): + start_args["min_relay_peers_to_publish"] = "0" + self.node1.start(**start_args) + self.enr_uri = self.node1.get_enr_uri() + self.multiaddr_with_id = self.node1.get_multiaddr_with_id() + + @pytest.fixture(scope="function") + def setup_main_filter_node(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") + self.node2.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + self.main_nodes = [self.node2] + self.optional_nodes = [] + + @pytest.fixture(scope="function") + def subscribe_main_nodes(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + + @pytest.fixture(scope="function") + def setup_optional_filter_nodes(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + if ADDITIONAL_NODES: + nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")] + else: + pytest.skip("ADDITIONAL_NODES is empty, cannot run test") + for index, node in enumerate(nodes): + node = WakuNode(node, f"node{index}_{request.cls.test_id}") + node.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) + self.optional_nodes.append(node) + + @allure.step + def check_published_message_reaches_filter_peer( + self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None + ): + if message is None: + message = self.create_message() + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if not sender: + sender = self.node1 + if not peer_list: + peer_list = self.main_nodes + self.optional_nodes + + sender.send_relay_message(message, pubsub_topic) + delay(message_propagation_delay) + for index, peer in enumerate(peer_list): + logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") + get_messages_response = self.get_filter_messages(message["contentTopic"], node=peer) + assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(message) + + @retry(stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True) + @allure.step + def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + self.node1.set_relay_subscriptions([pubsub_topic]) + request_id = str(uuid4()) + filter_sub_response = self.create_filter_subscription( + {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic} + ) + assert filter_sub_response["requestId"] == request_id + assert filter_sub_response["statusCode"] == 0 + assert filter_sub_response["statusDesc"] == "" + + @allure.step + def create_filter_subscription(self, subscription, node=None): + if node is None: + node = self.node2 + return node.set_filter_subscriptions(subscription) + + @allure.step + def update_filter_subscription(self, subscription, node=None): + if node is None: + node = self.node2 + return node.update_filter_subscriptions(subscription) + + @allure.step + def add_new_relay_subscription(self, pubsub_topics, node=None): + if node is None: + node = self.node1 + self.node1.set_relay_subscriptions(pubsub_topics) + + @allure.step + def get_filter_messages(self, content_topic, node=None): + if node is None: + node = self.node2 + return node.get_filter_messages(content_topic) + + @allure.step + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message diff --git a/src/steps/relay.py b/src/steps/relay.py index f8fa0b99..78336c57 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,20 +1,20 @@ import inspect from src.libs.custom_logger import get_custom_logger -import math from time import time import pytest import allure from src.libs.common import to_base64, delay -from src.data_classes import message_rpc_response_schema +from src.node.waku_message import WakuMessage from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed +from src.test_data import VALID_PUBSUB_TOPICS logger = get_custom_logger(__name__) class StepsRelay: - test_pubsub_topic = "/waku/2/rs/18/1" + test_pubsub_topic = VALID_PUBSUB_TOPICS[1] test_content_topic = "/test/1/waku-relay/proto" test_payload = "Relay works!!" @@ -22,13 +22,10 @@ class StepsRelay: def setup_main_relay_nodes(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") - self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) - try: - self.enr_uri = self.node1.info()["enrUri"] - except Exception as ex: - raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}") + self.node1.start(relay="true", nodekey=NODEKEY) + self.enr_uri = self.node1.get_enr_uri() self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") - self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri) self.main_nodes = [self.node1, self.node2] self.optional_nodes = [] @@ -41,31 +38,31 @@ class StepsRelay: pytest.skip("ADDITIONAL_NODES is empty, cannot run test") for index, node in enumerate(nodes): node = WakuNode(node, f"node{index}_{request.cls.test_id}") - node.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") + node.start(relay="true", discv5_bootstrap_node=self.enr_uri) self.optional_nodes.append(node) @pytest.fixture(scope="function") def subscribe_main_relay_nodes(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) @pytest.fixture(scope="function") def subscribe_optional_relay_nodes(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.ensure_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic]) + self.ensure_relay_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic]) @pytest.fixture(scope="function") def relay_warm_up(self): try: - self.wait_for_published_message_to_reach_peer() + self.wait_for_published_message_to_reach_relay_peer() logger.info("WARM UP successful!!") except Exception as ex: raise TimeoutError(f"WARM UP FAILED WITH: {ex}") - # this method should be used only for the tests that use the warm_up fixture - # otherwise use wait_for_published_message_to_reach_peer + # this method should be used only for the tests that use the relay_warm_up fixture + # otherwise use wait_for_published_message_to_reach_relay_peer @allure.step - def check_published_message_reaches_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): + def check_published_message_reaches_relay_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): if message is None: message = self.create_message() if pubsub_topic is None: @@ -75,66 +72,47 @@ class StepsRelay: if not peer_list: peer_list = self.main_nodes + self.optional_nodes - sender.send_message(message, pubsub_topic) + sender.send_relay_message(message, pubsub_topic) delay(message_propagation_delay) for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") - get_messages_response = peer.get_messages(pubsub_topic) + get_messages_response = peer.get_relay_messages(pubsub_topic) assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" - received_message = message_rpc_response_schema.load(get_messages_response[0]) - self.assert_received_message(message, received_message) + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(message) @allure.step - def check_publish_without_subscription(self, pubsub_topic): + def check_publish_without_relay_subscription(self, pubsub_topic): try: - self.node1.send_message(self.create_message(), pubsub_topic) + self.node1.send_relay_message(self.create_message(), pubsub_topic) raise AssertionError("Publish with no subscription worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) # we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower @allure.step - def wait_for_published_message_to_reach_peer( + def wait_for_published_message_to_reach_relay_peer( self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None ): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) def check_peer_connection(): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} - self.check_published_message_reaches_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list) + self.check_published_message_reaches_relay_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list) check_peer_connection() @allure.step - def assert_received_message(self, sent_message, received_message): - def assert_fail_message(field_name): - return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}" - - assert received_message.payload == sent_message["payload"], assert_fail_message("payload") - assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") - if sent_message.get("timestamp") is not None: - if isinstance(sent_message["timestamp"], float): - assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") - else: - assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp") - if "version" in sent_message: - assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version") - if "meta" in sent_message: - assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta") - if "ephemeral" in sent_message: - assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") - if "rateLimitProof" in sent_message: - assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") + def ensure_relay_subscriptions_on_nodes(self, node_list, pubsub_topic_list): + for node in node_list: + node.set_relay_subscriptions(pubsub_topic_list) @allure.step - def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list): + def delete_relay_subscriptions_on_nodes(self, node_list, pubsub_topic_list): for node in node_list: - node.set_subscriptions(pubsub_topic_list) + node.delete_relay_subscriptions(pubsub_topic_list) @allure.step - def delete_subscriptions_on_nodes(self, node_list, pubsub_topic_list): - for node in node_list: - node.delete_subscriptions(pubsub_topic_list) - def create_message(self, **kwargs): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message.update(kwargs) diff --git a/tests/filter/__init__.py b/tests/filter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py new file mode 100644 index 00000000..87f57d0c --- /dev/null +++ b/tests/filter/test_subscribe_create.py @@ -0,0 +1,120 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS +from src.steps.filter import StepsFilter + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node") +class TestFilterSubscribeUpdate(StepsFilter): + def test_filter_subscribe_to_single_topics(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_subscribe_to_multiple_pubsub_topic(self): + failed_pubsub_topics = [] + for pubsub_topic in VALID_PUBSUB_TOPICS: + content_topic = pubsub_topic + logger.debug(f"Running test with pubsub topic: {pubsub_topic}") + try: + self.wait_for_subscriptions_on_main_nodes([content_topic], pubsub_topic) + message = self.create_message(contentTopic=content_topic) + self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic) + except Exception as ex: + logger.error(f"PubsubTopic {pubsub_topic} failed: {str(ex)}") + failed_pubsub_topics.append(pubsub_topic) + assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + + def test_filter_subscribe_to_30_content_topics_in_one_call(self): + failed_content_topics = [] + self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:30]]) + for content_topic in SAMPLE_INPUTS[:30]: + logger.debug(f'Running test with content topic {content_topic["description"]}') + message = self.create_message(contentTopic=content_topic["value"]) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as ex: + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}') + failed_content_topics.append(content_topic) + assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + def test_filter_subscribe_to_30_content_topics_in_separate_calls(self, subscribe_main_nodes): + for content_topic in SAMPLE_INPUTS[:30]: + self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic["value"]], "pubsubTopic": self.test_pubsub_topic}) + failed_content_topics = [] + for content_topic in SAMPLE_INPUTS[:30]: + logger.debug(f'Running test with content topic {content_topic["description"]}') + message = self.create_message(contentTopic=content_topic["value"]) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as ex: + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}') + failed_content_topics.append(content_topic) + assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + def test_filter_subscribe_to_31_content_topics(self, subscribe_main_nodes): + try: + _31_content_topics = [input["value"] for input in SAMPLE_INPUTS[:31]] + self.create_filter_subscription({"requestId": "1", "contentFilters": _31_content_topics, "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Subscribe with more than 30 content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_refresh(self): + for _ in range(2): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_subscribe_with_multiple_overlapping_content_topics(self): + self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:3]]) + self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[1:4]]) + for content_topic in SAMPLE_INPUTS[:4]: + message = self.create_message(contentTopic=content_topic["value"]) + self.check_published_message_reaches_filter_peer(message) + + def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes): + try: + self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) + raise AssertionError(f"Subscribe with no pubusub topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_with_invalid_pubsub_topic_format(self, subscribe_main_nodes): + try: + self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": [self.test_pubsub_topic]}) + raise AssertionError(f"Subscribe with invalid pubusub topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes): + try: + self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with no content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_with_invalid_content_topic_format(self, subscribe_main_nodes): + success_content_topics = [] + for content_topic in INVALID_CONTENT_TOPICS: + logger.debug(f'Running test with contetn topic {content_topic["description"]}') + try: + self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) + success_content_topics.append(content_topic) + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" + + def test_filter_subscribe_with_no_request_id(self, subscribe_main_nodes): + try: + self.create_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with no request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_subscribe_with_invalid_request_id(self, subscribe_main_nodes): + try: + self.create_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) diff --git a/tests/filter/test_subscribe_update.py b/tests/filter/test_subscribe_update.py new file mode 100644 index 00000000..54b20411 --- /dev/null +++ b/tests/filter/test_subscribe_update.py @@ -0,0 +1,107 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS +from src.steps.filter import StepsFilter + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node") +class TestFilterSubscribeCreate(StepsFilter): + def test_filter_update_subscription_add_a_new_content_topic(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_conted_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic)) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_conted_topic)) + + def test_filter_update_subscription_add_30_new_content_topics(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.update_filter_subscription( + {"requestId": "1", "contentFilters": [input["value"] for input in SAMPLE_INPUTS[:30]], "pubsubTopic": self.test_pubsub_topic} + ) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic)) + failed_content_topics = [] + for content_topic in SAMPLE_INPUTS[:30]: + logger.debug(f'Running test with content topic {content_topic["description"]}') + message = self.create_message(contentTopic=content_topic["value"]) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as ex: + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}') + failed_content_topics.append(content_topic) + assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + def test_filter_update_subscription_add_31_new_content_topics(self, subscribe_main_nodes): + try: + _31_content_topics = [input["value"] for input in SAMPLE_INPUTS[:31]] + self.update_filter_subscription({"requestId": "1", "contentFilters": _31_content_topics, "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError("Subscribe with more than 30 content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_refresh_existing(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic)) + + def test_filter_update_subscription_add_a_new_pubsub_topic(self): + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) + self.update_filter_subscription( + {"requestId": "1", "contentFilters": [self.test_content_topic, self.second_conted_topic], "pubsubTopic": VALID_PUBSUB_TOPICS[4]} + ) + self.add_new_relay_subscription(VALID_PUBSUB_TOPICS[4:5]) + self.check_published_message_reaches_filter_peer( + self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic + ) + self.check_published_message_reaches_filter_peer( + self.create_message(contentTopic=self.test_content_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4] + ) + self.check_published_message_reaches_filter_peer( + self.create_message(contentTopic=self.second_conted_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4] + ) + + def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes): + try: + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]}) + raise AssertionError(f"Subscribe with no pubusub topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_with_pubsub_topic_list_instead_of_string(self, subscribe_main_nodes): + try: + self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": [self.test_pubsub_topic]}) + raise AssertionError(f"Subscribe with invalid pubusub topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_with_no_content_topic(self, subscribe_main_nodes): + try: + self.update_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with no content topics worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_with_invalid_content_topic_format(self, subscribe_main_nodes): + success_content_topics = [] + for content_topic in INVALID_CONTENT_TOPICS: + logger.debug(f'Running test with contetn topic {content_topic["description"]}') + try: + self.update_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) + success_content_topics.append(content_topic) + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" + + def test_filter_update_subscription_with_no_request_id(self, subscribe_main_nodes): + try: + self.update_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with no request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_update_subscription_with_invalid_request_id(self, subscribe_main_nodes): + try: + self.update_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + raise AssertionError(f"Subscribe with invalid request id worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) diff --git a/tests/relay/test_multiple_nodes.py b/tests/relay/test_multiple_nodes.py index 2741b5e7..621d6da3 100644 --- a/tests/relay/test_multiple_nodes.py +++ b/tests/relay/test_multiple_nodes.py @@ -5,15 +5,15 @@ from src.steps.relay import StepsRelay @pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") class TestMultipleNodes(StepsRelay): def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): - self.check_published_message_reaches_peer() + self.check_published_message_reaches_relay_peer() def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): - self.check_published_message_reaches_peer(sender=self.optional_nodes[-1]) + self.check_published_message_reaches_relay_peer(sender=self.optional_nodes[-1]) def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self): - self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes) + self.wait_for_published_message_to_reach_relay_peer(peer_list=self.main_nodes) try: - self.check_published_message_reaches_peer(peer_list=self.optional_nodes) + self.check_published_message_reaches_relay_peer(peer_list=self.optional_nodes) raise AssertionError("Non subscribed nodes received the message!!") except Exception as ex: assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 16a39006..0beb52f8 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -4,7 +4,7 @@ from time import time from src.libs.common import delay, to_base64 from src.steps.relay import StepsRelay from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS -from src.data_classes import message_rpc_response_schema +from src.node.waku_message import WakuMessage logger = get_custom_logger(__name__) @@ -17,7 +17,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=to_base64(payload["value"])) try: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) except Exception as e: logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) @@ -29,7 +29,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=payload["value"]) try: - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) success_payloads.append(payload) except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -38,7 +38,7 @@ class TestRelayPublish(StepsRelay): def test_publish_with_missing_payload(self): message = {"contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) raise AssertionError("Publish with missing payload worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -47,14 +47,14 @@ class TestRelayPublish(StepsRelay): payload_length = 1024 * 1023 logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) - self.check_published_message_reaches_peer(message, message_propagation_delay=2) + self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2) def test_publish_with_payload_equal_or_more_than_one_mb(self): for payload_length in [1024 * 1024, 1024 * 1024 * 10]: logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) try: - self.check_published_message_reaches_peer(message, message_propagation_delay=2) + self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: assert "couldn't find any messages" in str(ex) @@ -65,7 +65,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) except Exception as e: logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') failed_content_topics.append(content_topic) @@ -77,7 +77,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with contetn topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) success_content_topics.append(content_topic) except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -86,33 +86,33 @@ class TestRelayPublish(StepsRelay): def test_publish_with_missing_content_topic(self): message = {"payload": to_base64(self.test_payload), "timestamp": int(time() * 1e9)} try: - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) raise AssertionError("Publish with missing content_topic worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_on_multiple_pubsub_topics(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug(f"Running test with pubsub topic {pubsub_topic}") try: - self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) except Exception as e: logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" def test_message_published_on_different_pubsub_topic_is_not_retrieved(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) - self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + self.node1.send_relay_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) delay(0.1) - messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) + messages = self.node2.get_relay_messages(VALID_PUBSUB_TOPICS[1]) assert not messages, "Message was retrieved on wrong pubsub_topic" def test_publish_on_non_subscribed_pubsub_topic(self): try: - self.check_published_message_reaches_peer(pubsub_topic="/waku/2/rs/19/1") + self.check_published_message_reaches_relay_peer(pubsub_topic=VALID_PUBSUB_TOPICS[4]) raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -124,7 +124,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) try: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) except Exception as ex: logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') failed_timestamps.append(timestamp) @@ -137,7 +137,7 @@ class TestRelayPublish(StepsRelay): logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) try: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) success_timestamps.append(timestamp) except Exception as e: pass @@ -145,24 +145,24 @@ class TestRelayPublish(StepsRelay): def test_publish_with_no_timestamp(self): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) def test_publish_with_valid_version(self): - self.check_published_message_reaches_peer(self.create_message(version=10)) + self.check_published_message_reaches_relay_peer(self.create_message(version=10)) def test_publish_with_invalid_version(self): try: - self.check_published_message_reaches_peer(self.create_message(version=2.1)) + self.check_published_message_reaches_relay_peer(self.create_message(version=2.1)) raise AssertionError("Publish with invalid version worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_publish_with_valid_meta(self): - self.check_published_message_reaches_peer(self.create_message(meta=to_base64(self.test_payload))) + self.check_published_message_reaches_relay_peer(self.create_message(meta=to_base64(self.test_payload))) def test_publish_with_invalid_meta(self): try: - self.check_published_message_reaches_peer(self.create_message(meta=self.test_payload)) + self.check_published_message_reaches_relay_peer(self.create_message(meta=self.test_payload)) raise AssertionError("Publish with invalid meta worked!!!") except Exception as ex: assert "Bad Request" in str(ex) @@ -172,7 +172,7 @@ class TestRelayPublish(StepsRelay): for ephemeral in [True, False]: logger.debug(f"Running test with Ephemeral {ephemeral}") try: - self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) + self.check_published_message_reaches_relay_peer(self.create_message(ephemeral=ephemeral)) except Exception as e: logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}") failed_ephemeral.append(ephemeral) @@ -184,16 +184,16 @@ class TestRelayPublish(StepsRelay): "epoch": to_base64("epochData"), "nullifier": to_base64("nullifierData"), } - self.check_published_message_reaches_peer(self.create_message(rateLimitProof=rate_limit_proof)) + self.check_published_message_reaches_relay_peer(self.create_message(rateLimitProof=rate_limit_proof)) def test_publish_with_extra_field(self): - self.check_published_message_reaches_peer(self.create_message(extraField="extraValue")) + self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue")) def test_publish_and_retrieve_duplicate_message(self): message = self.create_message() - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) try: - self.check_published_message_reaches_peer(message) + self.check_published_message_reaches_relay_peer(message) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: assert "couldn't find any messages" in str(ex) @@ -201,43 +201,43 @@ class TestRelayPublish(StepsRelay): def test_publish_while_peer_is_paused(self): message = self.create_message() self.node2.pause() - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) self.node2.unpause() - get_messages_response = self.node2.get_messages(self.test_pubsub_topic) + get_messages_response = self.node2.get_relay_messages(self.test_pubsub_topic) assert get_messages_response, "Peer node couldn't find any messages" - received_message = message_rpc_response_schema.load(get_messages_response[0]) - self.assert_received_message(message, received_message) + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(message) def test_publish_after_node_pauses_and_pauses(self): - self.check_published_message_reaches_peer() + self.check_published_message_reaches_relay_peer() self.node1.pause() self.node1.unpause() - self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) + self.check_published_message_reaches_relay_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() self.node2.unpause() - self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) + self.check_published_message_reaches_relay_peer(self.create_message(payload=to_base64("M2"))) def test_publish_after_node1_restarts(self): - self.check_published_message_reaches_peer() + self.check_published_message_reaches_relay_peer() self.node1.restart() self.node1.ensure_ready() - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() def test_publish_after_node2_restarts(self): - self.check_published_message_reaches_peer() + self.check_published_message_reaches_relay_peer() self.node2.restart() self.node2.ensure_ready() - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() def test_publish_and_retrieve_100_messages(self): num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag for index in range(num_messages): message = self.create_message(payload=to_base64(f"M_{index}")) - self.node1.send_message(message, self.test_pubsub_topic) + self.node1.send_relay_message(message, self.test_pubsub_topic) delay(1) - messages = self.node2.get_messages(self.test_pubsub_topic) + messages = self.node2.get_relay_messages(self.test_pubsub_topic) assert len(messages) == num_messages for index, message in enumerate(messages): assert message["payload"] == to_base64( diff --git a/tests/relay/test_subscribe.py b/tests/relay/test_subscribe.py index e4acf574..5d92d345 100644 --- a/tests/relay/test_subscribe.py +++ b/tests/relay/test_subscribe.py @@ -9,68 +9,68 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("setup_main_relay_nodes") class TestRelaySubscribe(StepsRelay): - def test_no_subscription(self): - self.check_publish_without_subscription(self.test_pubsub_topic) + def test_relay_no_subscription(self): + self.check_publish_without_relay_subscription(self.test_pubsub_topic) - def test_subscribe_to_single_pubsub_topic(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() + def test_relay_subscribe_to_single_pubsub_topic(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() - def test_subscribe_to_already_existing_pubsub_topic(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.check_published_message_reaches_peer() + def test_relay_subscribe_to_already_existing_pubsub_topic(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer() - def test_subscribe_with_multiple_overlapping_pubsub_topics(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4]) + def test_relay_subscribe_with_multiple_overlapping_pubsub_topics(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4]) for pubsub_topic in VALID_PUBSUB_TOPICS[:4]: - self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) + self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic) - def test_subscribe_with_empty_pubsub_topic_list(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, []) + def test_relay_subscribe_with_empty_pubsub_topic_list(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, []) - def test_subscribe_with_invalid_pubsub_topic_format(self): + def test_relay_subscribe_with_invalid_pubsub_topic_format(self): success_pubsub_topics = [] for pubsub_topic in INVALID_PUBSUB_TOPICS: logger.debug(f"Running test with payload {pubsub_topic}") try: - self.ensure_subscriptions_on_nodes(self.main_nodes, pubsub_topic) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, pubsub_topic) success_pubsub_topics.append(pubsub_topic) except Exception as ex: assert "Bad Request" in str(ex) assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" - def test_unsubscribe_from_single_pubsub_topic(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() - self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.check_publish_without_subscription(self.test_pubsub_topic) + def test_relay_unsubscribe_from_single_pubsub_topic(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() + self.delete_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_publish_without_relay_subscription(self.test_pubsub_topic) - def test_unsubscribe_from_all_pubsub_topics(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + def test_relay_unsubscribe_from_all_pubsub_topics(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) for pubsub_topic in VALID_PUBSUB_TOPICS: - self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) - self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic) + self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) for pubsub_topic in VALID_PUBSUB_TOPICS: - self.check_publish_without_subscription(pubsub_topic) + self.check_publish_without_relay_subscription(pubsub_topic) - def test_unsubscribe_from_some_pubsub_topics(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + def test_relay_unsubscribe_from_some_pubsub_topics(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) for pubsub_topic in VALID_PUBSUB_TOPICS: - self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) - self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) + self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic) + self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) for pubsub_topic in VALID_PUBSUB_TOPICS[:3]: - self.check_publish_without_subscription(pubsub_topic) + self.check_publish_without_relay_subscription(pubsub_topic) for pubsub_topic in VALID_PUBSUB_TOPICS[3:]: - self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) - def test_unsubscribe_from_non_existing_pubsub_topic(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() + def test_relay_unsubscribe_from_non_existing_pubsub_topic(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() try: - self.delete_subscriptions_on_nodes(self.main_nodes, ["/waku/2/rs/999/99"]) + self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[4:5]) if self.node1.is_nwaku(): pass # nwaku doesn't fail in this case elif self.node1.is_gowaku(): @@ -79,32 +79,32 @@ class TestRelaySubscribe(StepsRelay): raise Exception("Not implemented") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) - self.check_published_message_reaches_peer() + self.check_published_message_reaches_relay_peer() - def test_unsubscribe_with_invalid_pubsub_topic_format(self): + def test_relay_unsubscribe_with_invalid_pubsub_topic_format(self): success_pubsub_topics = [] for pubsub_topic in INVALID_PUBSUB_TOPICS: logger.debug(f"Running test with payload {pubsub_topic}") try: - self.delete_subscriptions_on_nodes(self.main_nodes, pubsub_topic) + self.delete_relay_subscriptions_on_nodes(self.main_nodes, pubsub_topic) success_pubsub_topics.append(pubsub_topic) except Exception as ex: assert "Bad Request" in str(ex) assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" - def test_resubscribe_to_unsubscribed_pubsub_topic(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() - self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.check_publish_without_subscription(self.test_pubsub_topic) - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.check_published_message_reaches_peer() + def test_relay_resubscribe_to_unsubscribed_pubsub_topic(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() + self.delete_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_publish_without_relay_subscription(self.test_pubsub_topic) + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_published_message_reaches_relay_peer() - def test_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self): - self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer() + def test_relay_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self): + self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_relay_peer() try: - self.check_published_message_reaches_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC) + self.check_published_message_reaches_relay_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC) raise AssertionError(f"Publish on {DEFAULT_PUBSUB_TOPIC} with beeing subscribed to it worked!!!") except Exception as ex: assert "Not Found" in str(ex)