import inspect from uuid import uuid4 from src.libs.custom_logger import get_custom_logger from time import time import pytest import allure from src.libs.common import to_base64, delay from src.node.waku_message import WakuMessage from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed from src.test_data import VALID_PUBSUB_TOPICS logger = get_custom_logger(__name__) class StepsFilter: test_pubsub_topic = VALID_PUBSUB_TOPICS[1] second_pubsub_topic = VALID_PUBSUB_TOPICS[2] another_cluster_pubsub_topic = "/waku/2/rs/2/2" test_content_topic = "/test/1/waku-filter/proto" second_content_topic = "/test/2/waku-filter/proto" test_payload = "Filter works!!" @pytest.fixture(scope="function", autouse=True) def filter_setup(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.main_nodes = [] self.optional_nodes = [] @pytest.fixture(scope="function") def setup_main_relay_node(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.relay_node_start(NODE_1) @pytest.fixture(scope="function") def setup_main_filter_node(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) self.main_nodes.append(self.node2) @pytest.fixture(scope="function") def subscribe_main_nodes(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) @pytest.fixture(scope="function") @retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True) def filter_warm_up(self): try: self.ping_filter_subscriptions("1") except Exception as ex: if "peer has no subscriptions" in str(ex): logger.info("WARM UP successful!!") else: raise TimeoutError(f"WARM UP FAILED WITH: {ex}") def relay_node_start(self, node): self.node1 = WakuNode(node, f"node1_{self.test_id}") start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY} if self.node1.is_gowaku(): start_args["min_relay_peers_to_publish"] = "0" self.node1.start(**start_args) self.enr_uri = self.node1.get_enr_uri() self.multiaddr_with_id = self.node1.get_multiaddr_with_id() def setup_optional_filter_nodes(self, node_list=ADDITIONAL_NODES): if node_list: nodes = [node.strip() for node in node_list.split(",") if node] else: pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") for index, node in enumerate(nodes): node = WakuNode(node, f"node{index + 3}_{self.test_id}") node.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) self.optional_nodes.append(node) @allure.step def check_published_message_reaches_filter_peer( self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None ): if message is None: message = self.create_message() if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic if not sender: sender = self.node1 if not peer_list: peer_list = self.main_nodes + self.optional_nodes sender.send_relay_message(message, pubsub_topic) delay(message_propagation_delay) for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message") get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" waku_message = WakuMessage(get_messages_response) waku_message.assert_received_message(message) @allure.step def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None, peer_list=None): try: self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic, peer_list=peer_list) raise AssertionError("Publish with no subscription worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Not Found" in str(ex) or "couldn't find any messages" in str(ex) @allure.step def wait_for_published_message_to_reach_filter_peer( self, timeout_duration=120, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None ): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) def publish_and_check_filter_peer(): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list) publish_and_check_filter_peer() @allure.step def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None): if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic self.node1.set_relay_subscriptions([pubsub_topic]) request_id = str(uuid4()) filter_sub_response = self.create_filter_subscription_with_retry( {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic} ) assert filter_sub_response["requestId"] == request_id assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed @allure.step def subscribe_optional_filter_nodes(self, content_topic_list, pubsub_topic=None): if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic for node in self.optional_nodes: request_id = str(uuid4()) self.create_filter_subscription_with_retry( {"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}, node=node ) @retry(stop=stop_after_delay(60), wait=wait_fixed(1), reraise=True) @allure.step def create_filter_subscription_with_retry(self, subscription, node=None): return self.create_filter_subscription(subscription, node) @allure.step def create_filter_subscription(self, subscription, node=None): if node is None: node = self.node2 return node.set_filter_subscriptions(subscription) @allure.step def update_filter_subscription(self, subscription, node=None): if node is None: node = self.node2 if node.is_gowaku(): pytest.skip(f"This method doesn't exist for node {node.type()}") return node.update_filter_subscriptions(subscription) @allure.step def delete_filter_subscription(self, subscription, status=None, node=None): if node is None: node = self.node2 delete_sub_response = node.delete_filter_subscriptions(subscription) if node.is_gowaku() and "requestId" not in subscription: assert delete_sub_response["requestId"] == "" else: assert delete_sub_response["requestId"] == subscription["requestId"] if status is None: assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed else: assert status in delete_sub_response["statusDesc"] @allure.step def delete_all_filter_subscriptions(self, request_id, node=None): if node is None: node = self.node2 delete_sub_response = node.delete_all_filter_subscriptions(request_id) assert delete_sub_response["requestId"] == request_id["requestId"] assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed @allure.step def ping_filter_subscriptions(self, request_id, node=None): if node is None: node = self.node2 ping_sub_response = node.ping_filter_subscriptions(request_id) assert ping_sub_response["requestId"] == request_id assert ping_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed def ping_without_filter_subscription(self, node=None): try: self.ping_filter_subscriptions(str(uuid4()), node=node) raise AssertionError("Ping without any subscription worked") except Exception as ex: assert "peer has no subscription" in str(ex) or "ping request failed" in str(ex) @allure.step def add_new_relay_subscription(self, pubsub_topics, node=None): if node is None: node = self.node1 self.node1.set_relay_subscriptions(pubsub_topics) @allure.step def get_filter_messages(self, content_topic, pubsub_topic=None, node=None): if node is None: node = self.node2 if node.is_gowaku(): return node.get_filter_messages(content_topic, pubsub_topic) elif node.is_nwaku(): return node.get_filter_messages(content_topic) else: raise NotImplementedError("Not implemented for this node type") @allure.step def create_message(self, **kwargs): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message.update(kwargs) return message