diff --git a/src/steps/store.py b/src/steps/store.py index 7121588f..f9193525 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -8,6 +8,7 @@ from src.env_vars import ( ADDITIONAL_NODES, NODE_1, NODE_2, + NODEKEY, ) from src.node.waku_node import WakuNode from src.steps.common import StepsCommon @@ -52,7 +53,7 @@ class StepsStore(StepsCommon): @allure.step def setup_first_publishing_node(self, store="true", relay="true", **kwargs): - self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, **kwargs) + self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, nodekey=NODEKEY, **kwargs) self.enr_uri = self.publishing_node1.get_enr_uri() @allure.step @@ -142,6 +143,8 @@ class StepsStore(StepsCommon): ): if store_node is None: store_node = self.store_nodes + if pubsubTopic is None: + pubsubTopic = self.test_pubsub_topic elif not isinstance(store_node, list): store_node = [store_node] else: diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py index 99caf483..90c5a0eb 100644 --- a/tests/store/test_get_messages.py +++ b/tests/store/test_get_messages.py @@ -1,6 +1,6 @@ import pytest from src.libs.custom_logger import get_custom_logger -from src.libs.common import to_base64 +from src.libs.common import delay, to_base64 from src.steps.store import StepsStore from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS @@ -19,7 +19,7 @@ class TestGetMessages(StepsStore): # only one test for store v1, all other tests are using the new store v3 def test_legacy_store_v1(self): self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v1") + self.check_published_message_is_stored(pageSize=5, ascending="true", store_v="v1") def test_get_store_messages_with_different_payloads(self): failed_payloads = [] @@ -28,11 +28,11 @@ class TestGetMessages(StepsStore): message = self.create_message(payload=to_base64(payload["value"])) try: self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + self.check_published_message_is_stored(pageSize=50, ascending="true") except Exception as e: logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true", store_v="v1") + self.check_published_message_is_stored(pageSize=50, ascending="true", store_v="v1") assert not failed_payloads, f"Payloads failed: {failed_payloads}" def test_get_multiple_store_messages(self): @@ -54,7 +54,7 @@ class TestGetMessages(StepsStore): message = self.create_message(contentTopic=content_topic["value"]) try: self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + self.check_published_message_is_stored(pageSize=50, ascending="true") except Exception as e: logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') failed_content_topics.append(content_topic) @@ -76,12 +76,97 @@ class TestGetMessages(StepsStore): def test_get_store_message_with_meta(self): message = self.create_message(meta=to_base64(self.test_payload)) self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_get_store_duplicate_messages(self): message = self.create_message() self.publish_message_via("relay", message=message) self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") # only one message is stored assert len(self.store_response["messages"]) == 1 + + def test_publishing_node_is_stopped(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publishing_node1.stop() + store_response = self.store_node1.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 1 + + def test_publishing_node_restarts(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publishing_node1.restart() + self.publishing_node1.ensure_ready() + self.add_node_peer(self.store_node1, self.multiaddr_list) + self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1) + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 + + def test_store_node_restarts(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.store_node1.restart() + self.store_node1.ensure_ready() + self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 + + def test_publishing_node_paused_and_unpaused(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publishing_node1.pause() + delay(1) + self.publishing_node1.unpause() + self.publishing_node1.ensure_ready() + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 + + def test_store_node_paused_and_unpaused(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.store_node1.pause() + delay(1) + self.store_node1.unpause() + self.store_node1.ensure_ready() + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 + + def test_message_relayed_while_store_node_is_paused(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.store_node1.pause() + self.publish_message_via("relay") + self.store_node1.unpause() + self.store_node1.ensure_ready() + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 + + def test_message_relayed_while_store_node_is_stopped(self): + self.publish_message_via("relay") + self.check_published_message_is_stored(pageSize=5, ascending="true") + self.store_node1.stop() + self.publish_message_via("relay") + self.store_node1.start() + self.store_node1.ensure_ready() + self.add_node_peer(self.store_node1, self.multiaddr_list) + self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) + delay(1) + self.check_published_message_is_stored(pageSize=5, ascending="true") + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + assert len(store_response["messages"]) == 2 diff --git a/tests/store/test_running_nodes.py b/tests/store/test_running_nodes.py index 61d8d5ee..e233bbc0 100644 --- a/tests/store/test_running_nodes.py +++ b/tests/store/test_running_nodes.py @@ -7,7 +7,7 @@ class TestRunningNodes(StepsStore): self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_main_node_relay_and_store__peer_only_store(self): self.setup_first_publishing_node(store="true", relay="true") @@ -21,21 +21,21 @@ class TestRunningNodes(StepsStore): self.setup_first_store_node(store="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_main_node_relay_and_store__peer_neither_relay_nor_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="false", relay="false") self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_main_node_only_relay__peer_relay_and_store(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_main_node_only_relay__peer_only_store(self): self.setup_first_publishing_node(store="false", relay="true") @@ -50,7 +50,7 @@ class TestRunningNodes(StepsStore): self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") try: - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") except Exception as ex: assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex) @@ -59,11 +59,11 @@ class TestRunningNodes(StepsStore): self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0]) self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("lightpush", sender=self.store_node1) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true") def test_store_with_filter(self): self.setup_first_publishing_node(store="true", relay="true", filter="true") self.setup_first_store_node(store="false", relay="false", filter="true") self.subscribe_to_pubsub_topics_via_relay() self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pageSize=5, ascending="true")