diff --git a/src/steps/store.py b/src/steps/store.py
index c5426072..6185784d 100644
--- a/src/steps/store.py
+++ b/src/steps/store.py
@@ -116,7 +116,7 @@ class StepsStore(StepsCommon):
         node.set_filter_subscriptions(subscription)
 
     @allure.step
-    def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None):
+    def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.01, sender=None):
         self.message = self.create_message() if message is None else message
         if pubsub_topic is None:
             pubsub_topic = self.test_pubsub_topic
diff --git a/tests/store/test_api_flags.py b/tests/store/test_api_flags.py
new file mode 100644
index 00000000..38dfa784
--- /dev/null
+++ b/tests/store/test_api_flags.py
@@ -0,0 +1,63 @@
+import pytest
+from src.env_vars import NODE_1
+from src.libs.common import to_base64
+from src.libs.custom_logger import get_custom_logger
+from src.node.waku_message import WakuMessage
+from src.steps.store import StepsStore
+from src.test_data import SAMPLE_INPUTS
+
+logger = get_custom_logger(__name__)
+
+# TODO: test without pubsub topic freezes
+
+
+@pytest.mark.usefixtures("node_setup")
+class TestApiFlags(StepsStore):
+    def test_store_with_peerAddr(self):
+        self.publish_message()
+        self.check_published_message_is_stored(store_node=self.store_node1, peer_addr=self.multiaddr_list[0])
+
+    @pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2715")
+    def test_store_with_hashes(self):
+        message_hash_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            for message_hash in message_hash_list:
+                store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, hashes=message_hash, page_size=50, ascending="true")
+                assert len(store_response["messages"]) == 1
+                assert store_response["messages"][0]["messageHash"]["data"] == message_hash
+
+    def test_store_with_multiple_hashes(self):
+        message_hash_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic=self.test_pubsub_topic, hashes=f"{message_hash_list[0]},{message_hash_list[4]}", page_size=50, ascending="true"
+            )
+            assert len(store_response["messages"]) == 2
+            assert (
+                store_response["messages"][0]["messageHash"]["data"] == message_hash_list[0]
+            ), "Incorrect message filtered based on multiple hashes"
+            assert (
+                store_response["messages"][1]["messageHash"]["data"] == message_hash_list[4]
+            ), "Incorrect message filtered based on multiple hashes"
+
+    def test_store_include_data(self):
+        message_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_list.append(message)
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, include_data="true", page_size=50, ascending="true")
+            assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
+            for index, message in enumerate(store_response["messages"]):
+                assert message["message"]["payload"] == message_list[index]["payload"]
message_list[index]["payload"] + waku_message = WakuMessage([message["message"]]) + waku_message.assert_received_message(message_list[index]) diff --git a/tests/store/test_cursor.py b/tests/store/test_cursor.py index 8ada2afc..35893011 100644 --- a/tests/store/test_cursor.py +++ b/tests/store/test_cursor.py @@ -13,7 +13,7 @@ class TestCursor(StepsStore): expected_message_hash_list = [] for i in range(2000): message = self.create_message(payload=to_base64(f"Message_{i}")) - self.publish_message(message=message, message_propagation_delay=0.01) + self.publish_message(message=message) expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) store_response = {"paginationCursor": {"data": ""}} response_message_hash_list = [] @@ -32,7 +32,7 @@ class TestCursor(StepsStore): cursor_index = cursor_index if cursor_index < 100 else 100 for i in range(message_count): message = self.create_message(payload=to_base64(f"Message_{i}")) - self.publish_message(message=message, message_propagation_delay=0.01) + self.publish_message(message=message) message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) for node in self.store_nodes: store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=cursor_index, ascending="true") @@ -47,7 +47,7 @@ class TestCursor(StepsStore): def test_passing_cursor_not_returned_in_paginationCursor(self): cursor = "" for i in range(10): - self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01) + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) for node in self.store_nodes: store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true") # retrieving the cursor with the message hash of the 3rd message stored @@ -59,7 +59,7 @@ class TestCursor(StepsStore): def test_passing_cursor_of_the_last_message_from_the_store(self): cursor = "" for i in range(10): - self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01) + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) for node in self.store_nodes: store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true") # retrieving the cursor with the message hash of the last message stored @@ -71,7 +71,7 @@ class TestCursor(StepsStore): @pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716") def test_passing_cursor_of_non_existing_message_from_the_store(self): for i in range(4): - self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01) + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) for node in self.store_nodes: store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true") # creating a cursor to a message that doesn't exist @@ -84,7 +84,7 @@ class TestCursor(StepsStore): @pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2717") def test_passing_invalid_cursor(self): for i in range(4): - self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01) + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) for node in self.store_nodes: store_response = 
@@ -96,7 +96,7 @@ class TestCursor(StepsStore):
     @pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2717")
     def test_passing_non_base64_cursor(self):
         for i in range(4):
