diff --git a/src/env_vars.py b/src/env_vars.py index e1e99159..92803cd1 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -21,7 +21,7 @@ NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") SUBNET = get_env_var("SUBNET", "172.18.0.0/16") IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24") GATEWAY = get_env_var("GATEWAY", "172.18.0.1") -DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto") +DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku/proto") PROTOCOL = get_env_var("PROTOCOL", "REST") RUNNING_IN_CI = get_env_var("CI") NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index c97c02c9..9fa4ba28 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -5,7 +5,7 @@ from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rpc import RPC from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL +from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL from src.data_storage import DS logger = get_custom_logger(__name__) @@ -53,7 +53,7 @@ class WakuNode: "rpc-address": "0.0.0.0", "rest-address": "0.0.0.0", "nat": f"extip:{self._ext_ip}", - "pubsub-topic": DEFAULT_PUBSUBTOPIC, + "pubsub-topic": DEFAULT_PUBSUB_TOPIC, } if "go-waku" in self._docker_manager.image: @@ -110,13 +110,13 @@ class WakuNode: def info(self): return self._api.info() - def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUBTOPIC]): + def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]): return self._api.set_subscriptions(pubsub_topics) - def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC): + def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): return self._api.send_message(message, pubsub_topic) - def get_messages(self, pubsub_topic=DEFAULT_PUBSUBTOPIC): + def get_messages(self, pubsub_topic=DEFAULT_PUBSUB_TOPIC): return self._api.get_messages(pubsub_topic) @property diff --git a/src/steps/relay.py b/src/steps/relay.py index 37050e9a..84174ce2 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -23,7 +23,6 @@ class StepsRelay: self.test_pubsub_topic = "/waku/2/rs/18/1" self.test_content_topic = "/test/1/waku-relay/proto" self.test_payload = "Relay works!!" - self.test_message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} self.node1.set_subscriptions([self.test_pubsub_topic]) self.node2.set_subscriptions([self.test_pubsub_topic]) @@ -40,10 +39,8 @@ class StepsRelay: if not pubsub_topic: pubsub_topic = self.test_pubsub_topic self.node1.send_message(message, pubsub_topic) - delay(message_propagation_delay) get_messages_response = self.node2.get_messages(pubsub_topic) - logger.debug("Got reponse from remote peer %s", get_messages_response) assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) self.assert_received_message(message, received_message) @@ -54,18 +51,18 @@ class StepsRelay: assert received_message.payload == sent_message["payload"], assert_fail_message("payload") assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") - if "timestamp" in sent_message and sent_message["timestamp"]: + if "timestamp" in sent_message and sent_message["timestamp"] is not None: if isinstance(sent_message["timestamp"], float): assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") else: assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp") - if "version" in sent_message and sent_message["version"]: + if "version" in sent_message: assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version") - if "meta" in sent_message and sent_message["meta"]: + if "meta" in sent_message: assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta") - if "ephemeral" in sent_message and sent_message["ephemeral"]: + if "ephemeral" in sent_message: assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral") - if "rateLimitProof" in sent_message and sent_message["rateLimitProof"]: + if "rateLimitProof" in sent_message: assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1): @@ -75,3 +72,12 @@ class StepsRelay: self.check_published_message_reaches_peer(message) check_peer_connection() + + def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list): + for node in node_list: + node.set_subscriptions(pubsub_topic_list) + + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message diff --git a/src/test_data.py b/src/test_data.py index c4c72846..b6e746f4 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -1,7 +1,7 @@ from time import time from datetime import datetime, timedelta -from src.env_vars import DEFAULT_PUBSUBTOPIC +from src.env_vars import DEFAULT_PUBSUB_TOPIC NOW = datetime.now() @@ -66,7 +66,7 @@ INVALID_CONTENT_TOPICS = [ ] VALID_PUBSUB_TOPICS = [ - DEFAULT_PUBSUBTOPIC, + DEFAULT_PUBSUB_TOPIC, "/waku/2/rs/18/1", "/test/2/rs/18/1", "/waku/3/rs/18/1", diff --git a/tests/conftest.py b/tests/conftest.py index 69914165..6eedd099 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,7 +64,7 @@ def attach_logs_on_fail(request): @pytest.fixture(scope="function", autouse=True) -def close_open_nodes(): +def close_open_nodes(attach_logs_on_fail): DS.waku_nodes = [] yield crashed_containers = [] diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index c40012a4..ca5eec70 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -12,7 +12,7 @@ class TestRelayPublish(StepsRelay): failed_payloads = [] for payload in SAMPLE_INPUTS: logger.debug("Running test with payload %s", payload["description"]) - message = {"payload": to_base64(payload["value"]), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64(payload["value"])) try: self.check_published_message_reaches_peer(message) except Exception as e: @@ -24,7 +24,7 @@ class TestRelayPublish(StepsRelay): success_payloads = [] for payload in INVALID_PAYLOADS: logger.debug("Running test with payload %s", payload["description"]) - message = {"payload": payload["value"], "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=payload["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) success_payloads.append(payload) @@ -43,14 +43,14 @@ class TestRelayPublish(StepsRelay): def test_publish_with_payload_less_than_one_mb(self): payload_length = 1024 * 1023 logger.debug("Running test with payload length of %s bytes", payload_length) - message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64("a" * (payload_length))) self.check_published_message_reaches_peer(message, message_propagation_delay=2) def test_publish_with_payload_equal_or_more_than_one_mb(self): payload_length = 1024 * 1023 for payload_length in [1024 * 1024, 1024 * 1024 * 10]: logger.debug("Running test with payload length of %s bytes", payload_length) - message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64("a" * (payload_length))) try: self.check_published_message_reaches_peer(message, message_propagation_delay=2) raise AssertionError("Duplicate message was retrieved twice") @@ -61,7 +61,7 @@ class TestRelayPublish(StepsRelay): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: logger.debug("Running test with content topic %s", content_topic["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} + message = self.create_message(contentTopic=content_topic["value"]) try: self.check_published_message_reaches_peer(message) except Exception as e: @@ -73,7 +73,7 @@ class TestRelayPublish(StepsRelay): success_content_topics = [] for content_topic in INVALID_CONTENT_TOPICS: logger.debug("Running test with contetn topic %s", content_topic["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)} + message = self.create_message(contentTopic=content_topic["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) success_content_topics.append(content_topic) @@ -90,22 +90,27 @@ class TestRelayPublish(StepsRelay): assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_on_multiple_pubsub_topics(self): - self.node1.set_subscriptions(VALID_PUBSUB_TOPICS) - self.node2.set_subscriptions(VALID_PUBSUB_TOPICS) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug("Running test with pubsub topic %s", pubsub_topic) - first_message = {"payload": to_base64("M1"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} try: - self.check_published_message_reaches_peer(first_message, pubsub_topic=pubsub_topic) + self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) except Exception as e: logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e)) failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" + def test_message_published_on_different_pubsub_topic_is_not_retrieved(self): + self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) + self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) + delay(0.1) + messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) + assert not messages, "Message was retrieved on wrong pubsub_topic" + def test_publish_on_unsubscribed_pubsub_topic(self): try: - self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1") + self.check_published_message_reaches_peer(self.create_message(), pubsub_topic="/waku/2/rs/19/1") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) @@ -115,7 +120,7 @@ class TestRelayPublish(StepsRelay): for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() in timestamp["valid_for"]: logger.debug("Running test with timestamp %s", timestamp["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]} + message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) except Exception as ex: @@ -128,7 +133,7 @@ class TestRelayPublish(StepsRelay): for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() not in timestamp["valid_for"]: logger.debug("Running test with timestamp %s", timestamp["description"]) - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]} + message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) success_timestamps.append(timestamp) @@ -141,25 +146,21 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_peer(message) def test_publish_with_valid_version(self): - self.test_message["version"] = 10 - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(version=10)) def test_publish_with_invalid_version(self): - self.test_message["version"] = 2.1 try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(version=2.1)) raise AssertionError("Publish with invalid version worked!!!") except Exception as ex: assert "Bad Request" in str(ex) def test_publish_with_valid_meta(self): - self.test_message["meta"] = to_base64(self.test_payload) - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(meta=to_base64(self.test_payload))) def test_publish_with_invalid_meta(self): - self.test_message["meta"] = self.test_payload try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(meta=self.test_payload)) raise AssertionError("Publish with invalid meta worked!!!") except Exception as ex: assert "Bad Request" in str(ex) @@ -168,63 +169,58 @@ class TestRelayPublish(StepsRelay): failed_ephemeral = [] for ephemeral in [True, False]: logger.debug("Running test with Ephemeral %s", ephemeral) - self.test_message["ephemeral"] = ephemeral try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) except Exception as e: logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e)) failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" def test_publish_with_rate_limit_proof(self): - self.test_message["rateLimitProof"] = { + rate_limit_proof = { "proof": to_base64("proofData"), "epoch": to_base64("epochData"), "nullifier": to_base64("nullifierData"), } - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(rateLimitProof=rate_limit_proof)) def test_publish_with_extra_field(self): - self.test_message["extraField"] = "extraValue" - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(extraField="extraValue")) def test_publish_and_retrieve_duplicate_message(self): - self.check_published_message_reaches_peer(self.test_message) + message = self.create_message() + self.check_published_message_reaches_peer(message) try: - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(message) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) def test_publish_after_node_pauses(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node1.pause() self.node1.unpause() - self.test_message["payload"] = to_base64("new payload 1") - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() self.node2.unpause() - self.test_message["payload"] = to_base64("new payload 2") - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) def test_publish_after_node1_restarts(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node1.restart() - self.node1.set_subscriptions([self.test_pubsub_topic]) - self.node2.set_subscriptions([self.test_pubsub_topic]) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer(20) def test_publish_after_node2_restarts(self): - self.check_published_message_reaches_peer(self.test_message) + self.check_published_message_reaches_peer(self.create_message()) self.node2.restart() - self.node1.set_subscriptions([self.test_pubsub_topic]) - self.node2.set_subscriptions([self.test_pubsub_topic]) + self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) self.wait_for_published_message_to_reach_peer(20) def test_publish_and_retrieve_100_messages(self): num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag for index in range(num_messages): - message = {"payload": to_base64(f"M_{index}"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message = self.create_message(payload=to_base64(f"M_{index}")) self.node1.send_message(message, self.test_pubsub_topic) delay(1) messages = self.node2.get_messages(self.test_pubsub_topic)