mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-04 06:53:07 +00:00
Add changes to enable sync (#125)
* Add changes to enable sync
* Adding new test
* Make changes to allow store-sync to work
* Checkout the image with the bug fix
* Add change to make test work
* Adding store-sync test scenarios
* Adding more tests
* Adding new set of tests
* Adding extensive tests
* Make PR ready for review
* Revert changes
This commit is contained in:
parent a8ed2e8856
commit 06041040f3
@@ -121,6 +121,20 @@ class WakuNode:
             # "filter-subscription-timeout": "600",
         }
+
+        store_sync = kwargs.pop("store_sync", "false").lower() == "true"
+        store_sync_range = kwargs.pop("store_sync_range", None)
+        store_sync_interval = kwargs.pop("store_sync_interval", None)
+
+        if store_sync:
+            default_args["store-sync"] = "true"
+            default_args.setdefault("store", "true")
+
+            if store_sync_range:
+                default_args["store-sync-range"] = store_sync_range
+
+            if store_sync_interval:
+                default_args["store-sync-interval"] = store_sync_interval
 
         if self.is_nwaku():
            nwaku_args = {
                "shard": "0",
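For context: the new kwargs are plain "true"/"false" strings popped off **kwargs and translated into nwaku-style CLI flags, with store-sync implying store. A minimal, self-contained sketch of the same pattern (build_store_sync_args is a hypothetical name, not a repo helper):

def build_store_sync_args(default_args, **kwargs):
    # Mirror of the flag mapping above: "true"/"false" strings in, CLI flags out.
    store_sync = kwargs.pop("store_sync", "false").lower() == "true"
    store_sync_range = kwargs.pop("store_sync_range", None)
    store_sync_interval = kwargs.pop("store_sync_interval", None)
    if store_sync:
        default_args["store-sync"] = "true"
        default_args.setdefault("store", "true")  # sync is meaningless without a store
        if store_sync_range:
            default_args["store-sync-range"] = store_sync_range
        if store_sync_interval:
            default_args["store-sync-interval"] = store_sync_interval
    return default_args

print(build_store_sync_args({}, store_sync="true", store_sync_range=45, store_sync_interval=10))
# {'store-sync': 'true', 'store': 'true', 'store-sync-range': 45, 'store-sync-interval': 10}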
@@ -2,6 +2,7 @@ import base64
 import hashlib
 import inspect
 from time import time
+from time import time_ns
 import allure
 import pytest
 from datetime import timedelta, datetime
@@ -39,7 +40,9 @@ class StepsCommon:
     @allure.step
     def create_message(self, **kwargs):
-        message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
+        ts_ns = time_ns()
+        ts_ns = int(f"{ts_ns:019d}")
+        message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": ts_ns}
         message.update(kwargs)
         return message
 
 
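Two details are worth noting in this change. int(time() * 1e9) routes the clock through a float, which carries roughly 16 significant decimal digits, so nanosecond precision can be lost; time_ns() stays in integer space. The int(f"{ts_ns:019d}") round-trip zero-pads and re-parses, which leaves the integer value unchanged. A small standalone check of both points:

from time import time, time_ns

float_ns = int(time() * 1e9)   # float path: ~16 significant digits
exact_ns = time_ns()           # integer path: full nanosecond resolution

# Current-epoch nanosecond timestamps are 19 digits, so the zero-padding
# round-trip used in the diff is a no-op on the value itself.
assert int(f"{exact_ns:019d}") == exact_ns
assert len(str(exact_ns)) == 19
print(float_ns, exact_ns)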
@@ -225,6 +225,7 @@ class StepsStore(StepsCommon):
             **kwargs,
         )
 
+        logger.debug(f"messages length is {len(self.store_response.messages)}")
         assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}"
         assert len(self.store_response.messages) >= len(
             messages_to_check
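The >= comparison here tolerates the store holding more messages than the checked list, since other messages may share the store. If a strict containment check were wanted instead, a hypothetical variant could look like this (field names are an assumption, not a repo helper):

def assert_all_stored(stored_messages, messages_to_check):
    # Strict companion to the `>=` length check: every published payload
    # must appear in the store response. Dict field access is assumed here.
    stored_payloads = {m["payload"] for m in stored_messages}
    missing = [m for m in messages_to_check if m["payload"] not in stored_payloads]
    assert not missing, f"{len(missing)} published messages missing from store"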
@@ -5,6 +5,7 @@ from src.libs.custom_logger import get_custom_logger
 from src.node.store_response import StoreResponse
 from src.node.waku_node import WakuNode
 from src.steps.store import StepsStore
+import time
 
 logger = get_custom_logger(__name__)
 
@@ -483,3 +484,284 @@ class TestStoreSync(StepsStore):
         assert (
             node1_message == node2_message == node3_message == self.num_messages
         ), f"Store messages are not equal to each other or not equal to {self.num_messages}"
+
+    def test_sync_flags_no_relay_2nodes(self):
+        self.node1.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="true",
+        )
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.node2.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+        delay(1)
+
+        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
+
+        delay(20)  # wait for the sync to finish
+        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
+
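The fixed delay(20) above leaves headroom for one 10-second sync round plus overhead. A poll-until-converged helper would make the wait adaptive; a hypothetical sketch (wait_for_sync is not one of the repo's helpers):

import time

def wait_for_sync(get_count, expected, timeout=30, poll=2):
    # Poll a message counter until it reaches the expected total or time runs out.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_count() >= expected:
            return True
        time.sleep(poll)
    return False

# usage sketch, assuming the helpers these tests already use:
# ok = wait_for_sync(lambda: len(self.get_messages_from_store(self.node2, page_size=100).messages), self.num_messages)
# assert ok, "store sync did not converge in time"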
+    def test_sync_flags_node2_start_later(self):
+        self.node1.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="true",
+        )
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+
+        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
+        delay(1)
+        self.node2.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+
+        delay(65)
+        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
+
+    def test_store_sync_indirect_node(self):
+        self.node1.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="true",
+        )
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.node2.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+
+        self.node3.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=10,
+            store_sync_range=45,
+            store_sync_relay_jitter=0,
+            relay="false",
+            dns_discovery="false",
+            discv5_bootstrap_node=self.node2.get_enr_uri(),
+        )
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
+        delay(65)
+        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
+
+    def test_store_sync_long_chain(self):
+        sync_interval = 10
+        sync_range = 120
+        self.node1.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=sync_interval,
+            store_sync_range=sync_range,
+            store_sync_relay_jitter=0,
+            relay="true",
+            dns_discovery="false",
+        )
+
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
+        self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}")
+        self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}")
+        self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}")
+        self.node8 = WakuNode(NODE_2, f"node8_{self.test_id}")
+
+        extra_nodes = [
+            self.node2,
+            self.node3,
+            self.node4,
+            self.node5,
+            self.node6,
+            self.node7,
+            self.node8,
+        ]
+        prev = self.node1
+        for node in extra_nodes:
+            node.start(
+                store="true",
+                store_sync="true",
+                store_sync_interval=sync_interval,
+                store_sync_range=sync_range,
+                store_sync_relay_jitter=0,
+                relay="false",
+                dns_discovery="false",
+            )
+
+            self.add_node_peer(node, [prev.get_multiaddr_with_id()])
+            prev = node
+
+        published = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
+        delay(sync_interval * 7 + 20)
+        self.check_published_message_is_stored(
+            page_size=100,
+            ascending="true",
+            store_node=self.node8,
+            messages_to_check=published,
+        )
+
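The delay(sync_interval * 7 + 20) budgets one full sync interval per hop along the chain's seven store-sync edges, plus a safety margin. A back-of-the-envelope check of that bound:

sync_interval = 10
hops = 7      # node1 -> node2 -> ... -> node8, one store-sync edge per link
margin = 20   # startup jitter, query latency, container overhead

worst_case = sync_interval * hops + margin
print(worst_case)  # 90 seconds, i.e. delay(sync_interval * 7 + 20)

Note that sync_range is 120 s here, comfortably larger than the 90 s worst case, so messages stay inside every node's sync window while they propagate down the chain.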
+    def test_store_sync_overlap_sync_window(self):
+        sync_interval = 15
+        sync_range = 45
+        intervals = 6
+        publish_secs = sync_interval * intervals
+
+        self.node1.start(relay="true", store="true", store_sync="true", dns_discovery="false")
+
+        self.node2.start(
+            relay="false",
+            store="true",
+            store_sync="true",
+            store_sync_interval=sync_interval,
+            store_sync_range=sync_range,
+            store_sync_relay_jitter=0,
+            dns_discovery="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+
+        logger.debug(f"Publishing {publish_secs} messages at ~1 msg/s")
+        published_hashes = []
+        for i in range(publish_secs):
+            msg = self.publish_message(sender=self.node1, via="relay")
+            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))
+            delay(0.8)
+
+        logger.debug(f"Waiting {sync_interval * 2} seconds to allow at least two sync rounds")
+        delay(sync_interval * 2)
+
+        logger.debug("Querying node2 store for all messages")
+        resp = self.get_messages_from_store(self.node2, page_size=2000, ascending="true")
+        store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))]
+
+        logger.debug(f"Store returned {len(store_hashes)} messages, published {publish_secs} messages")
+
+        assert len(set(store_hashes)) == publish_secs
+        assert set(store_hashes) == set(published_hashes)
+
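compute_message_hash is the repo's helper for Waku's deterministic message hashing. Roughly per the Waku message spec (RFC 14), the hash is a SHA-256 over the pubsub topic and the message fields; the sketch below approximates that scheme with assumed field handling, and the repo helper remains authoritative for these tests:

import hashlib

def waku_message_hash_hex(pubsub_topic, payload, content_topic, meta, timestamp_ns):
    # Sketch of deterministic message hashing (RFC 14 style):
    # sha256(pubsub_topic || payload || content_topic || meta || timestamp)
    h = hashlib.sha256()
    h.update(pubsub_topic.encode())
    h.update(payload)
    h.update(content_topic.encode())
    h.update(meta)
    h.update(timestamp_ns.to_bytes(8, "big"))
    return "0x" + h.hexdigest()

print(waku_message_hash_hex("/waku/2/rs/0/0", b"hello", "/test/1/demo/proto", b"", 1700000000000000000))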
+    @pytest.mark.timeout(60 * 20)
+    def test_query_after_long_time(self):
+        sync_range = 120
+        backlog_secs = 10 * 60
+        publish_delay = 0.8
+        sync_interval = 10
+
+        self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false")
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+
+        logger.debug(f"Publishing {backlog_secs} messages at ~1 msg/s to build a backlog")
+        published_hashes = []
+        for _ in range(backlog_secs):
+            msg = self.publish_message(sender=self.node1, via="relay")
+            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))
+            delay(publish_delay)
+
+        # At ~1 msg/s, the last sync_range messages approximate the sync window.
+        expected_hashes = published_hashes[-sync_range:]
+
+        self.node2.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=sync_interval,
+            store_sync_range=sync_range,
+            store_sync_relay_jitter=0,
+            relay="false",
+            dns_discovery="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+
+        logger.debug(f"Waiting {sync_interval * 4} s to let node2 finish its first sync rounds")
+        delay(sync_interval * 4)
+
+        store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.node2)
+        store_hashes = []
+        while store_response.pagination_cursor is not None:
+            cursor = store_response.pagination_cursor
+            store_response = self.get_messages_from_store(self.node2, page_size=100, cursor=cursor, ascending="true")
+            for idx in range(len(store_response.messages)):
+                store_hashes.append(store_response.message_hash(idx))
+
+        logger.debug(f"Store returned {len(store_hashes)} messages; expected {len(expected_hashes)}")
+        assert len(store_hashes) == len(expected_hashes), "Incorrect number of messages synced"
+        assert set(store_hashes) == set(expected_hashes), "node2 synced the wrong message set"
+
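The while loop above drains the paginated store by chasing the cursor until it comes back empty. The same logic as a reusable generator, sketched here as a hypothetical helper rather than anything in the repo:

# Hypothetical reusable version of the pagination loop above: yield message
# hashes page by page until the store stops returning a cursor.
def iter_store_hashes(steps, node, page_size=100):
    cursor = ""
    while cursor is not None:
        resp = steps.get_messages_from_store(node, page_size=page_size, cursor=cursor, ascending="true")
        for idx in range(len(resp.messages)):
            yield resp.message_hash(idx)
        cursor = resp.pagination_cursor

# usage sketch:
# store_hashes = list(iter_store_hashes(self, self.node2))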
+    @pytest.mark.timeout(60 * 3)
+    def test_store_sync_after_partition_under_100_msgs(self):
+        sync_interval = 10
+        sync_range = 180
+        node2up_secs = 20
+        node2down_secs = 60
+        publish_delay = 0.8
+        total_expected = node2up_secs + node2down_secs
+
+        self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false")
+        self.node2.start(
+            store="true",
+            store_sync="true",
+            store_sync_interval=sync_interval,
+            store_sync_range=sync_range,
+            store_sync_relay_jitter=0,
+            relay="false",
+            dns_discovery="false",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+        )
+        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
+
+        published_hashes = []
+        for _ in range(node2up_secs):
+            m = self.publish_message(sender=self.node1, via="relay")
+            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex"))
+            delay(publish_delay)
+
+        logger.debug("Pausing node2 container")
+        self.node2.pause()
+
+        logger.debug("Publishing while node2 is paused")
+        for _ in range(node2down_secs):
+            m = self.publish_message(sender=self.node1, via="relay")
+            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex"))
+            delay(publish_delay)
+
+        logger.debug("Unpausing node2")
+        self.node2.unpause()
+        delay(sync_interval * 2)
+
+        resp = self.get_messages_from_store(self.node2, ascending="true", page_size=100)
+        store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))]
+
+        logger.debug(f"node2 store has {len(store_hashes)} messages; expected {total_expected}")
+        assert len(store_hashes) == total_expected, "Message count mismatch after partition"
+        assert set(store_hashes) == set(published_hashes), "Missing or extra messages after sync"
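pause()/unpause() presumably wrap Docker's container freezer, which simulates a network partition without killing the process or losing its in-memory state; on unpause, the node catches up on its next sync round. Assuming WakuNode delegates to docker-py, the underlying calls look roughly like:

import docker

client = docker.from_env()
container = client.containers.get("node2_example")  # hypothetical container name

container.pause()    # SIGSTOP-like freeze via the cgroup freezer; sockets stay open
container.unpause()  # resume the frozen processes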