"""Reliability tests: published messages must stay retrievable across node stop, restart and pause events."""

import pytest
from src.libs.custom_logger import get_custom_logger
from src.libs.common import delay
from src.steps.store import StepsStore

logger = get_custom_logger(__name__)


class TestReliability(StepsStore):
    """Checks that store queries keep returning published messages while nodes are stopped, restarted or paused."""

    @pytest.fixture(scope="function", autouse=False)
    def node_setup(self, store_setup):
        """Set up the first publishing node and the first store node, then subscribe via relay.

        Both nodes are configured with ``store="true"`` and ``relay="true"``.
        Depends on the ``store_setup`` fixture, which is defined outside this file.
        """
        self.setup_first_publishing_node(store="true", relay="true")
        self.setup_first_store_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()

    # ------------------------------------------------------------------ helpers

    def _publish_and_check_stored(self):
        """Publish one message via relay and run the shared stored-message check."""
        self.publish_message_via("relay")
        self.check_published_message_is_stored(pageSize=5, ascending="true")

    def _assert_stored_count(self, nodes, expected):
        """Query each node in ``nodes`` on the test pubsub topic and assert it returns ``expected`` messages."""
        for queried in nodes:
            response = queried.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
            assert len(response["messages"]) == expected

    # -------------------------------------------------------------------- tests

    def test_publishing_node_is_stopped(self, node_setup):
        """The first store node still returns the message after the publishing node is stopped."""
        self._publish_and_check_stored()
        self.publishing_node1.stop()
        # Only the first store node is queried here; the publisher is no longer running.
        self._assert_stored_count([self.store_node1], 1)

    def test_publishing_node_restarts(self, node_setup):
        """A message published after the publishing node restarts is stored next to the earlier one."""
        self._publish_and_check_stored()
        publisher = self.publishing_node1
        publisher.restart()
        publisher.ensure_ready()
        # After the restart the peer link and the relay subscription are set up again.
        self.add_node_peer(self.store_node1, self.multiaddr_list)
        self.subscribe_to_pubsub_topics_via_relay(node=publisher)
        self._publish_and_check_stored()
        self._assert_stored_count(self.store_nodes, 2)

    def test_store_node_restarts(self, node_setup):
        """A message published after the store node restarts is stored next to the earlier one."""
        self._publish_and_check_stored()
        store = self.store_node1
        store.restart()
        store.ensure_ready()
        self.subscribe_to_pubsub_topics_via_relay(node=store)
        self._publish_and_check_stored()
        self._assert_stored_count(self.store_nodes, 2)

    def test_publishing_node_paused_and_unpaused(self, node_setup):
        """Publishing keeps working after the publishing node is paused for a moment and resumed."""
        self._publish_and_check_stored()
        publisher = self.publishing_node1
        publisher.pause()
        delay(1)
        publisher.unpause()
        publisher.ensure_ready()
        self._publish_and_check_stored()
        self._assert_stored_count(self.store_nodes, 2)

    def test_store_node_paused_and_unpaused(self, node_setup):
        """Storing keeps working after the store node is paused for a moment and resumed."""
        self._publish_and_check_stored()
        store = self.store_node1
        store.pause()
        delay(1)
        store.unpause()
        store.ensure_ready()
        self._publish_and_check_stored()
        self._assert_stored_count(self.store_nodes, 2)

    def test_message_relayed_while_store_node_is_paused(self, node_setup):
        """A message relayed while the store node is paused is expected to be stored once it is unpaused."""
        self._publish_and_check_stored()
        store = self.store_node1
        store.pause()
        # This publish happens while the store node is paused; the check runs only after unpausing.
        self.publish_message_via("relay")
        store.unpause()
        store.ensure_ready()
        self.check_published_message_is_stored(pageSize=5, ascending="true")
        self._assert_stored_count(self.store_nodes, 2)

    def test_message_relayed_while_store_node_is_stopped(self, node_setup):
        """A message relayed while the store node is stopped is expected to be stored after it starts again."""
        self._publish_and_check_stored()
        store = self.store_node1
        store.stop()
        # This publish happens while the store node is down.
        self.publish_message_via("relay")
        store.start()
        store.ensure_ready()
        self.add_node_peer(store, self.multiaddr_list)
        self.subscribe_to_pubsub_topics_via_relay(node=store)
        delay(1)
        self.check_published_message_is_stored(pageSize=5, ascending="true")
        self._assert_stored_count(self.store_nodes, 2)

    def test_message_relayed_before_store_node_is_started(self):
        """A message relayed before the store node is set up is expected to be stored once that node is up.

        This test does not request ``node_setup``; it performs the setup itself so
        the publish can happen between the two node setups.
        """
        self.setup_first_publishing_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        self.publish_message_via("relay")
        # The store node is set up only after the message has already been published.
        self.setup_first_store_node(store="true", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        delay(1)
        self.check_published_message_is_stored(pageSize=5, ascending="true")