From f8db76ab20512e8039e14c628a0b621920cab80d Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Fri, 24 Nov 2023 14:04:24 +0200 Subject: [PATCH] Relay Subscribe (#5) * github actions report summary * use env instead of inputs * multiple nodes tests * fix warm up * fix warm up * small fix after CI run * small fix after CI run 2 * add new multi-node test * self review * relay subscribe tests * small fix * new tests * fix typo --- src/node/api_clients/base_client.py | 13 ++-- src/node/api_clients/rest.py | 3 + src/node/api_clients/rpc.py | 6 ++ src/node/waku_node.py | 12 +-- src/steps/relay.py | 35 +++++++-- src/test_data.py | 2 + tests/relay/test_multiple_nodes.py | 6 +- tests/relay/test_publish.py | 14 ++-- tests/relay/test_subscribe.py | 110 ++++++++++++++++++++++++++++ 9 files changed, 172 insertions(+), 29 deletions(-) create mode 100644 tests/relay/test_subscribe.py diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index be1c291fc7..7709f0344d 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -8,11 +8,6 @@ logger = get_custom_logger(__name__) class BaseClient(ABC): - # The retry decorator is applied to handle transient errors gracefully. This is particularly - # useful when running tests in parallel, where occasional network-related errors such as - # connection drops, timeouts, or temporary unavailability of a service can occur. Retrying - # ensures that such intermittent issues don't cause the tests to fail outright. - @retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True) def make_request(self, method, url, headers=None, data=None): logger.debug(f"{method.upper()} call: {url} with payload: {data}") response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) @@ -20,10 +15,10 @@ class BaseClient(ABC): response.raise_for_status() except requests.HTTPError as http_err: logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}") - raise + raise Exception(f"Error: {http_err} with response: {response.content}") except Exception as err: logger.error(f"An error occurred: {err}. Response content: {response.content}") - raise + raise Exception(f"Error: {err} with response: {response.content}") else: logger.info(f"Response status code: {response.status_code}. Response content: {response.content}") return response @@ -36,6 +31,10 @@ class BaseClient(ABC): def set_subscriptions(self, pubsub_topics): pass + @abstractmethod + def delete_subscriptions(self, pubsub_topics): + pass + @abstractmethod def send_message(self, message, pubsub_topic): pass diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index ced0a6787d..a35f2be4d8 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -23,6 +23,9 @@ class REST(BaseClient): def set_subscriptions(self, pubsub_topics): return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) + def delete_subscriptions(self, pubsub_topics): + return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics)) + def send_message(self, message, pubsub_topic): return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) diff --git a/src/node/api_clients/rpc.py b/src/node/api_clients/rpc.py index 522fc0e7db..efdf374262 100644 --- a/src/node/api_clients/rpc.py +++ b/src/node/api_clients/rpc.py @@ -27,6 +27,12 @@ class RPC(BaseClient): else: return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) + def delete_subscriptions(self, pubsub_topics): + if "nwaku" in self._image_name: + return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics]) + else: + return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics]) + def send_message(self, message, pubsub_topic): return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message]) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index f701b14a7e..e9a5698727 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -53,7 +53,6 @@ class WakuNode: "rpc-address": "0.0.0.0", "rest-address": "0.0.0.0", "nat": f"extip:{self._ext_ip}", - "pubsub-topic": DEFAULT_PUBSUB_TOPIC, } if "go-waku" in self._docker_manager.image: @@ -110,15 +109,16 @@ class WakuNode: def info(self): return self._api.info() - def set_subscriptions(self, pubsub_topics=None): - if not pubsub_topics: - pubsub_topics = [DEFAULT_PUBSUB_TOPIC] + def set_subscriptions(self, pubsub_topics): return self._api.set_subscriptions(pubsub_topics) - def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): + def delete_subscriptions(self, pubsub_topics): + return self._api.delete_subscriptions(pubsub_topics) + + def send_message(self, message, pubsub_topic): return self._api.send_message(message, pubsub_topic) - def get_messages(self, pubsub_topic=DEFAULT_PUBSUB_TOPIC): + def get_messages(self, pubsub_topic): return self._api.get_messages(pubsub_topic) @property diff --git a/src/steps/relay.py b/src/steps/relay.py index d9bafdbadd..f8fa0b99ca 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -23,8 +23,11 @@ class StepsRelay: logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) - self.enr_uri = self.node1.info()["enrUri"] - self.node2 = WakuNode(NODE_2, f"node1_{request.cls.test_id}") + try: + self.enr_uri = self.node1.info()["enrUri"] + except Exception as ex: + raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}") + self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") self.main_nodes = [self.node1, self.node2] self.optional_nodes = [] @@ -62,29 +65,42 @@ class StepsRelay: # this method should be used only for the tests that use the warm_up fixture # otherwise use wait_for_published_message_to_reach_peer @allure.step - def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): + def check_published_message_reaches_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): + if message is None: + message = self.create_message() + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic if not sender: sender = self.node1 if not peer_list: peer_list = self.main_nodes + self.optional_nodes - sender.send_message(message, pubsub_topic or self.test_pubsub_topic) + + sender.send_message(message, pubsub_topic) delay(message_propagation_delay) for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") - get_messages_response = peer.get_messages(pubsub_topic or self.test_pubsub_topic) + get_messages_response = peer.get_messages(pubsub_topic) assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) self.assert_received_message(message, received_message) + @allure.step + def check_publish_without_subscription(self, pubsub_topic): + try: + self.node1.send_message(self.create_message(), pubsub_topic) + raise AssertionError("Publish with no subscription worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + # we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower @allure.step def wait_for_published_message_to_reach_peer( - self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None + self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None ): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) def check_peer_connection(): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} - self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list) + self.check_published_message_reaches_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list) check_peer_connection() @@ -114,6 +130,11 @@ class StepsRelay: for node in node_list: node.set_subscriptions(pubsub_topic_list) + @allure.step + def delete_subscriptions_on_nodes(self, node_list, pubsub_topic_list): + for node in node_list: + node.delete_subscriptions(pubsub_topic_list) + def create_message(self, **kwargs): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message.update(kwargs) diff --git a/src/test_data.py b/src/test_data.py index b6e746f4a4..cc10f5a2f5 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -77,6 +77,8 @@ VALID_PUBSUB_TOPICS = [ "test", ] +INVALID_PUBSUB_TOPICS = ["/test/2/rs/18/1", ("/waku/2/rs/18/1"), {"pubsub_topic": "/waku/3/rs/18/1"}, True, 12345678, [["/waku/2/rs/18/1"]]] + SAMPLE_TIMESTAMPS = [ {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, diff --git a/tests/relay/test_multiple_nodes.py b/tests/relay/test_multiple_nodes.py index aa7cc121e5..2741b5e734 100644 --- a/tests/relay/test_multiple_nodes.py +++ b/tests/relay/test_multiple_nodes.py @@ -5,15 +5,15 @@ from src.steps.relay import StepsRelay @pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") class TestMultipleNodes(StepsRelay): def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): - self.check_published_message_reaches_peer(self.create_message()) + self.check_published_message_reaches_peer() def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): - self.check_published_message_reaches_peer(self.create_message(), sender=self.optional_nodes[-1]) + self.check_published_message_reaches_peer(sender=self.optional_nodes[-1]) def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self): self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes) try: - self.check_published_message_reaches_peer(self.create_message(), peer_list=self.optional_nodes) + self.check_published_message_reaches_peer(peer_list=self.optional_nodes) raise AssertionError("Non subscribed nodes received the message!!") except Exception as ex: assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 0f45ff9819..16a3900657 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -97,7 +97,7 @@ class TestRelayPublish(StepsRelay): for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug(f"Running test with pubsub topic {pubsub_topic}") try: - self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) + self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) except Exception as e: logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") failed_pubsub_topics.append(pubsub_topic) @@ -110,9 +110,9 @@ class TestRelayPublish(StepsRelay): messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) assert not messages, "Message was retrieved on wrong pubsub_topic" - def test_publish_on_unsubscribed_pubsub_topic(self): + def test_publish_on_non_subscribed_pubsub_topic(self): try: - self.check_published_message_reaches_peer(self.create_message(), pubsub_topic="/waku/2/rs/19/1") + self.check_published_message_reaches_peer(pubsub_topic="/waku/2/rs/19/1") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -209,7 +209,7 @@ class TestRelayPublish(StepsRelay): self.assert_received_message(message, received_message) def test_publish_after_node_pauses_and_pauses(self): - self.check_published_message_reaches_peer(self.create_message()) + self.check_published_message_reaches_peer() self.node1.pause() self.node1.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) @@ -218,14 +218,16 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) def test_publish_after_node1_restarts(self): - self.check_published_message_reaches_peer(self.create_message()) + self.check_published_message_reaches_peer() self.node1.restart() + self.node1.ensure_ready() self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer() def test_publish_after_node2_restarts(self): - self.check_published_message_reaches_peer(self.create_message()) + self.check_published_message_reaches_peer() self.node2.restart() + self.node2.ensure_ready() self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer() diff --git a/tests/relay/test_subscribe.py b/tests/relay/test_subscribe.py new file mode 100644 index 0000000000..e4acf57466 --- /dev/null +++ b/tests/relay/test_subscribe.py @@ -0,0 +1,110 @@ +import pytest +from src.env_vars import DEFAULT_PUBSUB_TOPIC +from src.libs.custom_logger import get_custom_logger +from src.steps.relay import StepsRelay +from src.test_data import INVALID_PUBSUB_TOPICS, VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("setup_main_relay_nodes") +class TestRelaySubscribe(StepsRelay): + def test_no_subscription(self): + self.check_publish_without_subscription(self.test_pubsub_topic) + + def test_subscribe_to_single_pubsub_topic(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + + def test_subscribe_to_already_existing_pubsub_topic(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_published_message_reaches_peer() + + def test_subscribe_with_multiple_overlapping_pubsub_topics(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4]) + for pubsub_topic in VALID_PUBSUB_TOPICS[:4]: + self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) + + def test_subscribe_with_empty_pubsub_topic_list(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, []) + + def test_subscribe_with_invalid_pubsub_topic_format(self): + success_pubsub_topics = [] + for pubsub_topic in INVALID_PUBSUB_TOPICS: + logger.debug(f"Running test with payload {pubsub_topic}") + try: + self.ensure_subscriptions_on_nodes(self.main_nodes, pubsub_topic) + success_pubsub_topics.append(pubsub_topic) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" + + def test_unsubscribe_from_single_pubsub_topic(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_publish_without_subscription(self.test_pubsub_topic) + + def test_unsubscribe_from_all_pubsub_topics(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + for pubsub_topic in VALID_PUBSUB_TOPICS: + self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) + self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + for pubsub_topic in VALID_PUBSUB_TOPICS: + self.check_publish_without_subscription(pubsub_topic) + + def test_unsubscribe_from_some_pubsub_topics(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) + for pubsub_topic in VALID_PUBSUB_TOPICS: + self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) + self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) + for pubsub_topic in VALID_PUBSUB_TOPICS[:3]: + self.check_publish_without_subscription(pubsub_topic) + for pubsub_topic in VALID_PUBSUB_TOPICS[3:]: + self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) + + def test_unsubscribe_from_non_existing_pubsub_topic(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + try: + self.delete_subscriptions_on_nodes(self.main_nodes, ["/waku/2/rs/999/99"]) + if self.node1.is_nwaku(): + pass # nwaku doesn't fail in this case + elif self.node1.is_gowaku(): + raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") + else: + raise Exception("Not implemented") + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + self.check_published_message_reaches_peer() + + def test_unsubscribe_with_invalid_pubsub_topic_format(self): + success_pubsub_topics = [] + for pubsub_topic in INVALID_PUBSUB_TOPICS: + logger.debug(f"Running test with payload {pubsub_topic}") + try: + self.delete_subscriptions_on_nodes(self.main_nodes, pubsub_topic) + success_pubsub_topics.append(pubsub_topic) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" + + def test_resubscribe_to_unsubscribed_pubsub_topic(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_publish_without_subscription(self.test_pubsub_topic) + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.check_published_message_reaches_peer() + + def test_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self): + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() + try: + self.check_published_message_reaches_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC) + raise AssertionError(f"Publish on {DEFAULT_PUBSUB_TOPIC} with beeing subscribed to it worked!!!") + except Exception as ex: + assert "Not Found" in str(ex)