diff --git a/src/node/waku_node.py b/src/node/waku_node.py index eefb0782..99e565b0 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -121,6 +121,20 @@ class WakuNode: # "filter-subscription-timeout": "600", } + store_sync = kwargs.pop("store_sync", "false").lower() == "true" + store_sync_range = kwargs.pop("store_sync_range", None) + store_sync_interval = kwargs.pop("store_sync_interval", None) + + if store_sync: + default_args["store-sync"] = "true" + default_args.setdefault("store", "true") + + if store_sync_range: + default_args["store-sync-range"] = store_sync_range + + if store_sync_interval: + default_args["store-sync-interval"] = store_sync_interval + if self.is_nwaku(): nwaku_args = { "shard": "0", diff --git a/src/steps/common.py b/src/steps/common.py index 79c297e8..ccdfcf53 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -2,6 +2,7 @@ import base64 import hashlib import inspect from time import time +from time import time_ns import allure import pytest from datetime import timedelta, datetime @@ -39,7 +40,9 @@ class StepsCommon: @allure.step def create_message(self, **kwargs): - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + ts_ns = time_ns() + ts_ns = int(f"{ts_ns:019d}") + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": ts_ns} message.update(kwargs) return message diff --git a/src/steps/store.py b/src/steps/store.py index 9d6649bd..543db8c4 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -225,6 +225,7 @@ class StepsStore(StepsCommon): **kwargs, ) + logger.debug(f"messages length is {len(self.store_response.messages)}") assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}" assert len(self.store_response.messages) >= len( messages_to_check diff --git a/tests/store_sync/test_store_sync.py b/tests/store_sync/test_store_sync.py index 0a947517..042855ab 100644 --- a/tests/store_sync/test_store_sync.py +++ b/tests/store_sync/test_store_sync.py @@ -993,3 +993,99 @@ class TestStoreSync(StepsStore): resp3 = self.get_messages_from_store(self.node3, page_size=200, cursor="", ascending="true", peer_id="") logger.debug("Node3 store has %d messages expected %d", len(resp3.messages), msgs_to_publish) assert len(resp3.messages) == msgs_to_publish, f"Node3 store mismatch: expected {msgs_to_publish}, " f"got {len(resp3.messages)}" + + def test_continuous_store_sync(self): + msgs_per_round = 30 + rounds = 3 + sleep_between_rounds = 30 + publish_delay = 0.01 + sync_interval = 6 + sync_range = 600 + jitter = 0 + + self.node1.start( + store="true", + store_sync="true", + relay="true", + dns_discovery="false", + ) + + self.node2.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=jitter, + relay="false", + dns_discovery="false", + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + total_published = 0 + for _ in range(rounds): + for _ in range(msgs_per_round): + self.publish_message(sender=self.node1, via="relay") + total_published += 1 + time.sleep(publish_delay) + + time.sleep(sync_interval * 2) + + resp = self.get_messages_from_store( + self.node2, + page_size=100, + cursor="", + ascending="true", + peer_id="", + ) + logger.debug(f"Node-2 store has {len(resp.messages)}/{total_published} messages") + assert len(resp.messages) == total_published, f"expected {total_published}, got {len(resp.messages)}" + + time.sleep(sleep_between_rounds) + + def test_store_sync_high_jitter_stress(self): + sync_interval = 10 + sync_range = 120 + jitter = 90 + msgs_per_node = 50 + message_delay = 0.0 + page_size = 100 + + nodes = [self.node1, self.node2, self.node3] + + for n in nodes: + n.start( + store="true", + store_sync="true", + store_sync_interval=sync_interval, + store_sync_range=sync_range, + store_sync_relay_jitter=jitter, + relay="true", + dns_discovery="false", + ) + n.set_relay_subscriptions([self.test_pubsub_topic]) + + for i, a in enumerate(nodes): + for b in nodes[i + 1 :]: + self.add_node_peer(a, [b.get_multiaddr_with_id()]) + self.add_node_peer(b, [a.get_multiaddr_with_id()]) + + expected_hashes = [] + for _ in range(msgs_per_node): + msgs = [self.create_message() for _ in nodes] + for node, msg in zip(nodes, msgs): + self.publish_message( + sender=node, + via="relay", + message=msg, + message_propagation_delay=message_delay, + ) + expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex")) + + delay(120) + + for node in nodes: + store_resp = self.get_messages_from_store(node, page_size=page_size, ascending="true") + retrieved_hashes = [store_resp.message_hash(i) for i in range(len(store_resp.messages))] + assert len(retrieved_hashes) == len(expected_hashes), " message count mismatch" + assert retrieved_hashes == expected_hashes, "{ message hash mismatch"