diff --git a/tests/filter/test_get_messages.py b/tests/filter/test_get_messages.py new file mode 100644 index 00000000..c9962e3b --- /dev/null +++ b/tests/filter/test_get_messages.py @@ -0,0 +1,125 @@ +import pytest +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.test_data import SAMPLE_INPUTS, SAMPLE_TIMESTAMPS +from src.steps.filter import StepsFilter + +logger = get_custom_logger(__name__) + + +# here we will also implicitly test filter push, see: https://rfc.vac.dev/spec/12/#messagepush +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes") +class TestFilterGetMessages(StepsFilter): + def test_filter_get_message_with_valid_payloads(self): + failed_payloads = [] + for payload in SAMPLE_INPUTS: + logger.debug(f'Running test with payload {payload["description"]}') + message = self.create_message(payload=to_base64(payload["value"])) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as e: + logger.error(f'Payload {payload["description"]} failed: {str(e)}') + failed_payloads.append(payload["description"]) + assert not failed_payloads, f"Payloads failed: {failed_payloads}" + + def test_filter_get_message_with_valid_timestamps(self): + failed_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.node1.type() in timestamp["valid_for"]: + logger.debug(f'Running test with timestamp {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.check_published_message_reaches_filter_peer(message) + except Exception as ex: + logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') + failed_timestamps.append(timestamp) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + + def test_filter_get_message_with_version(self): + self.check_published_message_reaches_filter_peer(self.create_message(version=10)) + + def test_filter_get_message_with_meta(self): + self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload))) + + def test_filter_get_message_with_ephemeral(self): + failed_ephemeral = [] + for ephemeral in [True, False]: + logger.debug(f"Running test with Ephemeral {ephemeral}") + try: + self.check_published_message_reaches_filter_peer(self.create_message(ephemeral=ephemeral)) + except Exception as e: + logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}") + failed_ephemeral.append(ephemeral) + assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + + def test_filter_get_message_with_rate_limit_proof(self): + rate_limit_proof = { + "proof": to_base64("proofData"), + "epoch": to_base64("epochData"), + "nullifier": to_base64("nullifierData"), + } + self.check_published_message_reaches_filter_peer(self.create_message(rateLimitProof=rate_limit_proof)) + + def test_filter_get_message_with_extra_field(self): + try: + self.check_published_message_reaches_filter_peer(self.create_message(extraField="extraValue")) + if self.node1.is_nwaku(): + raise AssertionError("Relay publish with extra field worked!!!") + elif self.node1.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_filter_get_message_duplicate_message(self): + message = self.create_message() + self.check_published_message_reaches_filter_peer(message) + try: + self.check_published_message_reaches_filter_peer(message) + raise AssertionError("Duplicate message was retrieved twice") + except Exception as ex: + assert "couldn't find any messages" in str(ex) + + def test_filter_get_message_after_node_pauses_and_pauses(self): + self.check_published_message_reaches_filter_peer() + self.node1.pause() + self.node1.unpause() + self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M1"))) + self.node2.pause() + self.node2.unpause() + self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M2"))) + + def test_filter_get_message_after_node1_restarts(self): + self.check_published_message_reaches_filter_peer() + self.node1.restart() + self.node1.ensure_ready() + delay(2) + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_get_message_after_node2_restarts(self): + self.check_published_message_reaches_filter_peer() + self.node2.restart() + self.node2.ensure_ready() + delay(2) + self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) + self.check_published_message_reaches_filter_peer() + + def test_filter_get_50_messages(self): + num_messages = 50 + for index in range(num_messages): + message = self.create_message(payload=to_base64(f"M_{index}")) + self.node1.send_relay_message(message, self.test_pubsub_topic) + delay(1) + filter_messages = self.get_filter_messages(content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node2) + assert len(filter_messages) == num_messages + for index, message in enumerate(filter_messages): + assert message["payload"] == to_base64( + f"M_{index}" + ), f'Incorrect payload at index: {index}. Published {to_base64(f"M_{index}")} Received {message["payload"]}' + + def test_filter_get_message_with_1MB_payload(self): + payload_length = 1024 * 1023 + message = self.create_message(payload=to_base64("a" * (payload_length))) + self.check_published_message_reaches_filter_peer(message, message_propagation_delay=2) diff --git a/tests/filter/test_ping.py b/tests/filter/test_ping.py new file mode 100644 index 00000000..d4de3545 --- /dev/null +++ b/tests/filter/test_ping.py @@ -0,0 +1,29 @@ +from uuid import uuid4 +import pytest +from src.steps.filter import StepsFilter + + +@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") +class TestFilterPing(StepsFilter): + def test_filter_ping_on_subscribed_peer(self, subscribe_main_nodes): + self.ping_filter_subscriptions(str(uuid4())) + + def test_filter_ping_on_peer_without_subscription(self): + self.ping_without_filter_subscription() + + def test_filter_ping_on_unsubscribed_peer(self, subscribe_main_nodes): + self.ping_filter_subscriptions(str(uuid4())) + self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.ping_without_filter_subscription() + + def test_filter_ping_without_request_id(self, subscribe_main_nodes): + try: + self.ping_filter_subscriptions("") + if self.node2.is_nwaku(): + pass + elif self.node2.is_gowaku(): + raise Exception("Ping without request id worked!!") + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "bad request id" in str(ex)