waku-interop-tests/tests/sharding/test_relay_auto_sharding.py
Roman Zajic 0ce2572810
chore: refactor setup relay node for sharding (#48)
* fix: refactor setup relay for sharding

* fix: refactor also all related to _resolve_sharding_flags
2024-07-02 09:33:53 +02:00

132 lines
8.5 KiB
Python

import pytest
from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.steps.sharding import StepsSharding
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0
logger = get_custom_logger(__name__)
@pytest.mark.skipif(
"go-waku" in NODE_1 or "go-waku" in NODE_2,
reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
)
class TestRelayAutosharding(StepsSharding):
def test_publish_without_subscribing_via_api_works(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
for node in self.main_nodes:
self.relay_message(node, self.create_message(contentTopic=self.test_content_topic))
def test_retrieve_messages_without_subscribing_via_api(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
try:
self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
if self.node2.is_nwaku():
pass
else:
raise AssertionError("Retrieving messages without subscribing worked!!!")
except Exception as ex:
assert "Not Found" in str(ex)
def test_subscribe_and_publish_on_another_content_topic_from_same_shard(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=["/newsService/1.0/weekly/protobuf"])
self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf")
self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
def test_subscribe_and_publish_on_another_content_topic_from_another_shard(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=["/toychat/2/huilong/proto"])
self.check_published_message_reaches_relay_peer(content_topic="/toychat/2/huilong/proto")
self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
def test_publish_on_not_subscribed_content_topic_works(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
for node in self.main_nodes:
self.relay_message(node, self.create_message(contentTopic="/toychat/2/huilong/proto"))
def test_cant_retrieve_messages_on_not_subscribed_content_topic(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
self.check_published_message_doesnt_reach_relay_peer(content_topic="/toychat/2/huilong/proto")
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
def test_subscribe_via_api_to_new_content_topics(self, content_topic_list):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list[:1])
self.subscribe_main_relay_nodes(content_topics=content_topic_list[1:])
for content_topic in content_topic_list[1:]:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
def test_subscribe_one_by_one_to_different_content_topics_and_send_messages(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS + CONTENT_TOPICS_SHARD_0:
self.subscribe_main_relay_nodes(content_topics=[content_topic])
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
def test_unsubscribe_from_some_content_topics(self, content_topic_list):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=content_topic_list)
for content_topic in content_topic_list:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
self.unsubscribe_main_relay_nodes(content_topics=content_topic_list[:3])
for content_topic in content_topic_list[:3]:
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
for content_topic in content_topic_list[3:]:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
def test_unsubscribe_from_all_content_topics(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
def test_unsubscribe_from_all_content_topics_one_by_one(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.unsubscribe_main_relay_nodes(content_topics=[content_topic])
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
def test_resubscribe_to_unsubscribed_content_topics(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
def test_unsubscribe_from_non_subscribed_content_topics(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
def test_publish_on_multiple_content_topics_and_only_after_fetch_them(self, content_topic_list):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.subscribe_main_relay_nodes(content_topics=content_topic_list)
for content_topic in content_topic_list:
self.relay_message(self.node1, self.create_message(payload=to_base64(content_topic), contentTopic=content_topic))
delay(0.1)
for content_topic in content_topic_list:
get_messages_response = self.retrieve_relay_message(self.node2, content_topic=content_topic)
assert get_messages_response, f"Peer NODE_2 couldn't find any messages"
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
assert get_messages_response[0]["contentTopic"] == content_topic
assert get_messages_response[0]["payload"] == to_base64(content_topic)