sharding tests part 1 (#24)
* sharding struct
* add peers
* running nodes tests
* cleanup sharding steps
* new sharding tests
* test_different_cluster_different_shard
* new tests
* new static sharding tests
* new auto sharding tests
* change structure
* running nodes tests
* new autosharding tests
* adjust tests
* new sharding tests
* sharding
* sharding filter
* add peers
This commit is contained in:
parent
17ba9baf45
commit
0095b5e04a
|
@ -20,22 +20,38 @@ class REST(BaseClient):
|
|||
return info_response.json()
|
||||
|
||||
def get_peers(self):
    """Return the node's connected peers from the admin REST API, parsed as JSON."""
    get_peers_response = self.rest_call("get", "admin/v1/peers")
    return get_peers_response.json()
|
||||
|
||||
def add_peers(self, peers):
    """Register additional peers with the node through the admin REST API."""
    payload = json.dumps(peers)
    return self.rest_call("post", "admin/v1/peers", payload)
|
||||
|
||||
def set_relay_subscriptions(self, pubsub_topics):
    """Subscribe the relay to the given pubsub topics."""
    body = json.dumps(pubsub_topics)
    return self.rest_call("post", "relay/v1/subscriptions", body)
|
||||
|
||||
def set_relay_auto_subscriptions(self, content_topics):
    """Subscribe the relay to content topics via the autosharding endpoint."""
    body = json.dumps(content_topics)
    return self.rest_call("post", "relay/v1/auto/subscriptions", body)
|
||||
|
||||
def delete_relay_subscriptions(self, pubsub_topics):
    """Unsubscribe the relay from the given pubsub topics."""
    body = json.dumps(pubsub_topics)
    return self.rest_call("delete", "relay/v1/subscriptions", body)
|
||||
|
||||
def delete_relay_auto_subscriptions(self, content_topics):
    """Unsubscribe the relay from content topics via the autosharding endpoint."""
    body = json.dumps(content_topics)
    return self.rest_call("delete", "relay/v1/auto/subscriptions", body)
|
||||
|
||||
def send_relay_message(self, message, pubsub_topic):
    """Publish *message* on *pubsub_topic* (the topic is URL-encoded into the path)."""
    encoded_topic = quote(pubsub_topic, safe="")
    return self.rest_call("post", f"relay/v1/messages/{encoded_topic}", json.dumps(message))
|
||||
|
||||
def send_relay_auto_message(self, message):
    """Publish *message*, letting the node route it by content topic (autosharding)."""
    body = json.dumps(message)
    return self.rest_call("post", "relay/v1/auto/messages", body)
|
||||
|
||||
def get_relay_messages(self, pubsub_topic):
    """Fetch the messages received on *pubsub_topic*, parsed as JSON."""
    encoded_topic = quote(pubsub_topic, safe="")
    response = self.rest_call("get", f"relay/v1/messages/{encoded_topic}")
    return response.json()
|
||||
|
||||
def get_relay_auto_messages(self, content_topic):
    """Fetch the messages received on *content_topic* (autosharding), parsed as JSON."""
    encoded_topic = quote(content_topic, safe="")
    response = self.rest_call("get", f"relay/v1/auto/messages/{encoded_topic}")
    return response.json()
|
||||
|
||||
def set_filter_subscriptions(self, subscription):
    """Create a filter v2 subscription and return the parsed JSON response."""
    set_subscriptions_response = self.rest_call("post", "filter/v2/subscriptions", json.dumps(subscription))
    parsed = set_subscriptions_response.json()
    return parsed
|
||||
|
|
|
@ -212,18 +212,33 @@ class WakuNode:
|
|||
def get_peers(self):
    """Return this node's peers via its REST API client."""
    client = self._api
    return client.get_peers()
|
||||
|
||||
def add_peers(self, peers):
    """Add *peers* to this node via its REST API client."""
    client = self._api
    return client.add_peers(peers)
|
||||
|
||||
def set_relay_subscriptions(self, pubsub_topics):
    """Subscribe this node's relay to *pubsub_topics* via its REST API client."""
    client = self._api
    return client.set_relay_subscriptions(pubsub_topics)
|
||||
|
||||
def set_relay_auto_subscriptions(self, content_topics):
    """Auto-subscribe this node's relay to *content_topics* via its REST API client."""
    client = self._api
    return client.set_relay_auto_subscriptions(content_topics)
|
||||
|
||||
def delete_relay_subscriptions(self, pubsub_topics):
    """Unsubscribe this node's relay from *pubsub_topics* via its REST API client."""
    client = self._api
    return client.delete_relay_subscriptions(pubsub_topics)
|
||||
|
||||
def delete_relay_auto_subscriptions(self, content_topics):
    """Auto-unsubscribe this node's relay from *content_topics* via its REST API client."""
    client = self._api
    return client.delete_relay_auto_subscriptions(content_topics)
|
||||
|
||||
def send_relay_message(self, message, pubsub_topic):
    """Publish *message* on *pubsub_topic* through this node's REST API client."""
    client = self._api
    return client.send_relay_message(message, pubsub_topic)
|
||||
|
||||
def send_relay_auto_message(self, message):
    """Publish *message* routed by content topic through this node's REST API client."""
    client = self._api
    return client.send_relay_auto_message(message)
|
||||
|
||||
def get_relay_messages(self, pubsub_topic):
    """Fetch messages received on *pubsub_topic* through this node's REST API client."""
    client = self._api
    return client.get_relay_messages(pubsub_topic)
|
||||
|
||||
def get_relay_auto_messages(self, content_topic):
    """Fetch messages received on *content_topic* through this node's REST API client."""
    client = self._api
    return client.get_relay_auto_messages(content_topic)
|
||||
|
||||
def set_filter_subscriptions(self, subscription):
    """Create a filter subscription through this node's REST API client."""
    client = self._api
    return client.set_filter_subscriptions(subscription)
|
||||
|
||||
|
|
|
@ -38,6 +38,8 @@ class StepsFilter:
|
|||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
|
||||
self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||
if self.node2.is_nwaku():
|
||||
self.node2.add_peers([self.multiaddr_with_id])
|
||||
self.main_nodes.append(self.node2)
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
@ -73,6 +75,8 @@ class StepsFilter:
|
|||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"node{index + 3}_{self.test_id}")
|
||||
node.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||
if node.is_nwaku():
|
||||
node.add_peers([self.multiaddr_with_id])
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@allure.step
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import inspect
|
||||
import os
|
||||
from uuid import uuid4
|
||||
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from time import time
|
||||
import pytest
|
||||
|
@ -40,8 +38,11 @@ class StepsRelay:
|
|||
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
|
||||
self.node1.start(relay="true", nodekey=NODEKEY)
|
||||
self.enr_uri = self.node1.get_enr_uri()
|
||||
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
|
||||
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
||||
if self.node2.is_nwaku():
|
||||
self.node2.add_peers([self.multiaddr_with_id])
|
||||
self.main_nodes.extend([self.node1, self.node2])
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
@ -63,6 +64,8 @@ class StepsRelay:
|
|||
self.node2.start(
|
||||
relay="true", discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", rln_relay_membership_index="1"
|
||||
)
|
||||
if self.node2.is_nwaku():
|
||||
self.node2.add_peers([self.multiaddr_with_id])
|
||||
self.main_nodes.extend([self.node1, self.node2])
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
@ -75,6 +78,8 @@ class StepsRelay:
|
|||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"node{index + 3}_{request.cls.test_id}")
|
||||
node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
||||
if node.is_nwaku():
|
||||
node.add_peers([self.multiaddr_with_id])
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
|
|
@ -0,0 +1,249 @@
|
|||
import inspect
|
||||
from uuid import uuid4
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from time import time
|
||||
import pytest
|
||||
import allure
|
||||
from src.libs.common import to_base64, delay
|
||||
from src.node.waku_message import WakuMessage
|
||||
from src.env_vars import (
|
||||
DEFAULT_NWAKU,
|
||||
NODE_1,
|
||||
NODE_2,
|
||||
ADDITIONAL_NODES,
|
||||
NODEKEY,
|
||||
)
|
||||
from src.node.waku_node import WakuNode
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class StepsSharding:
    """Shared test steps for Waku sharding tests.

    Provides pytest fixtures and allure steps to start relay/filter nodes
    with static- or auto-sharding flags, subscribe/unsubscribe them to
    pubsub or content topics, publish messages, and assert on delivery.
    Tests mix these steps into their classes by inheriting from it.
    """

    # Defaults used when a test does not pass explicit topics/payload.
    test_content_topic = "/myapp/1/latest/proto"
    test_pubsub_topic = "/waku/2/rs/2/0"
    test_payload = "Sharding works!!"
    # Cluster id used for autosharding setups.
    auto_cluster = 2

    @pytest.fixture(scope="function", autouse=True)
    def sharding_setup(self):
        """Reset the per-test node bookkeeping lists before every test."""
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        self.main_relay_nodes = []
        self.optional_relay_nodes = []
        self.main_filter_nodes = []
        self.optional_filter_nodes = []

    @allure.step
    def setup_first_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start node1 as a relay+filter bootstrap node and record its ENR/multiaddr."""
        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
        kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
        self.node1.start(relay="true", filter="true", nodekey=NODEKEY, **kwargs)
        # Later nodes bootstrap from these.
        self.enr_uri = self.node1.get_enr_uri()
        self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
        self.main_relay_nodes.extend([self.node1])

    @allure.step
    def setup_second_relay_node(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start node2 as a relay node bootstrapped from node1."""
        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
        kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
        self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
        # nwaku needs an explicit peer add on top of discv5 bootstrap.
        if self.node2.is_nwaku():
            self.node2.add_peers([self.multiaddr_with_id])
        self.main_relay_nodes.extend([self.node2])

    @allure.step
    def setup_second_node_as_filter(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start node2 as a filter client (relay disabled) pointing at node1."""
        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
        kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
        self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs)
        if self.node2.is_nwaku():
            self.node2.add_peers([self.multiaddr_with_id])
        self.main_filter_nodes.extend([self.node2])

    @allure.step
    def setup_main_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start both main relay nodes with the same sharding configuration."""
        self.setup_first_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)
        self.setup_second_relay_node(cluster_id, pubsub_topic, content_topic, **kwargs)

    @allure.step
    def setup_optional_relay_nodes(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start extra relay nodes listed in ADDITIONAL_NODES; skip the test if none."""
        kwargs = self._resolve_sharding_flags(cluster_id=cluster_id, pubsub_topic=pubsub_topic, content_topic=content_topic, **kwargs)
        if ADDITIONAL_NODES:
            nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
        else:
            pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
        for index, node in enumerate(nodes):
            # Node names continue after node1/node2 (node3, node4, ...).
            node = WakuNode(node, f"node{index + 3}_{self.test_id}")
            node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
            if node.is_nwaku():
                node.add_peers([self.multiaddr_with_id])
            self.optional_relay_nodes.append(node)

    @allure.step
    def setup_nwaku_relay_nodes(self, num_nodes, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Start *num_nodes* additional nwaku relay nodes bootstrapped from node1."""
        kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
        for index in range(num_nodes):
            node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}")
            node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
            # All nodes here are nwaku, so always add the peer explicitly.
            node.add_peers([self.multiaddr_with_id])
            self.optional_relay_nodes.append(node)

    @allure.step
    def subscribe_relay_node(self, node, content_topics, pubsub_topics):
        """Subscribe *node* by content topics (autosharding) or pubsub topics (static).

        Content topics take precedence when both are given.
        """
        if content_topics:
            node.set_relay_auto_subscriptions(content_topics)
        elif pubsub_topics:
            node.set_relay_subscriptions(pubsub_topics)
        else:
            raise AttributeError("content_topics or pubsub_topics need to be passed")

    @allure.step
    def subscribe_first_relay_node(self, content_topics=None, pubsub_topics=None):
        """Subscribe node1 only."""
        self.subscribe_relay_node(self.node1, content_topics, pubsub_topics)

    @allure.step
    def subscribe_second_relay_node(self, content_topics=None, pubsub_topics=None):
        """Subscribe node2 only."""
        self.subscribe_relay_node(self.node2, content_topics, pubsub_topics)

    @allure.step
    def subscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
        """Subscribe every main relay node."""
        for node in self.main_relay_nodes:
            self.subscribe_relay_node(node, content_topics, pubsub_topics)

    @allure.step
    def subscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
        """Subscribe every optional relay node."""
        for node in self.optional_relay_nodes:
            self.subscribe_relay_node(node, content_topics, pubsub_topics)

    @allure.step
    def unsubscribe_relay_node(self, node, content_topics, pubsub_topics):
        """Unsubscribe *node*; mirror of subscribe_relay_node."""
        if content_topics:
            node.delete_relay_auto_subscriptions(content_topics)
        elif pubsub_topics:
            node.delete_relay_subscriptions(pubsub_topics)
        else:
            raise AttributeError("content_topics or pubsub_topics need to be passed")

    @allure.step
    def unsubscribe_first_relay_node(self, content_topics=None, pubsub_topics=None):
        """Unsubscribe node1 only."""
        self.unsubscribe_relay_node(self.node1, content_topics, pubsub_topics)

    @allure.step
    def unsubscribe_second_relay_node(self, content_topics=None, pubsub_topics=None):
        """Unsubscribe node2 only."""
        self.unsubscribe_relay_node(self.node2, content_topics, pubsub_topics)

    @allure.step
    def unsubscribe_main_relay_nodes(self, content_topics=None, pubsub_topics=None):
        """Unsubscribe every main relay node."""
        for node in self.main_relay_nodes:
            self.unsubscribe_relay_node(node, content_topics, pubsub_topics)

    @allure.step
    def unsubscribe_optional_relay_nodes(self, content_topics=None, pubsub_topics=None):
        """Unsubscribe every optional relay node."""
        for node in self.optional_relay_nodes:
            self.unsubscribe_relay_node(node, content_topics, pubsub_topics)

    @allure.step
    def subscribe_filter_node(self, node, content_topics=None, pubsub_topic=None):
        """Create a filter v2 subscription on *node* with a fresh request id."""
        subscription = {"requestId": str(uuid4()), "contentFilters": content_topics, "pubsubTopic": pubsub_topic}
        node.set_filter_subscriptions(subscription)

    @allure.step
    def relay_message(self, node, message, pubsub_topic=None):
        """Publish *message* on an explicit pubsub topic, or via autosharding if none given."""
        if pubsub_topic:
            node.send_relay_message(message, pubsub_topic)
        else:
            node.send_relay_auto_message(message)

    @allure.step
    def retrieve_relay_message(self, node, content_topic=None, pubsub_topic=None):
        """Fetch relay messages by content topic (preferred) or pubsub topic."""
        if content_topic:
            return node.get_relay_auto_messages(content_topic)
        elif pubsub_topic:
            return node.get_relay_messages(pubsub_topic)
        else:
            raise AttributeError("content_topic or pubsub_topic needs to be passed")

    @allure.step
    def check_published_message_reaches_relay_peer(self, content_topic=None, pubsub_topic=None, sender=None, peer_list=None):
        """Publish one message from *sender* and assert every relay peer receives exactly it.

        Defaults: sender is node1; peer_list is all main + optional relay nodes.
        """
        message = self.create_message(contentTopic=content_topic) if content_topic else self.create_message()
        if not sender:
            sender = self.node1
        if not peer_list:
            peer_list = self.main_relay_nodes + self.optional_relay_nodes

        self.relay_message(sender, message, pubsub_topic)
        # Brief pause to let the message propagate through the mesh.
        delay(0.1)
        for index, peer in enumerate(peer_list):
            logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
            get_messages_response = self.retrieve_relay_message(peer, content_topic, pubsub_topic)
            assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages"
            assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
            waku_message = WakuMessage(get_messages_response)
            waku_message.assert_received_message(message)

    @allure.step
    def get_filter_messages(self, content_topic, pubsub_topic=None, node=None):
        """Fetch filter messages from *node* (default node2), adapting to the node flavor.

        go-waku's filter API also takes the pubsub topic; nwaku's does not.
        """
        if node is None:
            node = self.node2
        if node.is_gowaku():
            return node.get_filter_messages(content_topic, pubsub_topic)
        elif node.is_nwaku():
            return node.get_filter_messages(content_topic)
        else:
            raise NotImplementedError("Not implemented for this node type")

    @allure.step
    def check_published_message_reaches_filter_peer(self, content_topic=None, pubsub_topic=None, sender=None, peer_list=None):
        """Publish one message from *sender* and assert every filter peer receives exactly it.

        Defaults: sender is node1; peer_list is all main + optional filter nodes.
        """
        message = self.create_message(contentTopic=content_topic) if content_topic else self.create_message()
        if not sender:
            sender = self.node1
        if not peer_list:
            peer_list = self.main_filter_nodes + self.optional_filter_nodes

        self.relay_message(sender, message, pubsub_topic)
        delay(0.1)
        for index, peer in enumerate(peer_list):
            # Filter peers start at node2, hence index + 2 in the labels.
            logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message")
            get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer)
            assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages"
            assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
            waku_message = WakuMessage(get_messages_response)
            waku_message.assert_received_message(message)

    @allure.step
    def check_published_message_doesnt_reach_relay_peer(self, pubsub_topic=None, content_topic=None):
        """Assert that message retrieval fails with a 'Not Found' style error.

        NOTE(review): the AssertionError raised on unexpected success is itself
        caught by the except clause; it then fails the "Not Found" assertion,
        which still fails the test as intended.
        """
        try:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic, content_topic=content_topic)
            raise AssertionError("Retrieving messages on not subscribed content topic worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    @allure.step
    def check_publish_fails_on_not_subscribed_pubsub_topic(self, pubsub_topic):
        """Assert that publishing on an unsubscribed shard is rejected by the node."""
        try:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
            raise AssertionError("Publishing messages on unsubscribed shard worked!!!")
        except Exception as ex:
            assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex)

    @allure.step
    def create_message(self, **kwargs):
        """Build a Waku message dict with sane defaults; kwargs override any field.

        Timestamp is in nanoseconds, per the Waku message format.
        """
        message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
        message.update(kwargs)
        return message

    @allure.step
    def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
        """Translate sharding-related arguments into node start() kwargs.

        If cluster_id is not given it is derived from the pubsub topic
        (index 4 of "/waku/2/rs/<cluster>/<shard>"); for a list of topics
        the first entry is used.
        """
        if pubsub_topic:
            kwargs["pubsub_topic"] = pubsub_topic
            if not cluster_id:
                try:
                    if isinstance(pubsub_topic, list):
                        pubsub_topic = pubsub_topic[0]
                    cluster_id = pubsub_topic.split("/")[4]
                    logger.debug(f"Cluster id was resolved to: {cluster_id}")
                except Exception as ex:
                    raise Exception("Could not resolve cluster_id from pubsub_topic")
        kwargs["cluster_id"] = cluster_id
        if content_topic:
            kwargs["content_topic"] = content_topic
        return kwargs
