From 0ce257281033b807c098e7d5d93243b59eafdd1b Mon Sep 17 00:00:00 2001 From: Roman Zajic Date: Tue, 2 Jul 2024 09:33:53 +0200 Subject: [PATCH] chore: refactor setup relay node for sharding (#48) * fix: refactor setup relay for sharding * fix: refactor also all related to _resolve_sharding_flags --- src/node/waku_node.py | 16 +++++ src/steps/sharding.py | 70 +++++-------------- tests/sharding/test_filter.py | 12 ++-- tests/sharding/test_multiple_nodes.py | 5 +- tests/sharding/test_relay_auto_sharding.py | 5 +- tests/sharding/test_relay_static_sharding.py | 2 +- .../test_running_nodes_auto_sharding.py | 4 +- .../test_running_nodes_static_sharding.py | 7 +- 8 files changed, 56 insertions(+), 65 deletions(-) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 20c7e45f..de518821 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -56,6 +56,21 @@ def multiaddr2id(multiaddr): return multiaddr.split("/")[-1] +def resolve_sharding_flags(kwargs): + if "pubsub_topic" in kwargs: + pubsub_topic = kwargs["pubsub_topic"] + if not "cluster_id" in kwargs: + try: + if isinstance(pubsub_topic, list): + pubsub_topic = pubsub_topic[0] + cluster_id = pubsub_topic.split("/")[4] + logger.debug(f"Cluster id was resolved to: {cluster_id}") + kwargs["cluster_id"] = cluster_id + except Exception as ex: + raise Exception("Could not resolve cluster_id from pubsub_topic") + return kwargs + + class WakuNode: def __init__(self, docker_image, docker_log_prefix=""): self._image_name = docker_image @@ -124,6 +139,7 @@ class WakuNode: remove_container = True kwargs = self.parse_peer_persistence_config(kwargs) + kwargs = resolve_sharding_flags(kwargs) default_args.update(sanitize_docker_flags(kwargs)) diff --git a/src/steps/sharding.py b/src/steps/sharding.py index e5c5c9b5..fb3d828f 100644 --- a/src/steps/sharding.py +++ b/src/steps/sharding.py @@ -15,11 +15,12 @@ from src.env_vars import ( ) from src.node.waku_node import WakuNode from src.steps.common import StepsCommon +from src.steps.relay import StepsRelay logger = get_custom_logger(__name__) -class StepsSharding(StepsCommon): +class StepsSharding(StepsRelay): test_content_topic = "/myapp/1/latest/proto" test_pubsub_topic = "/waku/2/rs/2/0" test_payload = "Sharding works!!" @@ -28,44 +29,29 @@ class StepsSharding(StepsCommon): @pytest.fixture(scope="function", autouse=True) def sharding_setup(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.main_relay_nodes = [] - self.optional_relay_nodes = [] + self.main_nodes = [] + self.optional_nodes = [] self.main_filter_nodes = [] self.optional_filter_nodes = [] @allure.step - def setup_first_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") - kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) - self.node1.start(relay="true", filter="true", nodekey=NODEKEY, **kwargs) - self.enr_uri = self.node1.get_enr_uri() - self.multiaddr_with_id = self.node1.get_multiaddr_with_id() - self.main_relay_nodes.extend([self.node1]) + def setup_first_relay_node_with_filter(self, **kwargs): + self.setup_first_relay_node(filter="true", **kwargs) @allure.step - def setup_second_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): + def setup_second_node_as_filter(self, **kwargs): self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") - kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) - self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) - self.add_node_peer(self.node2, [self.multiaddr_with_id]) - self.main_relay_nodes.extend([self.node2]) - - @allure.step - def setup_second_node_as_filter(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") - kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs) self.add_node_peer(self.node2, [self.multiaddr_with_id]) self.main_filter_nodes.extend([self.node2]) @allure.step - def setup_main_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - self.setup_first_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs) - self.setup_second_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs) + def setup_main_relay_nodes(self, **kwargs): + self.setup_first_relay_node_with_filter(**kwargs) + self.setup_second_relay_node(**kwargs) @allure.step - def setup_optional_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - kwargs = self._resolve_sharding_flags(cluster_id=cluster_id, pubsub_topic=pubsub_topic, content_topic=content_topic, **kwargs) + def setup_optional_relay_nodes(self, **kwargs): if ADDITIONAL_NODES: nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")] else: @@ -74,16 +60,15 @@ class StepsSharding(StepsCommon): node = WakuNode(node, f"node{index + 3}_{self.test_id}") node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) self.add_node_peer(node, [self.multiaddr_with_id]) - self.optional_relay_nodes.append(node) + self.optional_nodes.append(node) @allure.step - def setup_nwaku_relay_nodes(self, num_nodes, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) + def setup_nwaku_relay_nodes(self, num_nodes, **kwargs): for index in range(num_nodes): node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}") node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) self.add_node_peer(node, [self.multiaddr_with_id]) - self.optional_relay_nodes.append(node) + self.optional_nodes.append(node) @allure.step def subscribe_relay_node(self, node, content_topics, pubsub_topics): @@ -104,12 +89,12 @@ class StepsSharding(StepsCommon): @allure.step def subscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None): - for node in self.main_relay_nodes: + for node in self.main_nodes: self.subscribe_relay_node(node, content_topics, pubsub_topics) @allure.step def subscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None): - for node in self.optional_relay_nodes: + for node in self.optional_nodes: self.subscribe_relay_node(node, content_topics, pubsub_topics) @allure.step @@ -131,12 +116,12 @@ class StepsSharding(StepsCommon): @allure.step def unsubscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None): - for node in self.main_relay_nodes: + for node in self.main_nodes: self.unsubscribe_relay_node(node, content_topics, pubsub_topics) @allure.step def unsubscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None): - for node in self.optional_relay_nodes: + for node in self.optional_nodes: self.unsubscribe_relay_node(node, content_topics, pubsub_topics) @allure.step @@ -166,7 +151,7 @@ class StepsSharding(StepsCommon): if not sender: sender = self.node1 if not peer_list: - peer_list = self.main_relay_nodes + self.optional_relay_nodes + peer_list = self.main_nodes + self.optional_nodes self.relay_message(sender, message, pubsub_topic) delay(0.1) @@ -222,20 +207,3 @@ class StepsSharding(StepsCommon): raise AssertionError("Publishing messages on unsubscribed shard worked!!!") except Exception as ex: assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex) - - @allure.step - def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): - if pubsub_topic: - kwargs["pubsub_topic"] = pubsub_topic - if not cluster_id: - try: - if isinstance(pubsub_topic, list): - pubsub_topic = pubsub_topic[0] - cluster_id = pubsub_topic.split("/")[4] - logger.debug(f"Cluster id was resolved to: {cluster_id}") - except Exception as ex: - raise Exception("Could not resolve cluster_id from pubsub_topic") - kwargs["cluster_id"] = cluster_id - if content_topic: - kwargs["content_topic"] = content_topic - return kwargs diff --git a/tests/sharding/test_filter.py b/tests/sharding/test_filter.py index f3e08eb4..4abacddc 100644 --- a/tests/sharding/test_filter.py +++ b/tests/sharding/test_filter.py @@ -1,6 +1,8 @@ import pytest from src.env_vars import NODE_1, NODE_2 +from src.libs.common import delay from src.libs.custom_logger import get_custom_logger +from src.steps.relay import StepsRelay from src.steps.sharding import StepsSharding from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER @@ -9,14 +11,14 @@ logger = get_custom_logger(__name__) class TestFilterStaticSharding(StepsSharding): def test_filter_works_with_static_sharding(self): - self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic) self.setup_second_node_as_filter(pubsub_topic=self.test_pubsub_topic) self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic) def test_filter_static_sharding_multiple_shards(self): - self.setup_first_relay_node(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) + self.setup_first_relay_node_with_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) self.setup_second_node_as_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER) self.subscribe_first_relay_node(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): @@ -31,14 +33,16 @@ class TestFilterStaticSharding(StepsSharding): ) class TestFilterAutoSharding(StepsSharding): def test_filter_works_with_auto_sharding(self): - self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter( + cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic + ) self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic) def test_filter_auto_sharding_multiple_content_topics(self): - self.setup_first_relay_node( + self.setup_first_relay_node_with_filter( cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER ) self.setup_second_node_as_filter( diff --git a/tests/sharding/test_multiple_nodes.py b/tests/sharding/test_multiple_nodes.py index 3ae81363..6d02d64f 100644 --- a/tests/sharding/test_multiple_nodes.py +++ b/tests/sharding/test_multiple_nodes.py @@ -1,6 +1,7 @@ import pytest from src.env_vars import ADDITIONAL_NODES, NODE_2 from src.libs.custom_logger import get_custom_logger +from src.steps.relay import StepsRelay from src.steps.sharding import StepsSharding logger = get_custom_logger(__name__) @@ -16,7 +17,7 @@ class TestMultipleNodes(StepsSharding): @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku") def test_static_shard_relay_10_nwaku_nodes(self): - self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic) self.setup_nwaku_relay_nodes(num_nodes=9, pubsub_topic=self.test_pubsub_topic) self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic]) @@ -35,7 +36,7 @@ class TestMultipleNodes(StepsSharding): @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku") def test_auto_shard_relay_10_nwaku_nodes(self): - self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic) self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) diff --git a/tests/sharding/test_relay_auto_sharding.py b/tests/sharding/test_relay_auto_sharding.py index af53ac54..e92720f6 100644 --- a/tests/sharding/test_relay_auto_sharding.py +++ b/tests/sharding/test_relay_auto_sharding.py @@ -2,6 +2,7 @@ import pytest from src.env_vars import NODE_1, NODE_2 from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger +from src.steps.relay import StepsRelay from src.steps.sharding import StepsSharding from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0 @@ -16,7 +17,7 @@ logger = get_custom_logger(__name__) class TestRelayAutosharding(StepsSharding): def test_publish_without_subscribing_via_api_works(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) - for node in self.main_relay_nodes: + for node in self.main_nodes: self.relay_message(node, self.create_message(contentTopic=self.test_content_topic)) def test_retrieve_messages_without_subscribing_via_api(self): @@ -45,7 +46,7 @@ class TestRelayAutosharding(StepsSharding): def test_publish_on_not_subscribed_content_topic_works(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) - for node in self.main_relay_nodes: + for node in self.main_nodes: self.relay_message(node, self.create_message(contentTopic="/toychat/2/huilong/proto")) def test_cant_retrieve_messages_on_not_subscribed_content_topic(self): diff --git a/tests/sharding/test_relay_static_sharding.py b/tests/sharding/test_relay_static_sharding.py index 13adc123..364bc73c 100644 --- a/tests/sharding/test_relay_static_sharding.py +++ b/tests/sharding/test_relay_static_sharding.py @@ -11,7 +11,7 @@ logger = get_custom_logger(__name__) class TestRelayStaticSharding(StepsSharding): def test_publish_without_subscribing_via_api_works(self): self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic) - for node in self.main_relay_nodes: + for node in self.main_nodes: self.relay_message(node, self.create_message(), self.test_pubsub_topic) def test_retrieve_messages_without_subscribing_via_api(self): diff --git a/tests/sharding/test_running_nodes_auto_sharding.py b/tests/sharding/test_running_nodes_auto_sharding.py index 7963a31d..f0063dad 100644 --- a/tests/sharding/test_running_nodes_auto_sharding.py +++ b/tests/sharding/test_running_nodes_auto_sharding.py @@ -37,7 +37,7 @@ class TestRunningNodesAutosharding(StepsSharding): self.check_published_message_reaches_relay_peer(content_topic=content_topic) def test_2_nodes_different_content_topic_same_shard(self): - self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf") + self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf") self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml") self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"]) self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"]) @@ -48,7 +48,7 @@ class TestRunningNodesAutosharding(StepsSharding): assert "Not Found" in str(ex) def test_2_nodes_different_content_topic_different_shard(self): - self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto") + self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto") self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js") self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"]) self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"]) diff --git a/tests/sharding/test_running_nodes_static_sharding.py b/tests/sharding/test_running_nodes_static_sharding.py index b6e3a105..fe52d141 100644 --- a/tests/sharding/test_running_nodes_static_sharding.py +++ b/tests/sharding/test_running_nodes_static_sharding.py @@ -1,6 +1,7 @@ import pytest from src.env_vars import NODE_2 from src.libs.custom_logger import get_custom_logger +from src.steps.relay import StepsRelay from src.steps.sharding import StepsSharding from src.test_data import PUBSUB_TOPICS_DIFFERENT_CLUSTERS, PUBSUB_TOPICS_SAME_CLUSTER @@ -27,7 +28,7 @@ class TestRunningNodesStaticSharding(StepsSharding): self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) def test_2_nodes_same_cluster_different_shards(self): - self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic) self.setup_second_relay_node(pubsub_topic="/waku/2/rs/2/1") self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/2/1"]) @@ -38,7 +39,7 @@ class TestRunningNodesStaticSharding(StepsSharding): assert "Not Found" in str(ex) def test_2_nodes_different_cluster_same_shard(self): - self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic) self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/0") self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/0"]) @@ -49,7 +50,7 @@ class TestRunningNodesStaticSharding(StepsSharding): assert "Not Found" in str(ex) def test_2_nodes_different_cluster_different_shard(self): - self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic) + self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic) self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/1") self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/1"])