diff --git a/src/node/waku_node.py b/src/node/waku_node.py index eefb0782..99e565b0 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -121,6 +121,20 @@ class WakuNode: # "filter-subscription-timeout": "600", } + store_sync = kwargs.pop("store_sync", "false").lower() == "true" + store_sync_range = kwargs.pop("store_sync_range", None) + store_sync_interval = kwargs.pop("store_sync_interval", None) + + if store_sync: + default_args["store-sync"] = "true" + default_args.setdefault("store", "true") + + if store_sync_range: + default_args["store-sync-range"] = store_sync_range + + if store_sync_interval: + default_args["store-sync-interval"] = store_sync_interval + if self.is_nwaku(): nwaku_args = { "shard": "0", diff --git a/src/steps/common.py b/src/steps/common.py index 79c297e8..ccdfcf53 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -2,6 +2,7 @@ import base64 import hashlib import inspect from time import time +from time import time_ns import allure import pytest from datetime import timedelta, datetime @@ -39,7 +40,9 @@ class StepsCommon: @allure.step def create_message(self, **kwargs): - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + ts_ns = time_ns() + ts_ns = int(f"{ts_ns:019d}") + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": ts_ns} message.update(kwargs) return message diff --git a/src/steps/store.py b/src/steps/store.py index 9d6649bd..543db8c4 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -225,6 +225,7 @@ class StepsStore(StepsCommon): **kwargs, ) + logger.debug(f"messages length is {len(self.store_response.messages)}") assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}" assert len(self.store_response.messages) >= len( messages_to_check diff --git a/tests/store_sync/test_store_sync.py b/tests/store_sync/test_store_sync.py index ac69feab..48c2cd70 100644 --- a/tests/store_sync/test_store_sync.py +++ b/tests/store_sync/test_store_sync.py @@ -5,6 +5,7 @@ from src.libs.custom_logger import get_custom_logger from src.node.store_response import StoreResponse from src.node.waku_node import WakuNode from src.steps.store import StepsStore +import time logger = get_custom_logger(__name__) @@ -483,3 +484,284 @@ class TestStoreSync(StepsStore): assert ( node1_message == node2_message == node3_message == self.num_messages ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_sync_flags_no_relay_2nodes(self): + self.node1.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="true", + ) + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + delay(1) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + delay(20) # wait for the sync to finish + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + + def test_sync_flags_node2_start_later(self): + self.node1.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="true", + ) + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + delay(1) + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + delay(65) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + + def test_store_sync_indirect_node(self): + self.node1.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="true", + ) + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + + self.node3.start( + store="true", + store_sync="true", + store_sync_interval=10, + store_sync_range=45, + store_sync_relay_jitter=0, + relay="false", + dns_discovery="false", + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + delay(65) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + + def test_store_sync_long_chain(self): + sync_interval = 10 + sync_range = 120 + self.node1.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=0, + relay="true", + dns_discovery="false", + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}") + self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}") + self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}") + self.node8 = WakuNode(NODE_2, f"node8_{self.test_id}") + + extra_nodes = [ + self.node2, + self.node3, + self.node4, + self.node5, + self.node6, + self.node7, + self.node8, + ] + prev = self.node1 + for node in extra_nodes: + node.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=0, + relay="false", + dns_discovery="false", + ) + + self.add_node_peer(node, [prev.get_multiaddr_with_id()]) + prev = node + + published = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + delay(sync_interval * 7 + 20) + self.check_published_message_is_stored( + page_size=100, + ascending="true", + store_node=self.node8, + messages_to_check=published, + ) + + def test_store_sync_overlap_sync_window(self): + sync_interval = 15 + sync_range = 45 + intervals = 6 + publish_secs = sync_interval * intervals + + self.node1.start(relay="true", store="true", store_sync="true", dns_discovery="false") + + self.node2.start( + relay="false", + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=0, + dns_discovery="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + logger.debug(f"Publishing {publish_secs} messages at 1 msg/s") + published_hashes = [] + for i in range(publish_secs): + msg = self.publish_message(sender=self.node1, via="relay") + published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex")) + delay(0.8) + + logger.debug(f"Waiting {sync_interval * 2} seconds to allow at least two sync rounds") + delay(sync_interval * 2) + + logger.debug("Querying node2 store for all messages") + resp = self.get_messages_from_store(self.node2, page_size=2000, ascending="true") + store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))] + + logger.debug(f"Store returned {len(store_hashes)} messages, published publish_secs" f" messages") + + assert len(set(store_hashes)) == publish_secs + assert set(store_hashes) == set(published_hashes) + + @pytest.mark.timeout(60 * 20) + def test_query_after_long_time(self): + sync_range = 120 + backlog_secs = 10 * 60 + publish_delay = 0.8 + sync_interval = 10 + + self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.debug(f"Publishing {backlog_secs} messages at 1 msg/s to build backlog") + published_hashes = [] + for _ in range(backlog_secs): + msg = self.publish_message(sender=self.node1, via="relay") + published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex")) + delay(publish_delay) + + expected_hashes = published_hashes[-sync_range:] + + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=0, + relay="false", + dns_discovery="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + logger.debug(f"Waiting {sync_interval * 2} s to let Node B finish its first sync") + delay(sync_interval * 4) + + store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.node2) + store_hashes = [] + while store_response.pagination_cursor is not None: + cursor = store_response.pagination_cursor + store_response = self.get_messages_from_store(self.node2, page_size=100, cursor=cursor, ascending="true") + for idx in range(len(store_response.messages)): + store_hashes.append(store_response.message_hash(idx)) + + logger.debug(f"Store returned {len(store_hashes)} messages; expected {len(expected_hashes)}") + assert len(store_hashes) == len(expected_hashes), "Incorrect number of messages synced" + assert set(store_hashes) == set(expected_hashes), "Node B synced wrong message set" + + @pytest.mark.timeout(60 * 3) + def test_store_sync_after_partition_under_100_msgs(self): + sync_interval = 10 + sync_range = 180 + node2up_secs = 20 + node2down_secs = 60 + publish_delay = 0.8 + total_expected = node2up_secs + node2down_secs + + self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false") + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=0, + relay="false", + dns_discovery="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + published_hashes = [] + for _ in range(node2up_secs): + m = self.publish_message(sender=self.node1, via="relay") + published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex")) + delay(publish_delay) + + logger.debug("Pausing Node2 container ") + self.node2.pause() + + logger.debug("Publishing while node2 paused ") + for _ in range(node2down_secs): + m = self.publish_message(sender=self.node1, via="relay") + published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex")) + delay(publish_delay) + + logger.debug("Unpausing Node2") + self.node2.unpause() + delay(sync_interval * 2) + + resp = self.get_messages_from_store(self.node2, ascending="true", page_size=100) + store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))] + + logger.debug(f"Node2 store has {len(store_hashes)} messages; expected {total_expected}") + assert len(store_hashes) == total_expected, "Message count mismatch after partition" + assert set(store_hashes) == set(published_hashes), "Missing or extra messages after sync"