|
|
@ -63,10 +63,63 @@ INVALID_CONTENT_TOPICS = [
|
|||
{"description": "A bool", "value": True},
|
||||
]
|
||||
|
||||
CONTENT_TOPICS_DIFFERENT_SHARDS = [
|
||||
"/myapp/1/latest/proto", # resolves to shard 0
|
||||
"/waku/2/content/test.js", # resolves to shard 1
|
||||
"/app/22/sometopic/someencoding", # resolves to shard 2
|
||||
"/toychat/2/huilong/proto", # resolves to shard 3
|
||||
"/statusim/1/community/cbor", # resolves to shard 4
|
||||
"/app/27/sometopic/someencoding", # resolves to shard 5
|
||||
"/app/29/sometopic/someencoding", # resolves to shard 6
|
||||
"/app/20/sometopic/someencoding", # resolves to shard 7
|
||||
]
|
||||
|
||||
CONTENT_TOPICS_SHARD_0 = [
|
||||
"/newsService/1.0/weekly/protobuf",
|
||||
"/newsService/1.0/alerts/xml",
|
||||
"/newsService/1.0/updates/json",
|
||||
"/newsService/2.0/alerts/json",
|
||||
"/newsService/2.0/summaries/xml",
|
||||
"/newsService/2.0/highlights/yaml",
|
||||
"/newsService/3.0/weekly/json",
|
||||
"/newsService/3.0/summaries/xml",
|
||||
]
|
||||
|
||||
CONTENT_TOPICS_SHARD_7 = [
|
||||
"/newsService/2.0/alerts/yaml",
|
||||
"/newsService/2.0/highlights/xml",
|
||||
"/newsService/3.0/daily/protobuf",
|
||||
"/newsService/3.0/alerts/xml",
|
||||
"/newsService/3.0/updates/protobuf",
|
||||
"/newsService/3.0/reviews/xml",
|
||||
"/newsService/4.0/alerts/yaml",
|
||||
"/newsService/4.0/updates/yaml",
|
||||
]
|
||||
|
||||
VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/0", "/waku/2/rs/0/1", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000"]
|
||||
|
||||
INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"]
|
||||
|
||||
PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [
|
||||
"/waku/2/rs/0/0",
|
||||
"/waku/2/rs/0/1",
|
||||
"/waku/2/rs/2/0",
|
||||
"/waku/2/rs/2/1",
|
||||
"/waku/2/rs/2/999",
|
||||
"/waku/2/rs/8/0",
|
||||
"/waku/2/rs/999/999",
|
||||
]
|
||||
|
||||
PUBSUB_TOPICS_SAME_CLUSTER = [
|
||||
"/waku/2/rs/2/0",
|
||||
"/waku/2/rs/2/1",
|
||||
"/waku/2/rs/2/2",
|
||||
"/waku/2/rs/2/3",
|
||||
"/waku/2/rs/2/4",
|
||||
"/waku/2/rs/2/5",
|
||||
"/waku/2/rs/2/6",
|
||||
"/waku/2/rs/2/7",
|
||||
]
|
||||
|
||||
SAMPLE_TIMESTAMPS = [
|
||||
{"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]},
|
||||
|
|
|
@ -42,7 +42,6 @@ class TestFilterGetMessages(StepsFilter):
|
|||
def test_filter_get_message_with_meta(self):
|
||||
self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload)))
|
||||
|
||||
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1068")
|
||||
def test_filter_get_message_with_ephemeral(self):
|
||||
failed_ephemeral = []
|
||||
for ephemeral in [True, False]:
|
||||
|
|
|
@ -168,7 +168,6 @@ class TestRelayPublish(StepsRelay):
|
|||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex)
|
||||
|
||||
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1068")
|
||||
def test_publish_with_ephemeral(self):
|
||||
failed_ephemeral = []
|
||||
for ephemeral in [True, False]:
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
import pytest
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestFilterStaticSharding(StepsSharding):
    """Filter protocol tests using static sharding (explicit pubsub topics)."""

    def test_filter_works_with_static_sharding(self):
        """A filter client on the same static shard receives the published message."""
        self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
        self.setup_second_node_as_filter(pubsub_topic=self.test_pubsub_topic)
        self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
        self.check_published_message_reaches_filter_peer(pubsub_topic=self.test_pubsub_topic)

    def test_filter_static_sharding_multiple_shards(self):
        """Filter subscriptions across several shards of one cluster all deliver."""
        self.setup_first_relay_node(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
        self.setup_second_node_as_filter(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
        self.subscribe_first_relay_node(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        # One filter subscription per (content topic, shard) pair.
        for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
            self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic)
        for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
            self.check_published_message_reaches_filter_peer(content_topic=content_topic, pubsub_topic=pubsub_topic)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    "go-waku" in NODE_1 or "go-waku" in NODE_2,
    reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
)
class TestFilterAutoSharding(StepsSharding):
    """Filter protocol tests using automatic sharding (content-topic routing)."""

    def test_filter_works_with_auto_sharding(self):
        """A filter client receives a message routed by content topic."""
        self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
        self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
        self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
        self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic)

    def test_filter_auto_sharding_multiple_content_topics(self):
        """Filter delivery works across content topics that map to different shards."""
        self.setup_first_relay_node(
            cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER
        )
        self.setup_second_node_as_filter(
            cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic
        )
        self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
            self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_reaches_filter_peer(content_topic=content_topic)