-            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01)
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true")
             # creating a non base64 cursor
diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py
index ec8eda31..fa7f2f19 100644
--- a/tests/store/test_get_messages.py
+++ b/tests/store/test_get_messages.py
@@ -86,3 +86,8 @@ class TestGetMessages(StepsStore):
             assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
             for index, message_hash in enumerate(store_response["messages"]):
                 assert message_hash["messageHash"]["data"] == message_hash_list[index], f"Message hash at index {index} doesn't match"
+
+    def test_store_is_empty(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=50, ascending="true")
+            assert len(store_response["messages"]) == 0
diff --git a/tests/store/test_page_size.py b/tests/store/test_page_size.py
index 4c74ece2..dd3fb967 100644
--- a/tests/store/test_page_size.py
+++ b/tests/store/test_page_size.py
@@ -7,21 +7,21 @@ from src.steps.store import StepsStore
 class TestPageSize(StepsStore):
     def test_default_page_size(self):
         for i in range(30):
-            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01)
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, ascending="true")
             assert len(store_response["messages"]) == 20, "Message count mismatch"
 
     def test_page_size_0_defaults_to_20(self):
         for i in range(30):
-            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01)
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=0, ascending="true")
             assert len(store_response["messages"]) == 20, "Message count mismatch"
 
     def test_max_page_size(self):
         for i in range(200):
-            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01)
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=200, ascending="true")
             assert len(store_response["messages"]) == 100, "Message count mismatch"
@@ -29,7 +29,7 @@ class TestPageSize(StepsStore):
     @pytest.mark.parametrize("page_size", [1, 11, 39, 81, 99])
     def test_different_page_size(self, page_size):
         for i in range(page_size + 1):
-            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.01)
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=page_size, ascending="true")
             assert len(store_response["messages"]) == page_size, "Message count mismatch"
diff --git a/tests/store/test_sorting.py b/tests/store/test_sorting.py
index 1cff2c4a..1662bfe7 100644
--- a/tests/store/test_sorting.py
+++ b/tests/store/test_sorting.py
@@ -10,7 +10,7 @@ class TestSorting(StepsStore):
         expected_message_hash_list = []
         for i in range(10):
             message = self.create_message(payload=to_base64(f"Message_{i}"))
-            self.publish_message(message=message, message_propagation_delay=0.01)
+            self.publish_message(message=message)
             expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
         for node in self.store_nodes:
             store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending=ascending)
diff --git a/tests/store/test_time_filter.py b/tests/store/test_time_filter.py
index 9daf9bd8..802a2668 100644
--- a/tests/store/test_time_filter.py
+++ b/tests/store/test_time_filter.py
@@ -6,13 +6,11 @@ from src.steps.store import StepsStore
 
 logger = get_custom_logger(__name__)
 
-## tests with time filters
-
-
 @pytest.mark.usefixtures("node_setup")
 class TestTimeFilter(StepsStore):
