mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-10 18:03:07 +00:00
chore: refactor setup relay node for sharding (#48)
* fix: refactor setup relay for sharding * fix: refactor also all related to _resolve_sharding_flags
This commit is contained in:
parent
e8d5f78288
commit
0ce2572810
@ -56,6 +56,21 @@ def multiaddr2id(multiaddr):
|
||||
return multiaddr.split("/")[-1]
|
||||
|
||||
|
||||
def resolve_sharding_flags(kwargs):
|
||||
if "pubsub_topic" in kwargs:
|
||||
pubsub_topic = kwargs["pubsub_topic"]
|
||||
if not "cluster_id" in kwargs:
|
||||
try:
|
||||
if isinstance(pubsub_topic, list):
|
||||
pubsub_topic = pubsub_topic[0]
|
||||
cluster_id = pubsub_topic.split("/")[4]
|
||||
logger.debug(f"Cluster id was resolved to: {cluster_id}")
|
||||
kwargs["cluster_id"] = cluster_id
|
||||
except Exception as ex:
|
||||
raise Exception("Could not resolve cluster_id from pubsub_topic")
|
||||
return kwargs
|
||||
|
||||
|
||||
class WakuNode:
|
||||
def __init__(self, docker_image, docker_log_prefix=""):
|
||||
self._image_name = docker_image
|
||||
@ -124,6 +139,7 @@ class WakuNode:
|
||||
remove_container = True
|
||||
|
||||
kwargs = self.parse_peer_persistence_config(kwargs)
|
||||
kwargs = resolve_sharding_flags(kwargs)
|
||||
|
||||
default_args.update(sanitize_docker_flags(kwargs))
|
||||
|
||||
|
||||
@ -15,11 +15,12 @@ from src.env_vars import (
|
||||
)
|
||||
from src.node.waku_node import WakuNode
|
||||
from src.steps.common import StepsCommon
|
||||
from src.steps.relay import StepsRelay
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class StepsSharding(StepsCommon):
|
||||
class StepsSharding(StepsRelay):
|
||||
test_content_topic = "/myapp/1/latest/proto"
|
||||
test_pubsub_topic = "/waku/2/rs/2/0"
|
||||
test_payload = "Sharding works!!"
|
||||
@ -28,44 +29,29 @@ class StepsSharding(StepsCommon):
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def sharding_setup(self):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.main_relay_nodes = []
|
||||
self.optional_relay_nodes = []
|
||||
self.main_nodes = []
|
||||
self.optional_nodes = []
|
||||
self.main_filter_nodes = []
|
||||
self.optional_filter_nodes = []
|
||||
|
||||
@allure.step
|
||||
def setup_first_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
|
||||
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
self.node1.start(relay="true", filter="true", nodekey=NODEKEY, **kwargs)
|
||||
self.enr_uri = self.node1.get_enr_uri()
|
||||
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
||||
self.main_relay_nodes.extend([self.node1])
|
||||
def setup_first_relay_node_with_filter(self, **kwargs):
|
||||
self.setup_first_relay_node(filter="true", **kwargs)
|
||||
|
||||
@allure.step
|
||||
def setup_second_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
def setup_second_node_as_filter(self, **kwargs):
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
|
||||
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
|
||||
self.add_node_peer(self.node2, [self.multiaddr_with_id])
|
||||
self.main_relay_nodes.extend([self.node2])
|
||||
|
||||
@allure.step
|
||||
def setup_second_node_as_filter(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
|
||||
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs)
|
||||
self.add_node_peer(self.node2, [self.multiaddr_with_id])
|
||||
self.main_filter_nodes.extend([self.node2])
|
||||
|
||||
@allure.step
|
||||
def setup_main_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
self.setup_first_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
self.setup_second_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
def setup_main_relay_nodes(self, **kwargs):
|
||||
self.setup_first_relay_node_with_filter(**kwargs)
|
||||
self.setup_second_relay_node(**kwargs)
|
||||
|
||||
@allure.step
|
||||
def setup_optional_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
kwargs = self._resolve_sharding_flags(cluster_id=cluster_id, pubsub_topic=pubsub_topic, content_topic=content_topic, **kwargs)
|
||||
def setup_optional_relay_nodes(self, **kwargs):
|
||||
if ADDITIONAL_NODES:
|
||||
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
|
||||
else:
|
||||
@ -74,16 +60,15 @@ class StepsSharding(StepsCommon):
|
||||
node = WakuNode(node, f"node{index + 3}_{self.test_id}")
|
||||
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
|
||||
self.add_node_peer(node, [self.multiaddr_with_id])
|
||||
self.optional_relay_nodes.append(node)
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@allure.step
|
||||
def setup_nwaku_relay_nodes(self, num_nodes, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
|
||||
def setup_nwaku_relay_nodes(self, num_nodes, **kwargs):
|
||||
for index in range(num_nodes):
|
||||
node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}")
|
||||
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
|
||||
self.add_node_peer(node, [self.multiaddr_with_id])
|
||||
self.optional_relay_nodes.append(node)
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@allure.step
|
||||
def subscribe_relay_node(self, node, content_topics, pubsub_topics):
|
||||
@ -104,12 +89,12 @@ class StepsSharding(StepsCommon):
|
||||
|
||||
@allure.step
|
||||
def subscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
|
||||
for node in self.main_relay_nodes:
|
||||
for node in self.main_nodes:
|
||||
self.subscribe_relay_node(node, content_topics, pubsub_topics)
|
||||
|
||||
@allure.step
|
||||
def subscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
|
||||
for node in self.optional_relay_nodes:
|
||||
for node in self.optional_nodes:
|
||||
self.subscribe_relay_node(node, content_topics, pubsub_topics)
|
||||
|
||||
@allure.step
|
||||
@ -131,12 +116,12 @@ class StepsSharding(StepsCommon):
|
||||
|
||||
@allure.step
|
||||
def unsubscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
|
||||
for node in self.main_relay_nodes:
|
||||
for node in self.main_nodes:
|
||||
self.unsubscribe_relay_node(node, content_topics, pubsub_topics)
|
||||
|
||||
@allure.step
|
||||
def unsubscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
|
||||
for node in self.optional_relay_nodes:
|
||||
for node in self.optional_nodes:
|
||||
self.unsubscribe_relay_node(node, content_topics, pubsub_topics)
|
||||
|
||||
@allure.step
|
||||
@ -166,7 +151,7 @@ class StepsSharding(StepsCommon):
|
||||
if not sender:
|
||||
sender = self.node1
|
||||
if not peer_list:
|
||||
peer_list = self.main_relay_nodes + self.optional_relay_nodes
|
||||
peer_list = self.main_nodes + self.optional_nodes
|
||||
|
||||
self.relay_message(sender, message, pubsub_topic)
|
||||
delay(0.1)
|
||||
@ -222,20 +207,3 @@ class StepsSharding(StepsCommon):
|
||||
raise AssertionError("Publishing messages on unsubscribed shard worked!!!")
|
||||
except Exception as ex:
|
||||
assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex)
|
||||
|
||||
@allure.step
|
||||
def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
|
||||
if pubsub_topic:
|
||||
kwargs["pubsub_topic"] = pubsub_topic
|
||||
if not cluster_id:
|
||||
try:
|
||||
if isinstance(pubsub_topic, list):
|
||||
pubsub_topic = pubsub_topic[0]
|
||||
cluster_id = pubsub_topic.split("/")[4]
|
||||
logger.debug(f"Cluster id was resolved to: {cluster_id}")
|
||||
except Exception as ex:
|
||||
raise Exception("Could not resolve cluster_id from pubsub_topic")
|
||||
kwargs["cluster_id"] = cluster_id
|
||||
if content_topic:
|
||||
kwargs["content_topic"] = content_topic
|
||||
return kwargs
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.common import delay
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.relay import StepsRelay
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
@ -9,14 +11,14 @@ logger = get_custom_logger(__name__)
|
||||
|
||||
class TestFilterStaticSharding(StepsSharding):
|
||||
def test_filter_works_with_static_sharding(self):
|
||||
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_second_node_as_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
|
||||
self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
|
||||
self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic)
|
||||
|
||||
def test_filter_static_sharding_multiple_shards(self):
|
||||
self.setup_first_relay_node(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
|
||||
self.setup_second_node_as_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
|
||||
self.subscribe_first_relay_node(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
|
||||
for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
|
||||
@ -31,14 +33,16 @@ class TestFilterStaticSharding(StepsSharding):
|
||||
)
|
||||
class TestFilterAutoSharding(StepsSharding):
|
||||
def test_filter_works_with_auto_sharding(self):
|
||||
self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(
|
||||
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic
|
||||
)
|
||||
self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
|
||||
self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
|
||||
self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
|
||||
self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic)
|
||||
|
||||
def test_filter_auto_sharding_multiple_content_topics(self):
|
||||
self.setup_first_relay_node(
|
||||
self.setup_first_relay_node_with_filter(
|
||||
cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER
|
||||
)
|
||||
self.setup_second_node_as_filter(
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
from src.env_vars import ADDITIONAL_NODES, NODE_2
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.relay import StepsRelay
|
||||
from src.steps.sharding import StepsSharding
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
@ -16,7 +17,7 @@ class TestMultipleNodes(StepsSharding):
|
||||
|
||||
@pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku")
|
||||
def test_static_shard_relay_10_nwaku_nodes(self):
|
||||
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_nwaku_relay_nodes(num_nodes=9, pubsub_topic=self.test_pubsub_topic)
|
||||
self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
|
||||
self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
|
||||
@ -35,7 +36,7 @@ class TestMultipleNodes(StepsSharding):
|
||||
|
||||
@pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku")
|
||||
def test_auto_shard_relay_10_nwaku_nodes(self):
|
||||
self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
|
||||
self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
|
||||
|
||||
@ -2,6 +2,7 @@ import pytest
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.common import delay, to_base64
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.relay import StepsRelay
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0
|
||||
|
||||
@ -16,7 +17,7 @@ logger = get_custom_logger(__name__)
|
||||
class TestRelayAutosharding(StepsSharding):
|
||||
def test_publish_without_subscribing_via_api_works(self):
|
||||
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
for node in self.main_relay_nodes:
|
||||
for node in self.main_nodes:
|
||||
self.relay_message(node, self.create_message(contentTopic=self.test_content_topic))
|
||||
|
||||
def test_retrieve_messages_without_subscribing_via_api(self):
|
||||
@ -45,7 +46,7 @@ class TestRelayAutosharding(StepsSharding):
|
||||
def test_publish_on_not_subscribed_content_topic_works(self):
|
||||
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
|
||||
for node in self.main_relay_nodes:
|
||||
for node in self.main_nodes:
|
||||
self.relay_message(node, self.create_message(contentTopic="/toychat/2/huilong/proto"))
|
||||
|
||||
def test_cant_retrieve_messages_on_not_subscribed_content_topic(self):
|
||||
|
||||
@ -11,7 +11,7 @@ logger = get_custom_logger(__name__)
|
||||
class TestRelayStaticSharding(StepsSharding):
|
||||
def test_publish_without_subscribing_via_api_works(self):
|
||||
self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
|
||||
for node in self.main_relay_nodes:
|
||||
for node in self.main_nodes:
|
||||
self.relay_message(node, self.create_message(), self.test_pubsub_topic)
|
||||
|
||||
def test_retrieve_messages_without_subscribing_via_api(self):
|
||||
|
||||
@ -37,7 +37,7 @@ class TestRunningNodesAutosharding(StepsSharding):
|
||||
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
|
||||
|
||||
def test_2_nodes_different_content_topic_same_shard(self):
|
||||
self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf")
|
||||
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf")
|
||||
self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml")
|
||||
self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"])
|
||||
self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"])
|
||||
@ -48,7 +48,7 @@ class TestRunningNodesAutosharding(StepsSharding):
|
||||
assert "Not Found" in str(ex)
|
||||
|
||||
def test_2_nodes_different_content_topic_different_shard(self):
|
||||
self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto")
|
||||
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto")
|
||||
self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js")
|
||||
self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"])
|
||||
self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"])
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.relay import StepsRelay
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import PUBSUB_TOPICS_DIFFERENT_CLUSTERS, PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
@ -27,7 +28,7 @@ class TestRunningNodesStaticSharding(StepsSharding):
|
||||
self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
|
||||
|
||||
def test_2_nodes_same_cluster_different_shards(self):
|
||||
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_second_relay_node(pubsub_topic="/waku/2/rs/2/1")
|
||||
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
|
||||
self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/2/1"])
|
||||
@ -38,7 +39,7 @@ class TestRunningNodesStaticSharding(StepsSharding):
|
||||
assert "Not Found" in str(ex)
|
||||
|
||||
def test_2_nodes_different_cluster_same_shard(self):
|
||||
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/0")
|
||||
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
|
||||
self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/0"])
|
||||
@ -49,7 +50,7 @@ class TestRunningNodesStaticSharding(StepsSharding):
|
||||
assert "Not Found" in str(ex)
|
||||
|
||||
def test_2_nodes_different_cluster_different_shard(self):
|
||||
self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_first_relay_node_with_filter(pubsub_topic=self.test_pubsub_topic)
|
||||
self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/1")
|
||||
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
|
||||
self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/1"])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user