fix: refactor setup relay for sharding

This commit is contained in:
Roman 2024-07-01 09:03:48 +02:00
parent ccad2a12f6
commit 23003f9edc
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75
6 changed files with 45 additions and 40 deletions

View File

@ -124,6 +124,7 @@ class WakuNode:
remove_container = True
kwargs = self.parse_peer_persistence_config(kwargs)
kwargs = self.resolve_sharding_flags(kwargs)
default_args.update(sanitize_docker_flags(kwargs))
@ -478,6 +479,23 @@ class WakuNode:
return kwargs
def resolve_sharding_flags(self, kwargs):
if "pubsub_topic" in kwargs:
pubsub_topic = kwargs["pubsub_topic"]
if not "cluster_id" in kwargs:
try:
if isinstance(pubsub_topic, list):
pubsub_topic = pubsub_topic[0]
cluster_id = pubsub_topic.split("/")[4]
logger.debug(f"Cluster id was resolved to: {cluster_id}")
kwargs["cluster_id"] = cluster_id
except Exception as ex:
raise Exception("Could not resolve cluster_id from pubsub_topic")
else:
kwargs["cluster_id"] = None
return kwargs
@property
def container(self):
return self._container

View File

@ -15,11 +15,12 @@ from src.env_vars import (
)
from src.node.waku_node import WakuNode
from src.steps.common import StepsCommon
from src.steps.relay import StepsRelay
logger = get_custom_logger(__name__)
class StepsSharding(StepsCommon):
class StepsSharding(StepsRelay):
test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/2/0"
test_payload = "Sharding works!!"
@ -28,28 +29,11 @@ class StepsSharding(StepsCommon):
@pytest.fixture(scope="function", autouse=True)
def sharding_setup(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.main_relay_nodes = []
self.optional_relay_nodes = []
self.main_nodes = []
self.optional_nodes = []
self.main_filter_nodes = []
self.optional_filter_nodes = []
@allure.step
def setup_first_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
self.node1.start(relay="true", filter="true", nodekey=NODEKEY, **kwargs)
self.enr_uri = self.node1.get_enr_uri()
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
self.main_relay_nodes.extend([self.node1])
@allure.step
def setup_second_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.main_relay_nodes.extend([self.node2])
@allure.step
def setup_second_node_as_filter(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
@ -59,13 +43,12 @@ class StepsSharding(StepsCommon):
self.main_filter_nodes.extend([self.node2])
@allure.step
def setup_main_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
self.setup_first_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)
self.setup_second_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)
def setup_main_relay_nodes(self, **kwargs):
self.setup_first_relay_node(**kwargs)
self.setup_second_relay_node(**kwargs)
@allure.step
def setup_optional_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
kwargs = self._resolve_sharding_flags(cluster_id=cluster_id, pubsub_topic=pubsub_topic, content_topic=content_topic, **kwargs)
def setup_optional_relay_nodes(self, **kwargs):
if ADDITIONAL_NODES:
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
else:
@ -74,16 +57,15 @@ class StepsSharding(StepsCommon):
node = WakuNode(node, f"node{index + 3}_{self.test_id}")
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
self.add_node_peer(node, [self.multiaddr_with_id])
self.optional_relay_nodes.append(node)
self.optional_nodes.append(node)
@allure.step
def setup_nwaku_relay_nodes(self, num_nodes, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
def setup_nwaku_relay_nodes(self, num_nodes, **kwargs):
for index in range(num_nodes):
node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}")
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
self.add_node_peer(node, [self.multiaddr_with_id])
self.optional_relay_nodes.append(node)
self.optional_nodes.append(node)
@allure.step
def subscribe_relay_node(self, node, content_topics, pubsub_topics):
@ -103,13 +85,13 @@ class StepsSharding(StepsCommon):
self.subscribe_relay_node(self.node2, content_topics, pubsub_topics)
@allure.step
def subscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
for node in self.main_relay_nodes:
def subscribe_main_nodes(self, content_topics=None, pubsub_topics=None):
for node in self.main_nodes:
self.subscribe_relay_node(node, content_topics, pubsub_topics)
@allure.step
def subscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
for node in self.optional_relay_nodes:
for node in self.optional_nodes:
self.subscribe_relay_node(node, content_topics, pubsub_topics)
@allure.step
@ -131,12 +113,12 @@ class StepsSharding(StepsCommon):
@allure.step
def unsubscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
for node in self.main_relay_nodes:
for node in self.main_nodes:
self.unsubscribe_relay_node(node, content_topics, pubsub_topics)
@allure.step
def unsubscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
for node in self.optional_relay_nodes:
for node in self.optional_nodes:
self.unsubscribe_relay_node(node, content_topics, pubsub_topics)
@allure.step
@ -166,7 +148,7 @@ class StepsSharding(StepsCommon):
if not sender:
sender = self.node1
if not peer_list:
peer_list = self.main_relay_nodes + self.optional_relay_nodes
peer_list = self.main_nodes + self.optional_nodes
self.relay_message(sender, message, pubsub_topic)
delay(0.1)

View File

@ -1,19 +1,21 @@
import pytest
from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.steps.sharding import StepsSharding
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER
logger = get_custom_logger(__name__)
class TestFilterStaticSharding(StepsSharding):
class TestFilterStaticSharding(StepsSharding, StepsRelay):
def test_filter_works_with_static_sharding(self):
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
self.setup_second_node_as_filter(pubsub_topic=self.test_pubsub_topic)
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic)
# self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic)
def test_filter_static_sharding_multiple_shards(self):
self.setup_first_relay_node(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
@ -29,7 +31,7 @@ class TestFilterStaticSharding(StepsSharding):
"go-waku" in NODE_1 or "go-waku" in NODE_2,
reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
)
class TestFilterAutoSharding(StepsSharding):
class TestFilterAutoSharding(StepsSharding, StepsRelay):
def test_filter_works_with_auto_sharding(self):
self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)

View File

@ -1,12 +1,13 @@
import pytest
from src.env_vars import ADDITIONAL_NODES, NODE_2
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.steps.sharding import StepsSharding
logger = get_custom_logger(__name__)
class TestMultipleNodes(StepsSharding):
class TestMultipleNodes(StepsSharding, StepsRelay):
def test_static_shard_relay(self):
self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
self.setup_optional_relay_nodes(pubsub_topic=self.test_pubsub_topic)

View File

@ -2,6 +2,7 @@ import pytest
from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.steps.sharding import StepsSharding
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0

View File

@ -1,13 +1,14 @@
import pytest
from src.env_vars import NODE_2
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.steps.sharding import StepsSharding
from src.test_data import PUBSUB_TOPICS_DIFFERENT_CLUSTERS, PUBSUB_TOPICS_SAME_CLUSTER
logger = get_custom_logger(__name__)
class TestRunningNodesStaticSharding(StepsSharding):
class TestRunningNodesStaticSharding(StepsSharding, StepsRelay):
@pytest.mark.parametrize("pubsub_topic", PUBSUB_TOPICS_DIFFERENT_CLUSTERS)
def test_single_pubsub_topic(self, pubsub_topic):
self.setup_main_relay_nodes(pubsub_topic=pubsub_topic)