-    def test_messages_with_timestamps_close_to_now(self):
-        ts = [
+    @pytest.fixture(scope="function", autouse=True)
+    def setup_test_data(self):
+        self.ts_pass = [
             {"description": "3 sec Past", "value": int((datetime.now() - timedelta(seconds=3)).timestamp() * 1e9)},
             {"description": "1 sec Past", "value": int((datetime.now() - timedelta(seconds=1)).timestamp() * 1e9)},
             {"description": "0.1 sec Past", "value": int((datetime.now() - timedelta(seconds=0.1)).timestamp() * 1e9)},
@@ -20,8 +18,14 @@
             {"description": "2 sec Future", "value": int((datetime.now() + timedelta(seconds=2)).timestamp() * 1e9)},
             {"description": "10 sec Future", "value": int((datetime.now() + timedelta(seconds=10)).timestamp() * 1e9)},
         ]
+        self.ts_fail = [
+            {"description": "20 sec Past", "value": int((datetime.now() - timedelta(seconds=20)).timestamp() * 1e9)},
+            {"description": "40 sec Future", "value": int((datetime.now() + timedelta(seconds=40)).timestamp() * 1e9)},
+        ]
+
+    def test_messages_with_timestamps_close_to_now(self):
         failed_timestamps = []
-        for timestamp in ts:
+        for timestamp in self.ts_pass:
             logger.debug(f'Running test with payload {timestamp["description"]}')
             message = self.create_message(timestamp=timestamp["value"])
             try:
@@ -34,11 +38,7 @@
 
     def test_messages_with_timestamps_far_from_now(self):
         success_timestamps = []
-        ts = [
-            {"description": "20 sec Past", "value": int((datetime.now() - timedelta(seconds=20)).timestamp() * 1e9)},
-            {"description": "40 sec Future", "value": int((datetime.now() + timedelta(seconds=40)).timestamp() * 1e9)},
-        ]
-        for timestamp in ts:
+        for timestamp in self.ts_fail:
             logger.debug(f'Running test with payload {timestamp["description"]}')
             message = self.create_message(timestamp=timestamp["value"])
             try:
@@ -48,3 +48,73 @@
                 logger.error(f'Payload {timestamp["description"]} succeeded where it should have failed: {str(ex)}')
                 success_timestamps.append(timestamp["description"])
         assert not success_timestamps, f"Timestamps succeeded: {success_timestamps}"
+
+    def test_time_filter_matches_one_message(self):
+        message_hash_list = []
+        for timestamp in self.ts_pass:
+            message = self.create_message(timestamp=timestamp["value"])
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic=self.test_pubsub_topic,
+                page_size=20,
+                ascending="true",
+                start_time=self.ts_pass[0]["value"] - 100000,
+                end_time=self.ts_pass[0]["value"] + 100000,
+            )
+            assert len(store_response["messages"]) == 1, "Message count mismatch"
+            assert store_response["messages"][0]["messageHash"]["data"] == message_hash_list[0], "Incorrect message filtered based on time"
+
+    def test_time_filter_matches_multiple_messages(self):
+        message_hash_list = []
+        for timestamp in self.ts_pass:
+            message = self.create_message(timestamp=timestamp["value"])
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic=self.test_pubsub_topic,
+                page_size=20,
+                ascending="true",
+                start_time=self.ts_pass[0]["value"] - 100000,
+                end_time=self.ts_pass[4]["value"] + 100000,
+            )
+            assert len(store_response["messages"]) == 5, "Message count mismatch"
+            for i in range(5):
+                assert (
+                    store_response["messages"][i]["messageHash"]["data"] == message_hash_list[i]
+                ), f"Incorrect message filtered based on time at index {i}"
+
+    def test_time_filter_matches_no_message(self):
+        message_hash_list = []
+        for timestamp in self.ts_pass:
+            message = self.create_message(timestamp=timestamp["value"])
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic=self.test_pubsub_topic,
+                page_size=20,
+                ascending="true",
+                start_time=self.ts_pass[0]["value"] - 100000,
+                end_time=self.ts_pass[0]["value"] - 100,
+            )
+            assert len(store_response["messages"]) == 0, "Message count mismatch"
+
+    def test_time_filter_start_time_equals_end_time(self):
+        message_hash_list = []
+        for timestamp in self.ts_pass:
+            message = self.create_message(timestamp=timestamp["value"])
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic=self.test_pubsub_topic,
+                page_size=20,
+                ascending="true",
+                start_time=self.ts_pass[0]["value"],
+                end_time=self.ts_pass[0]["value"],
+            )
+            assert len(store_response["messages"]) == 1, "Message count mismatch"
+            assert store_response["messages"][0]["messageHash"]["data"] == message_hash_list[0], "Incorrect message filtered based on time"
diff --git a/tests/store/test_topics.py b/tests/store/test_topics.py
new file mode 100644
index 00000000..25005669
--- /dev/null
+++ b/tests/store/test_topics.py
@@ -0,0 +1,68 @@
+import pytest
+from src.steps.store import StepsStore
+from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS
+
+
+class TestTopics(StepsStore):
+    @pytest.fixture(scope="function", autouse=True)
+    def topics_setup(self, node_setup):
+        self.message_hash_list = []
+        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
+            message = self.create_message(contentTopic=content_topic)
+            self.publish_message(message=message)
+            self.message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+
+    def test_store_with_one_content_topic(self):
+        for node in self.store_nodes:
+            for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS):
+                store_response = node.get_store_messages(content_topics=content_topic, page_size=20, ascending="true")
+                assert len(store_response["messages"]) == 1, "Message count mismatch"
+                assert (
+                    store_response["messages"][0]["messageHash"]["data"] == self.message_hash_list[index]
+                ), "Incorrect message filtered based on content topic"
+
+    def test_store_with_multiple_content_topics(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                content_topics=f"{CONTENT_TOPICS_DIFFERENT_SHARDS[0]},{CONTENT_TOPICS_DIFFERENT_SHARDS[4]}", page_size=20, ascending="true"
+            )
+            assert len(store_response["messages"]) == 2, "Message count mismatch"
+            assert (
+                store_response["messages"][0]["messageHash"]["data"] == self.message_hash_list[0]
+            ), "Incorrect message filtered based on multiple content topics"
+            assert (
+                store_response["messages"][1]["messageHash"]["data"] == self.message_hash_list[4]
+            ), "Incorrect message filtered based on multiple content topics"
+
+    def test_store_with_unknown_content_topic(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(content_topics="test", page_size=20, ascending="true")
+            assert len(store_response["messages"]) == 0, "Message count mismatch"
+
+    def test_store_with_unknown_pubsub_topic(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(pubsub_topic="test", page_size=20, ascending="true")
+            assert len(store_response["messages"]) == 0, "Message count mismatch"
+
+    def test_store_with_both_pubsub_topic_and_content_topic(self):
+        for node in self.store_nodes:
+            for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS):
+                store_response = node.get_store_messages(
+                    pubsub_topic=self.test_pubsub_topic, content_topics=content_topic, page_size=20, ascending="true"
+                )
+                assert len(store_response["messages"]) == 1, "Message count mismatch"
+                assert (
+                    store_response["messages"][0]["messageHash"]["data"] == self.message_hash_list[index]
+                ), "Incorrect message filtered based on content topic"
+
+    def test_store_with_unknown_pubsub_topic_but_known_content_topic(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(
+                pubsub_topic="test", content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS[0], page_size=20, ascending="true"
+            )
+            assert len(store_response["messages"]) == 0, "Message count mismatch"
+
+    def test_store_without_pubsub_topic_and_content_topic(self):
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(page_size=20, ascending="true")
+            assert len(store_response["messages"]) == len(CONTENT_TOPICS_DIFFERENT_SHARDS), "Message count mismatch"