diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 3128bc4c04..9fa4f38d77 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,27 +1,9 @@ -## Problem +## PR Details -## Solution +## Issues reported: - - -## Diffs - - - -## Notes - - - -- Resolves -- Related to + diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 86c469d41a..1349b9ccb4 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -67,6 +67,7 @@ class WakuNode: "min-relay-peers-to-publish": "1", "legacy-filter": "false", "log-level": "DEBUG", + "rest-filter-cache-capacity": "50", } default_args.update(go_waku_args) elif self.is_nwaku(): diff --git a/src/steps/filter.py b/src/steps/filter.py index bb8f83ac68..211ba4a073 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -92,7 +92,7 @@ class StepsFilter: delay(message_propagation_delay) for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") - get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topics=pubsub_topic, node=peer) + get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" waku_message = WakuMessage(get_messages_response) @@ -175,8 +175,16 @@ class StepsFilter: node = self.node2 ping_sub_response = node.ping_filter_subscriptions(request_id) assert ping_sub_response["requestId"] == request_id + assert ping_sub_response["statusCode"] == 0 assert ping_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed + def ping_without_filter_subscription(self, node=None): + try: + self.ping_filter_subscriptions(str(uuid4()), node=node) + raise AssertionError("Ping without any subscription worked") + except Exception as ex: + assert "peer has no subscriptions" in str(ex) + @allure.step def add_new_relay_subscription(self, pubsub_topics, node=None): if node is None: @@ -184,11 +192,11 @@ class StepsFilter: self.node1.set_relay_subscriptions(pubsub_topics) @allure.step - def get_filter_messages(self, content_topic, pubsub_topics=None, node=None): + def get_filter_messages(self, content_topic, pubsub_topic=None, node=None): if node is None: node = self.node2 if node.is_gowaku(): - return node.get_filter_messages(content_topic, pubsub_topics) + return node.get_filter_messages(content_topic, pubsub_topic) elif node.is_nwaku(): return node.get_filter_messages(content_topic) else: diff --git a/tests/filter/test_get_messages.py b/tests/filter/test_get_messages.py new file mode 100644 index 0000000000..c9962e3b72 --- /dev/null +++ b/tests/filter/test_get_messages.py @@ -0,0 +1,125 @@ +import pytest +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.test_data import SAMPLE_INPUTS, SAMPLE_TIMESTAMPS +from src.steps.filter import StepsFilter + +logger = get_custom_logger(__name__) + + +# here we will also implicitly test filter push, see: https://rfc.vac.dev/spec/12/#messagepush +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes") +class TestFilterGetMessages(StepsFilter): + def test_filter_get_message_with_valid_payloads(self): + failed_payloads = [] + for payload in SAMPLE_INPUTS: + logger.debug(f'Running test with payload {payload["description"]}') + message = self.create_message(payload=to_base64(payload["value"])) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as e: + logger.error(f'Payload {payload["description"]} failed: {str(e)}') + failed_payloads.append(payload["description"]) + assert not failed_payloads, f"Payloads failed: {failed_payloads}" + + def test_filter_get_message_with_valid_timestamps(self): + failed_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.node1.type() in timestamp["valid_for"]: + logger.debug(f'Running test with timestamp {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as ex: + logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') + failed_timestamps.append(timestamp) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + + def test_filter_get_message_with_version(self): + self.check_published_message_reaches_filter_peer(self.create_message(version=10)) + + def test_filter_get_message_with_meta(self): + self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload))) + + def test_filter_get_message_with_ephemeral(self): + failed_ephemeral = [] + for ephemeral in [True, False]: + logger.debug(f"Running test with Ephemeral {ephemeral}") + try: + self.check_published_message_reaches_filter_peer(self.create_message(ephemeral=ephemeral)) + except Exception as e: + logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}") + failed_ephemeral.append(ephemeral) + assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + + def test_filter_get_message_with_rate_limit_proof(self): + rate_limit_proof = { + "proof": to_base64("proofData"), + "epoch": to_base64("epochData"), + "nullifier": to_base64("nullifierData"), + } + self.check_published_message_reaches_filter_peer(self.create_message(rateLimitProof=rate_limit_proof)) + + def test_filter_get_message_with_extra_field(self): + try: + self.check_published_message_reaches_filter_peer(self.create_message(extraField="extraValue")) + if self.node1.is_nwaku(): + raise AssertionError("Relay publish with extra field worked!!!") + elif self.node1.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_get_message_duplicate_message(self): + message = self.create_message() + self.check_published_message_reaches_filter_peer(message) + try: + self.check_published_message_reaches_filter_peer(message) + raise AssertionError("Duplicate message was retrieved twice") + except Exception as ex: + assert "couldn't find any messages" in str(ex) + + def test_filter_get_message_after_node_pauses_and_pauses(self): + self.check_published_message_reaches_filter_peer() + self.node1.pause() + self.node1.unpause() + self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M1"))) + self.node2.pause() + self.node2.unpause() + self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M2"))) + + def test_filter_get_message_after_node1_restarts(self): + self.check_published_message_reaches_filter_peer() + self.node1.restart() + self.node1.ensure_ready() + delay(2) + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_get_message_after_node2_restarts(self): + self.check_published_message_reaches_filter_peer() + self.node2.restart() + self.node2.ensure_ready() + delay(2) + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_get_50_messages(self): + num_messages = 50 + for index in range(num_messages): + message = self.create_message(payload=to_base64(f"M_{index}")) + self.node1.send_relay_message(message, self.test_pubsub_topic) + delay(1) + filter_messages = self.get_filter_messages(content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node2) + assert len(filter_messages) == num_messages + for index, message in enumerate(filter_messages): + assert message["payload"] == to_base64( + f"M_{index}" + ), f'Incorrect payload at index: {index}. Published {to_base64(f"M_{index}")} Received {message["payload"]}' + + def test_filter_get_message_with_1MB_payload(self): + payload_length = 1024 * 1023 + message = self.create_message(payload=to_base64("a" * (payload_length))) + self.check_published_message_reaches_filter_peer(message, message_propagation_delay=2) diff --git a/tests/filter/test_multiple_nodes.py b/tests/filter/test_multiple_nodes.py index 64281d7576..e4edbe13aa 100644 --- a/tests/filter/test_multiple_nodes.py +++ b/tests/filter/test_multiple_nodes.py @@ -19,3 +19,32 @@ class TestFilterMultipleNodes(StepsFilter): self.subscribe_optional_filter_nodes([self.second_content_topic]) self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes) self.check_publish_without_filter_subscription(peer_list=self.optional_nodes) + + def test_filter_get_message_while_one_peer_is_paused(self): + self.setup_optional_filter_nodes() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.subscribe_optional_filter_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + relay_message1 = self.create_message(contentTopic=self.test_content_topic) + relay_message2 = self.create_message(contentTopic=self.test_content_topic) + self.node2.pause() + self.node1.send_relay_message(relay_message1, self.test_pubsub_topic) + self.node2.unpause() + self.node1.send_relay_message(relay_message2, self.test_pubsub_topic) + filter_messages = self.get_filter_messages(content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node2) + assert len(filter_messages) == 2, "Both messages should've been returned" + + def test_filter_get_message_after_one_peer_was_stopped(self): + self.setup_optional_filter_nodes() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.subscribe_optional_filter_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes + self.optional_nodes) + self.node2.stop() + self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes) + + def test_ping_only_some_nodes_have_subscriptions(self): + self.setup_optional_filter_nodes() + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.ping_filter_subscriptions("1", node=self.node2) + for node in self.optional_nodes: + self.ping_without_filter_subscription(node=node) diff --git a/tests/filter/test_ping.py b/tests/filter/test_ping.py new file mode 100644 index 0000000000..497318ca4a --- /dev/null +++ b/tests/filter/test_ping.py @@ -0,0 +1,29 @@ +from uuid import uuid4 +import pytest +from src.steps.filter import StepsFilter + + +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") +class TestFilterPing(StepsFilter): + def test_filter_ping_on_subscribed_peer(self, subscribe_main_nodes): + self.ping_filter_subscriptions(str(uuid4())) + + def test_filter_ping_on_peer_without_subscription(self): + self.ping_without_filter_subscription() + + def test_filter_ping_on_unsubscribed_peer(self, subscribe_main_nodes): + self.ping_filter_subscriptions(str(uuid4())) + self.delete_filter_subscription({"requestId": "12345", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.ping_without_filter_subscription() + + def test_filter_ping_without_request_id(self, subscribe_main_nodes): + try: + self.ping_filter_subscriptions("") + if self.node2.is_nwaku(): + pass + elif self.node2.is_gowaku(): + raise Exception("Ping without request id worked!!") + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "bad request id" in str(ex) diff --git a/tests/relay/test_multiple_nodes.py b/tests/relay/test_multiple_nodes.py index e4f7bcbd75..e8695ac84d 100644 --- a/tests/relay/test_multiple_nodes.py +++ b/tests/relay/test_multiple_nodes.py @@ -17,3 +17,19 @@ class TestRelayMultipleNodes(StepsRelay): raise AssertionError("Non subscribed nodes received the message!!") except Exception as ex: assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" + + def test_relay_get_message_while_one_peer_is_paused(self, subscribe_optional_relay_nodes, relay_warm_up): + self.check_published_message_reaches_relay_peer() + relay_message1 = self.create_message(contentTopic=self.test_content_topic) + relay_message2 = self.create_message(contentTopic=self.test_content_topic) + self.node2.pause() + self.node1.send_relay_message(relay_message1, self.test_pubsub_topic) + self.node2.unpause() + self.node1.send_relay_message(relay_message2, self.test_pubsub_topic) + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + assert len(messages) == 2, "Both messages should've been returned" + + def test_relay_get_message_after_one_peer_was_stopped(self, subscribe_optional_relay_nodes, relay_warm_up): + self.check_published_message_reaches_relay_peer(peer_list=self.main_nodes + self.optional_nodes) + self.node2.stop() + self.check_published_message_reaches_relay_peer(peer_list=self.optional_nodes) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index dbcd171023..d37b13e1f4 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -232,7 +232,7 @@ class TestRelayPublish(StepsRelay): self.node1.ensure_ready() delay(2) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_relay_peer() + self.check_published_message_reaches_relay_peer() def test_publish_after_node2_restarts(self): self.check_published_message_reaches_relay_peer()