|
|
@ -0,0 +1,42 @@
|
|||
import pytest
|
||||
from src.env_vars import ADDITIONAL_NODES, NODE_2
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestMultipleNodes(StepsSharding):
    """Relay sharding tests across more than two nodes."""

    def test_static_shard_relay(self):
        """Main + optional relay nodes on one static shard all receive the message."""
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        self.setup_optional_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)

    @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku")
    def test_static_shard_relay_10_nwaku_nodes(self):
        """Static sharding delivery across a 10-node nwaku mesh (1 + 9)."""
        self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
        self.setup_nwaku_relay_nodes(num_nodes=9, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_optional_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)

    @pytest.mark.skipif(
        "go-waku" in NODE_2 or "go-waku" in ADDITIONAL_NODES,
        reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
    )
    def test_auto_shard_relay(self):
        """Autosharding delivery across main + optional relay nodes."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.setup_optional_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

    @pytest.mark.skipif("go-waku" in NODE_2, reason="Test works only with nwaku")
    def test_auto_shard_relay_10_nwaku_nodes(self):
        """Autosharding delivery across a larger nwaku mesh (1 + 8 extra nodes)."""
        self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
|
|
@ -0,0 +1,130 @@
|
|||
import pytest
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.common import delay, to_base64
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0
|
||||
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    "go-waku" in NODE_1 or "go-waku" in NODE_2,
    reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
)
class TestRelayAutosharding(StepsSharding):
    """Relay tests exercising automatic sharding subscribe/publish/unsubscribe flows."""

    def test_publish_without_subscribing_via_api_works(self):
        """Publishing succeeds even when no REST subscription was made first."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        for node in self.main_relay_nodes:
            self.relay_message(node, self.create_message(contentTopic=self.test_content_topic))

    def test_retrieve_messages_without_subscribing_via_api(self):
        """Retrieval without a REST subscription: tolerated on nwaku, rejected otherwise."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        try:
            self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
            if self.node2.is_nwaku():
                # nwaku serves messages here even without an explicit API subscription.
                pass
            else:
                raise AssertionError("Retrieving messages without subscribing worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_subscribe_and_publish_on_another_content_topic_from_same_shard(self):
        """Subscribing to one topic of a shard also delivers other topics of that shard."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=["/newsService/1.0/weekly/protobuf"])
        self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf")
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

    def test_subscribe_and_publish_on_another_content_topic_from_another_shard(self):
        """Delivery also works for a content topic that maps to a different shard."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=["/toychat/2/huilong/proto"])
        self.check_published_message_reaches_relay_peer(content_topic="/toychat/2/huilong/proto")
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

    def test_publish_on_not_subscribed_content_topic_works(self):
        """Publishing on an unsubscribed content topic does not error."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        for node in self.main_relay_nodes:
            self.relay_message(node, self.create_message(contentTopic="/toychat/2/huilong/proto"))

    def test_cant_retrieve_messages_on_not_subscribed_content_topic(self):
        """Retrieval on an unsubscribed content topic returns Not Found."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        self.check_published_message_doesnt_reach_relay_peer(content_topic="/toychat/2/huilong/proto")

    @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
    def test_subscribe_via_api_to_new_content_topics(self, content_topic_list):
        """Nodes started with one topic can subscribe to more topics via the API."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list[:1])
        self.subscribe_main_relay_nodes(content_topics=content_topic_list[1:])
        for content_topic in content_topic_list[1:]:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    def test_subscribe_one_by_one_to_different_content_topics_and_send_messages(self):
        """Incremental subscribe-then-publish works for every topic in turn."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS + CONTENT_TOPICS_SHARD_0:
            self.subscribe_main_relay_nodes(content_topics=[content_topic])
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
    def test_unsubscribe_from_some_content_topics(self, content_topic_list):
        """Partial unsubscribe stops delivery only on the removed topics."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=content_topic_list)
        for content_topic in content_topic_list:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)
        self.unsubscribe_main_relay_nodes(content_topics=content_topic_list[:3])
        for content_topic in content_topic_list[:3]:
            self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
        for content_topic in content_topic_list[3:]:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    def test_unsubscribe_from_all_content_topics(self):
        """Bulk unsubscribe stops delivery on all topics."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)
        self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)

    def test_unsubscribe_from_all_content_topics_one_by_one(self):
        """Unsubscribing topic-by-topic eventually stops delivery on every topic."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.unsubscribe_main_relay_nodes(content_topics=[content_topic])
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
|
||||
|
||||
def test_resubscribe_to_unsubscribed_content_topics(self):
|
||||
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
|
||||
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
|
||||
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
|
||||
self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
|
||||
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
|
||||
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
|
||||
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
|
||||
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
|
||||
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
|
||||
|
||||
def test_unsubscribe_from_non_subscribed_content_topics(self):
|
||||
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.unsubscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
|
||||
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
|
||||
self.check_published_message_doesnt_reach_relay_peer(content_topic=content_topic)
|
||||
|
||||
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
|
||||
def test_publish_on_multiple_content_topics_and_only_after_fetch_them(self, content_topic_list):
|
||||
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
|
||||
self.subscribe_main_relay_nodes(content_topics=content_topic_list)
|
||||
for content_topic in content_topic_list:
|
||||
self.relay_message(self.node1, self.create_message(payload=to_base64(content_topic), contentTopic=content_topic))
|
||||
delay(0.1)
|
||||
for content_topic in content_topic_list:
|
||||
get_messages_response = self.retrieve_relay_message(self.node2, content_topic=content_topic)
|
||||
assert get_messages_response, f"Peer NODE_2 couldn't find any messages"
|
||||
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
||||
assert get_messages_response[0]["contentTopic"] == content_topic
|
||||
assert get_messages_response[0]["payload"] == to_base64(content_topic)
|
|
@ -0,0 +1,134 @@
|
|||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
from src.libs.common import delay, to_base64
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestRelayStaticSharding(StepsSharding):
    """Relay API tests for static sharding: nodes address explicit pubsub (shard) topics."""

    def test_publish_without_subscribing_via_api_works(self):
        """A shard configured at startup accepts publishes even without an API subscription."""
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        for node in self.main_relay_nodes:
            self.relay_message(node, self.create_message(), self.test_pubsub_topic)

    def test_retrieve_messages_without_subscribing_via_api(self):
        """Retrieval without an API subscription should fail with Not Found.

        nwaku is tolerated as an exception — presumably it serves startup-flag
        shards without an explicit API subscription (NOTE(review): confirm).
        """
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        try:
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
            if self.node2.is_nwaku():
                pass
            else:
                raise AssertionError("Retrieving messages without subscribing worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_subscribe_and_publish_on_another_shard(self):
        """Subscribing to a second shard works; the unsubscribed startup shard must stay unreadable."""
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=["/waku/2/rs/2/1"])
        self.check_published_message_reaches_relay_peer(pubsub_topic="/waku/2/rs/2/1")
        try:
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
            if self.node2.is_nwaku():
                pass
            else:
                # Fixed copy-pasted message: this branch concerns the not-subscribed shard,
                # not the "without subscribing" scenario of the previous test.
                raise AssertionError("Retrieving messages on a not subscribed shard worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_cant_publish_on_not_subscribed_shard(self):
        """Publishing on a shard the node never subscribed to must be rejected."""
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.check_publish_fails_on_not_subscribed_pubsub_topic("/waku/2/rs/2/1")

    def test_subscribe_via_api_to_new_pubsub_topics(self):
        """Shards added via the subscription API deliver even when absent from startup flags."""
        self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER[:1])
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER[1:])
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[1:]:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_subscribe_via_api_to_new_pubsub_topics_on_other_cluster(self):
        """Nodes on cluster 2 can subscribe to and deliver on shards of other clusters."""
        topics = ["/waku/2/rs/2/0", "/waku/2/rs/3/0", "/waku/2/rs/4/0"]
        self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=topics[0])
        self.subscribe_first_relay_node(pubsub_topics=topics)
        self.subscribe_second_relay_node(pubsub_topics=topics)
        for pubsub_topic in topics:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_subscribe_one_by_one_to_different_pubsub_topics_and_send_messages(self):
        """Incrementally subscribing to shards (one API call each) delivers on every new shard."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.subscribe_main_relay_nodes(pubsub_topics=[pubsub_topic])
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_unsubscribe_from_some_pubsub_topics(self):
        """After a partial unsubscribe, only the still-subscribed shards accept publishes."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
        self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER[:3])
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[:3]:
            self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER[3:]:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034")
    def test_unsubscribe_from_all_pubsub_topics(self):
        """Unsubscribing from every shard in one call stops publishes on all of them."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
        self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)

    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034")
    def test_unsubscribe_from_all_pubsub_topics_one_by_one(self):
        """Unsubscribing shard by shard ends up equivalent to one bulk unsubscribe."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.unsubscribe_main_relay_nodes(pubsub_topics=[pubsub_topic])
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)

    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1034")
    def test_resubscribe_to_unsubscribed_pubsub_topics(self):
        """Shards unsubscribed and then re-subscribed must deliver messages again."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
        self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_unsubscribe_from_non_subscribed_pubsub_topics(self):
        """Unsubscribing from never-subscribed shards should fail with a client/server error.

        nwaku is tolerated as an exception (NOTE(review): presumably it treats the
        request as a no-op — confirm). Either way the shards stay unpublishable.
        """
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        try:
            self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
            if self.node1.is_nwaku():
                pass
            else:
                raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
        except Exception as ex:
            assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)

    def test_publish_on_multiple_pubsub_topics_and_only_after_fetch_them(self):
        """Publish one message per shard first, then fetch and verify each shard's store.

        Each shard's payload is its own topic name (base64-encoded) so a fetched
        message can be matched back to the shard it was published on.
        """
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.relay_message(self.node1, self.create_message(payload=to_base64(pubsub_topic)), pubsub_topic=pubsub_topic)
            # Brief pause so the message propagates to the peer before the next publish.
            delay(0.1)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            get_messages_response = self.retrieve_relay_message(self.node2, pubsub_topic=pubsub_topic)
            # Fixed: message below was an f-string with no placeholders (ruff F541).
            assert get_messages_response, "Peer NODE_2 couldn't find any messages"
            assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
            assert get_messages_response[0]["payload"] == to_base64(pubsub_topic)
|
|
@ -0,0 +1,113 @@
|
|||
import pytest
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.common import delay
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7, PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    "go-waku" in NODE_1 or "go-waku" in NODE_2,
    reason="Autosharding tests work only on nwaku because of https://github.com/waku-org/go-waku/issues/1061",
)
class TestRunningNodesAutosharding(StepsSharding):
    """Autosharding tests: nodes are started with content topics and the shard is derived."""

    @pytest.mark.parametrize("content_topic", CONTENT_TOPICS_DIFFERENT_SHARDS)
    def test_single_content_topic(self, content_topic):
        """A single shared content topic delivers between the two main nodes."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic)
        self.subscribe_first_relay_node(content_topics=[content_topic])
        self.subscribe_second_relay_node(content_topics=[content_topic])
        self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7])
    def test_multiple_content_topics_same_shard(self, content_topic_list):
        """Several content topics mapping to the same shard all deliver."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list)
        self.subscribe_first_relay_node(content_topics=content_topic_list)
        self.subscribe_second_relay_node(content_topics=content_topic_list)
        for content_topic in content_topic_list:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    def test_multiple_content_topics_different_shard(self):
        """Content topics spread across different shards all deliver."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS)
        self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        self.subscribe_second_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    def test_2_nodes_different_content_topic_same_shard(self):
        """Nodes on different content topics don't exchange messages, even on the same shard."""
        self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf")
        self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml")
        self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"])
        self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"])
        try:
            self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf")
            raise AssertionError("Publish on different content topic worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_2_nodes_different_content_topic_different_shard(self):
        """Nodes on different content topics mapping to different shards don't exchange messages."""
        self.setup_first_relay_node(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto")
        self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js")
        self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"])
        self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"])
        try:
            self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto")
            raise AssertionError("Publish on different content topic worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    @pytest.mark.parametrize("pubsub_topic", ["/waku/2/rs/2/0", "/waku/2/rs/2/1"])
    def test_pubsub_topic_also_in_docker_flags(self, pubsub_topic):
        """Passing an explicit pubsub topic alongside the content topic doesn't break delivery."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=pubsub_topic, content_topic=self.test_content_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

    def test_content_topic_not_in_docker_flags(self):
        """Subscribing via API works even when the content topic wasn't a startup flag."""
        self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

    def test_content_topic_and_pubsub_topic_not_in_docker_flags(self):
        """With no topic flags at all, delivery may fail; tolerate the peer finding nothing."""
        self.setup_main_relay_nodes(cluster_id=2)
        self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
        try:
            self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
        except Exception as ex:
            assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex)

    def test_multiple_content_topics_and_multiple_pubsub_topics(self):
        """Mixing content-topic and pubsub-topic startup flags still delivers on every content topic."""
        self.setup_main_relay_nodes(
            cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER
        )
        self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            self.check_published_message_reaches_relay_peer(content_topic=content_topic)

    def test_node_uses_both_auto_and_regular_apis(self):
        """A node can mix the auto (content topic) and regular (pubsub topic) relay APIs."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_main_relay_nodes(content_topics=["/toychat/2/huilong/proto"])
        self.check_published_message_reaches_relay_peer(content_topic="/toychat/2/huilong/proto")
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)

    def test_sender_uses_auto_api_receiver_uses_regular_api(self):
        """A message published via the auto API is retrievable via the regular pubsub API."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
        self.subscribe_second_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic))
        delay(0.1)
        get_messages_response = self.retrieve_relay_message(self.node2, pubsub_topic=self.test_pubsub_topic)
        # Fixed: message below was an f-string with no placeholders (ruff F541).
        assert get_messages_response, "Peer NODE_2 couldn't find any messages"
        assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"

    def test_sender_uses_regular_api_receiver_uses_auto_api(self):
        """A message published via the regular pubsub API is retrievable via the auto API."""
        self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
        self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_second_relay_node(content_topics=[self.test_content_topic])
        self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic)
        delay(0.1)
        get_messages_response = self.retrieve_relay_message(self.node2, content_topic=self.test_content_topic)
        # Fixed: message below was an f-string with no placeholders (ruff F541).
        assert get_messages_response, "Peer NODE_2 couldn't find any messages"
        assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
