From bd5c541d3f9696136761742ab5dcf8c5c80665d7 Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Fri, 17 May 2024 15:49:36 +0300 Subject: [PATCH] store tests --- src/node/api_clients/rest.py | 30 ++++++------ src/node/waku_node.py | 28 +++++------ src/steps/store.py | 54 ++++++++++++--------- tests/store/test_cursor.py | 13 +++++ tests/store/test_ephemeral.py | 28 +++++++++++ tests/store/test_get_messages.py | 66 +++++++++++++------------ tests/store/test_reliability.py | 81 +++++++++++++++---------------- tests/store/test_running_nodes.py | 32 ++++++------ tests/store/test_time_filter.py | 27 +++++++++++ 9 files changed, 217 insertions(+), 142 deletions(-) create mode 100644 tests/store/test_cursor.py create mode 100644 tests/store/test_ephemeral.py create mode 100644 tests/store/test_time_filter.py diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index c42566ec..72963ae1 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -93,29 +93,29 @@ class REST(BaseClient): return get_messages_response.json() def get_store_messages( - self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs + self, peer_addr, include_data, pubsub_topic, content_topics, start_time, end_time, hashes, cursor, page_size, ascending, store_v, **kwargs ): base_url = f"store/{store_v}/messages" params = [] - if peerAddr is not None: - params.append(f"peerAddr={quote(peerAddr, safe='')}") - if includeData is not None: - params.append(f"includeData={includeData}") - if pubsubTopic is not None: - params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}") - if contentTopics is not None: - params.append(f"contentTopics={quote(contentTopics, safe='')}") - if startTime is not None: - params.append(f"startTime={startTime}") - if endTime is not None: - params.append(f"endTime={endTime}") + if peer_addr is not None: + params.append(f"peerAddr={quote(peer_addr, safe='')}") + if include_data is not None: + params.append(f"includeData={include_data}") + if pubsub_topic is not None: + params.append(f"pubsubTopic={quote(pubsub_topic, safe='')}") + if content_topics is not None: + params.append(f"contentTopics={quote(content_topics, safe='')}") + if start_time is not None: + params.append(f"startTime={start_time}") + if end_time is not None: + params.append(f"endTime={end_time}") if hashes is not None: params.append(f"hashes={quote(hashes, safe='')}") if cursor is not None: params.append(f"cursor={quote(cursor, safe='')}") - if pageSize is not None: - params.append(f"pageSize={pageSize}") + if page_size is not None: + params.append(f"pageSize={page_size}") if ascending is not None: params.append(f"ascending={ascending}") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index b2b1f560..a5cb1758 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -292,29 +292,29 @@ class WakuNode: def get_store_messages( self, - peerAddr=None, - includeData=None, - pubsubTopic=None, - contentTopics=None, - startTime=None, - endTime=None, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, hashes=None, cursor=None, - pageSize=None, + page_size=None, ascending=None, store_v=None, **kwargs, ): return self._api.get_store_messages( - peerAddr=peerAddr, - includeData=includeData, - pubsubTopic=pubsubTopic, - contentTopics=contentTopics, - startTime=startTime, - endTime=endTime, + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, hashes=hashes, cursor=cursor, - pageSize=pageSize, + page_size=page_size, ascending=ascending, store_v=store_v, **kwargs, diff --git a/src/steps/store.py b/src/steps/store.py index f9193525..333aad9c 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -29,6 +29,12 @@ class StepsStore(StepsCommon): self.optional_nodes = [] self.multiaddr_list = [] + @pytest.fixture(scope="function", autouse=False) + def node_setup(self, store_setup): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + @allure.step def start_publishing_node(self, image, node_index, **kwargs): node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}") @@ -110,41 +116,45 @@ class StepsStore(StepsCommon): node.set_filter_subscriptions(subscription) @allure.step - def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None): + def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None): self.message = self.create_message() if message is None else message if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic if not sender: sender = self.publishing_node1 - if type == "relay": + if via == "relay": logger.debug("Relaying message") sender.send_relay_message(self.message, pubsub_topic) - elif type == "lightpush": + elif via == "lightpush": payload = self.create_payload(pubsub_topic, self.message) sender.send_light_push_message(payload) delay(message_propagation_delay) + return self.message @allure.step def check_published_message_is_stored( self, store_node=None, - peerAddr=None, - includeData=None, - pubsubTopic=None, - contentTopics=None, - startTime=None, - endTime=None, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, hashes=None, cursor=None, - pageSize=None, + page_size=None, ascending=None, store_v="v3", + message_to_check=None, **kwargs, ): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if message_to_check is None: + message_to_check = self.message if store_node is None: store_node = self.store_nodes - if pubsubTopic is None: - pubsubTopic = self.test_pubsub_topic elif not isinstance(store_node, list): store_node = [store_node] else: @@ -152,15 +162,15 @@ class StepsStore(StepsCommon): for node in store_node: logger.debug(f"Checking that peer {node.image} can find the stored message") self.store_response = node.get_store_messages( - peerAddr=peerAddr, - includeData=includeData, - pubsubTopic=pubsubTopic, - contentTopics=contentTopics, - startTime=startTime, - endTime=endTime, + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, hashes=hashes, cursor=cursor, - pageSize=pageSize, + page_size=page_size, ascending=ascending, store_v=store_v, **kwargs, @@ -174,9 +184,9 @@ class StepsStore(StepsCommon): self.store_response["messages"][store_message_index:], schema=MessageRpcResponseStore if node.is_nwaku() else MessageRpcResponse ) if store_v == "v1": - waku_message.assert_received_message(self.message) + waku_message.assert_received_message(message_to_check) else: - expected_hash = self.compute_message_hash(pubsubTopic, self.message) + expected_hash = self.compute_message_hash(pubsub_topic, message_to_check) assert ( expected_hash == self.store_response["messages"][store_message_index]["message_hash"]["data"] ), f"Message hash returned by store doesn't match the computed message hash {expected_hash}" @@ -186,7 +196,7 @@ class StepsStore(StepsCommon): if not pubsub_topic: pubsub_topic = self.test_pubsub_topic try: - self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pubsubTopic=pubsub_topic, page_size=5, ascending="true") except Exception as ex: assert "couldn't find any messages" in str(ex) diff --git a/tests/store/test_cursor.py b/tests/store/test_cursor.py new file mode 100644 index 00000000..0db71fc4 --- /dev/null +++ b/tests/store/test_cursor.py @@ -0,0 +1,13 @@ +import pytest +from src.libs.common import to_base64 +from src.steps.store import StepsStore + + +@pytest.mark.usefixtures("node_setup") +class TestCursor(StepsStore): + def test_get_multiple_2000_store_messages(self): + for i in range(110): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.001) + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, page_size=50, ascending="true", store_v="v3") + print(len(store_response["messages"])) diff --git a/tests/store/test_ephemeral.py b/tests/store/test_ephemeral.py new file mode 100644 index 00000000..fb99372c --- /dev/null +++ b/tests/store/test_ephemeral.py @@ -0,0 +1,28 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.libs.common import to_base64 +from src.steps.store import StepsStore +from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("node_setup") +class TestEphemeral(StepsStore): + def test_message_with_ephemeral_true(self): + self.publish_message(message=self.create_message(ephemeral=True)) + self.check_store_returns_empty_response() + + def test_message_with_ephemeral_false(self): + self.publish_message(message=self.create_message(ephemeral=False)) + self.check_published_message_is_stored(page_size=5, ascending="true") + + def test_message_with_both_ephemeral_true_and_false(self): + self.publish_message(message=self.create_message(ephemeral=True)) + stored = self.publish_message(message=self.create_message(ephemeral=False)) + self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + assert len(self.store_response["messages"]) == 1 + stored = self.publish_message(message=self.create_message(ephemeral=False)) + self.publish_message(message=self.create_message(ephemeral=True)) + self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + assert len(self.store_response["messages"]) == 2 diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py index cf74e767..f15f2fbb 100644 --- a/tests/store/test_get_messages.py +++ b/tests/store/test_get_messages.py @@ -6,20 +6,15 @@ from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS logger = get_custom_logger(__name__) -# TO DO test without pubsubtopic freezes +# TO DO test without pubsubtopic freezes +@pytest.mark.usefixtures("node_setup") class TestGetMessages(StepsStore): - @pytest.fixture(scope="function", autouse=True) - def node_setup(self, store_setup): - self.setup_first_publishing_node(store="true", relay="true") - self.setup_first_store_node(store="true", relay="true") - self.subscribe_to_pubsub_topics_via_relay() - # only one test for store v1, all other tests are using the new store v3 def test_legacy_store_v1(self): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true", store_v="v1") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true", store_v="v1") def test_get_store_messages_with_different_payloads(self): failed_payloads = [] @@ -27,24 +22,13 @@ class TestGetMessages(StepsStore): logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=to_base64(payload["value"])) try: - self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pageSize=50, ascending="true") + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=50, ascending="true") except Exception as e: logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" - - def test_get_multiple_store_messages(self): - message_hash_list = [] - for payload in SAMPLE_INPUTS: - message = self.create_message(payload=to_base64(payload["value"])) - self.publish_message_via("relay", message=message) - message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) - for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true", store_v="v3") - assert len(store_response["messages"]) == len(SAMPLE_INPUTS) - for index, message_hash in enumerate(store_response["messages"]): - assert message_hash["message_hash"]["data"] == message_hash_list[index], f"Message hash at index {index} doesn't match" + assert len(self.store_response["messages"]) == len(SAMPLE_INPUTS) def test_get_store_messages_with_different_content_topics(self): failed_content_topics = [] @@ -52,12 +36,13 @@ class TestGetMessages(StepsStore): logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: - self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pageSize=50, ascending="true") + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=50, ascending="true") except Exception as e: logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + assert len(self.store_response["messages"]) == len(SAMPLE_INPUTS) def test_get_store_messages_with_different_pubsub_topics(self): self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=VALID_PUBSUB_TOPICS) @@ -65,8 +50,8 @@ class TestGetMessages(StepsStore): for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug(f"Running test with pubsub topic {pubsub_topic}") try: - self.publish_message_via("relay", pubsub_topic=pubsub_topic) - self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=50, ascending="true") + self.publish_message(pubsub_topic=pubsub_topic) + self.check_published_message_is_stored(pubsub_topic=pubsub_topic, page_size=50, ascending="true") except Exception as e: logger.error(f"PubsubTopic pubsub_topic failed: {str(e)}") failed_pubsub_topics.append(pubsub_topic) @@ -74,13 +59,30 @@ class TestGetMessages(StepsStore): def test_get_store_message_with_meta(self): message = self.create_message(meta=to_base64(self.test_payload)) - self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + + def test_get_store_message_with_version(self): + message = self.create_message(version=10) + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") def test_get_store_duplicate_messages(self): message = self.create_message() - self.publish_message_via("relay", message=message) - self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message(message=message) + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") # only one message is stored assert len(self.store_response["messages"]) == 1 + + def test_get_multiple_store_messages(self): + message_hash_list = [] + for payload in SAMPLE_INPUTS: + message = self.create_message(payload=to_base64(payload["value"])) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, page_size=50, ascending="true", store_v="v3") + assert len(store_response["messages"]) == len(SAMPLE_INPUTS) + for index, message_hash in enumerate(store_response["messages"]): + assert message_hash["message_hash"]["data"] == message_hash_list[index], f"Message hash at index {index} doesn't match" diff --git a/tests/store/test_reliability.py b/tests/store/test_reliability.py index 06a5f7da..5d0f5129 100644 --- a/tests/store/test_reliability.py +++ b/tests/store/test_reliability.py @@ -1,4 +1,3 @@ -import pytest from src.libs.custom_logger import get_custom_logger from src.libs.common import delay from src.steps.store import StepsStore @@ -7,102 +6,98 @@ logger = get_custom_logger(__name__) class TestReliability(StepsStore): - @pytest.fixture(scope="function", autouse=False) - def node_setup(self, store_setup): - self.setup_first_publishing_node(store="true", relay="true") - self.setup_first_store_node(store="true", relay="true") - self.subscribe_to_pubsub_topics_via_relay() - def test_publishing_node_is_stopped(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.publishing_node1.stop() - store_response = self.store_node1.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = self.store_node1.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 1 def test_publishing_node_restarts(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.publishing_node1.restart() self.publishing_node1.ensure_ready() self.add_node_peer(self.store_node1, self.multiaddr_list) self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1) - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_store_node_restarts(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.store_node1.restart() self.store_node1.ensure_ready() self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_publishing_node_paused_and_unpaused(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.publishing_node1.pause() delay(1) self.publishing_node1.unpause() self.publishing_node1.ensure_ready() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_store_node_paused_and_unpaused(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.store_node1.pause() delay(1) self.store_node1.unpause() self.store_node1.ensure_ready() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_message_relayed_while_store_node_is_paused(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.store_node1.pause() - self.publish_message_via("relay") + self.publish_message() self.store_node1.unpause() self.store_node1.ensure_ready() - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_message_relayed_while_store_node_is_stopped(self, node_setup): - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") self.store_node1.stop() - self.publish_message_via("relay") + self.publish_message() self.store_node1.start() self.store_node1.ensure_ready() self.add_node_peer(self.store_node1, self.multiaddr_list) self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) delay(1) - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.check_published_message_is_stored(page_size=5, ascending="true") for node in self.store_nodes: - store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3") + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3") assert len(store_response["messages"]) == 2 def test_message_relayed_before_store_node_is_started(self): self.setup_first_publishing_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() self.setup_first_store_node(store="true", relay="true") + delay(10) self.subscribe_to_pubsub_topics_via_relay() - delay(1) - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.add_node_peer(self.store_node1, self.multiaddr_list) + delay(10) + self.check_published_message_is_stored(page_size=5, ascending="true") diff --git a/tests/store/test_running_nodes.py b/tests/store/test_running_nodes.py index e233bbc0..7d6bc8bc 100644 --- a/tests/store/test_running_nodes.py +++ b/tests/store/test_running_nodes.py @@ -6,51 +6,51 @@ class TestRunningNodes(StepsStore): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_relay_and_store__peer_only_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="true", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() self.check_store_returns_empty_response() def test_main_node_relay_and_store__peer_only_relay(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_relay_and_store__peer_neither_relay_nor_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="false", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_only_relay__peer_relay_and_store(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_only_relay__peer_only_store(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="true", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() self.check_store_returns_empty_response() def test_main_node_only_relay__peer_only_relay(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() try: - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.check_published_message_is_stored(page_size=5, ascending="true") except Exception as ex: assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex) @@ -58,12 +58,12 @@ class TestRunningNodes(StepsStore): self.setup_first_publishing_node(store="true", relay="true", lightpush="true") self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0]) self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("lightpush", sender=self.store_node1) - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message(via="lightpush", sender=self.store_node1) + self.check_published_message_is_stored(page_size=5, ascending="true") def test_store_with_filter(self): self.setup_first_publishing_node(store="true", relay="true", filter="true") self.setup_first_store_node(store="false", relay="false", filter="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") diff --git a/tests/store/test_time_filter.py b/tests/store/test_time_filter.py new file mode 100644 index 00000000..8d67e419 --- /dev/null +++ b/tests/store/test_time_filter.py @@ -0,0 +1,27 @@ +import pytest +from datetime import timedelta, datetime +from src.libs.custom_logger import get_custom_logger +from src.steps.store import StepsStore + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("node_setup") +class TestTimeFilter(StepsStore): + def test_messages_with_timestamps_close_to_now(self): + failed_timestamps = [] + sample_ts = [ + int((datetime.now() - timedelta(seconds=2)).timestamp() * 1e9), + int((datetime.now() + timedelta(seconds=2)).timestamp() * 1e9), + int((datetime.now() + timedelta(seconds=10)).timestamp() * 1e9), + ] + for timestamp in sample_ts: + logger.debug(f"Running test with timestamp {timestamp}") + message = self.create_message(timestamp=timestamp) + try: + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + except Exception as ex: + logger.error(f"Timestamp {timestamp} failed: {str(ex)}") + failed_timestamps.append(timestamp) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}"