|
@ -0,0 +1,83 @@
|
|||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.sharding import StepsSharding
|
||||
from src.test_data import PUBSUB_TOPICS_DIFFERENT_CLUSTERS, PUBSUB_TOPICS_SAME_CLUSTER
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestRunningNodesStaticSharding(StepsSharding):
    """Static sharding tests: nodes are started with explicit pubsub (shard) topic flags."""

    @pytest.mark.parametrize("pubsub_topic", PUBSUB_TOPICS_DIFFERENT_CLUSTERS)
    def test_single_pubsub_topic(self, pubsub_topic):
        """Two nodes sharing one shard deliver a published message to the peer."""
        self.setup_main_relay_nodes(pubsub_topic=pubsub_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=[pubsub_topic])
        self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_multiple_pubsub_topics_same_cluster(self):
        """All shards of one cluster deliver when both nodes subscribe to all of them."""
        self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
        for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_multiple_pubsub_topics_different_clusters(self):
        """Shards spanning several clusters deliver when both nodes subscribe to all of them."""
        self.setup_main_relay_nodes(pubsub_topic=PUBSUB_TOPICS_DIFFERENT_CLUSTERS)
        self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_DIFFERENT_CLUSTERS)
        for pubsub_topic in PUBSUB_TOPICS_DIFFERENT_CLUSTERS:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)

    def test_2_nodes_same_cluster_different_shards(self):
        """Nodes on different shards of the same cluster must NOT exchange messages."""
        self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
        self.setup_second_relay_node(pubsub_topic="/waku/2/rs/2/1")
        self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/2/1"])
        try:
            # Expected to raise: the peer is on a different shard.
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
            # Reaching this line means the message crossed shards — force a failure.
            # (The raised AssertionError is caught below and fails the "Not Found" check.)
            raise AssertionError("Publish on different shard worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_2_nodes_different_cluster_same_shard(self):
        """Nodes on the same shard index but different clusters must NOT exchange messages."""
        self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
        self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/0")
        self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/0"])
        try:
            # Expected to raise: the peer is on a different cluster.
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
            raise AssertionError("Publish on different cluster worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    def test_2_nodes_different_cluster_different_shard(self):
        """Nodes differing in both cluster and shard must NOT exchange messages."""
        self.setup_first_relay_node(pubsub_topic=self.test_pubsub_topic)
        self.setup_second_relay_node(pubsub_topic="/waku/2/rs/3/1")
        self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
        self.subscribe_second_relay_node(pubsub_topics=["/waku/2/rs/3/1"])
        try:
            # Expected to raise: the peer shares neither cluster nor shard.
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
            raise AssertionError("Publish on different cluster worked!!!")
        except Exception as ex:
            assert "Not Found" in str(ex)

    @pytest.mark.parametrize("content_topic", ["/toychat/2/huilong/proto", "/aaaaa/3/bbbbb/proto"])
    @pytest.mark.skipif("go-waku" in NODE_2, reason="Test doesn't work on go-waku")
    def test_content_topic_also_in_docker_flags(self, content_topic):
        """An extra content-topic startup flag doesn't break delivery on the static shard."""
        self.setup_main_relay_nodes(pubsub_topic=self.test_pubsub_topic, content_topic=content_topic)
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)

    # Bug reported: https://github.com/waku-org/go-waku/issues/1034#issuecomment-2011350765
    def test_pubsub_topic_not_in_docker_flags(self):
        """Subscribing to a shard absent from startup flags may not deliver; tolerate that."""
        self.setup_main_relay_nodes(cluster_id=2)
        self.subscribe_main_relay_nodes(pubsub_topics=[self.test_pubsub_topic])
        try:
            self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
        except Exception as ex:
            # Accept only the "peer found nothing" failure; anything else propagates.
            assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex)

    def test_start_node_with_50_pubsub_topics(self):
        """Nodes can start with 50 shard flags and deliver on every one of them."""
        topics = ["/waku/2/rs/2/" + str(i) for i in range(50)]
        self.setup_main_relay_nodes(pubsub_topic=topics)
        self.subscribe_main_relay_nodes(pubsub_topics=topics)
        for pubsub_topic in topics:
            self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
|
Loading…
Reference in New Issue